Info

  First generate a DataFrame, then store the data on HDFS.

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
Initializing Scala interpreter ...
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1577952043881)
SparkSession available as 'spark'
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
val builder = SparkSession.builder()
  .appName("learningScala")
  .config("spark.executor.heartbeatInterval", "60s")
  .config("spark.network.timeout", "120s")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer.max", "512m")
  .config("spark.dynamicAllocation.enabled", false)
  .config("spark.sql.inMemoryColumnarStorage.compressed", true)
  .config("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
  .config("spark.sql.broadcastTimeout", 600)
  .config("spark.sql.autoBroadcastJoinThreshold", -1)
  .config("spark.sql.crossJoin.enabled", true)
  .master("local[*]")
val spark = builder.appName("OperateHdfs").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
builder: org.apache.spark.sql.SparkSession.Builder = org.apache.spark.sql.SparkSession$Builder@679eb75a
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@551aa019
val df1 = Seq(
  (1, "male",   "18", "2019-01-01 11:45:50"),
  (2, "female", "37", "2019-01-02 11:55:50"),
  (3, "male",   "21", "2019-01-21 11:45:50"),
  (4, "female", "44", "2019-02-01 12:45:50"),
  (5, "male",   "39", "2019-01-15 10:40:50")
).toDF("id", "sex", "age", "createtime_str")
df1: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 2 more fields]
val df=df1.withColumn("ds",date_format($"createtime_str","yyyyMMdd"))
df: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 3 more fields]
df.show()
+---+------+---+-------------------+--------+
| id|   sex|age|     createtime_str|      ds|
+---+------+---+-------------------+--------+
|  1|  male| 18|2019-01-01 11:45:50|20190101|
|  2|female| 37|2019-01-02 11:55:50|20190102|
|  3|  male| 21|2019-01-21 11:45:50|20190121|
|  4|female| 44|2019-02-01 12:45:50|20190201|
|  5|  male| 39|2019-01-15 10:40:50|20190115|
+---+------+---+-------------------+--------+

Inspecting files on HDFS

Check whether the target directory exists

Import the relevant classes

import org.apache.hadoop.fs.{FileSystem, Path,FileStatus,FileUtil}
import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, FileUtil}

Get the Hadoop configuration

var path="../Data"
val hadoopConf = spark.sparkContext.hadoopConfiguration
path: String = ../Data
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, __spark_hadoop_conf__.xml
val hdfs = FileSystem.get(hadoopConf)
hdfs: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem@25866b28

Set the path

val outputPath = new Path("../Data")
outputPath: org.apache.hadoop.fs.Path = ../Data

Does this path exist on HDFS?

hdfs.exists(outputPath)
res2: Boolean = true

Does this file exist on HDFS?

hdfs.exists(new Path("../Data/test.txt"))
res3: Boolean = true

Is this path a directory?

hdfs.getFileStatus(outputPath).isDirectory()
res4: Boolean = true

Is this path a file?

hdfs.getFileStatus(outputPath).isFile()
res5: Boolean = false
hdfs.getFileStatus(new Path("../Data/test.txt")).isFile()
res6: Boolean = true

List all files under the path

val allFiles = FileUtil.stat2Paths(hdfs.listStatus(outputPath))
allFiles: Array[org.apache.hadoop.fs.Path] = Array(file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest, file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv, file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt)

Print the first-level directory and file names

allFiles.foreach(println)
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt

Print the first-level directory names

Logic:

  • List all files under the path
  • Iterate over them, checking whether each one is a directory
  • Print the ones that are
allFiles.filter(hdfs.getFileStatus(_).isDirectory()) // keep only directories
.foreach(println) // print the first-level directory names
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest

Print the files under the path

allFiles.filter(hdfs.getFileStatus(_).isFile()) // keep only files
.foreach(println) // print the file names
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt

Wrapping it into an object

object HdfsCheckPath {
  import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, FileUtil}
  import org.apache.spark.sql.SparkSession

  /**
   * @param spark SparkSession
   * @param path  the path to check, as a String
   */
  def isPathExist(spark: SparkSession, path: String) = {
    // Get the HDFS configuration
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    val hdfs = FileSystem.get(hadoopConf)
    // Build the target path
    val outputPath = new Path(path)
    if (hdfs.exists(outputPath)) {
      println(s"This path(${path}) Already exist!")
    } else {
      println(s"This path(${path}) don't exist!")
    }
    hdfs.exists(outputPath)
  }

  /**
   * @param spark      SparkSession
   * @param path       the target path
   * @param printlevel print level, one of: directory, file, total
   */
  def printPathDetail(spark: SparkSession, path: String, printlevel: String = "total"): Unit = {
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    val hdfs = FileSystem.get(hadoopConf)
    val isExists = hdfs.exists(new Path(path))
    // If the path does not exist there is nothing more to do
    if (isExists) {
      println("This path Already exist!")
      // The path exists: print its first-level directories and files
      val allFiles = FileUtil.stat2Paths(hdfs.listStatus(new Path(path)))
      if (printlevel == "directory") {
        println("-----Directory:")
        allFiles.filter(hdfs.getFileStatus(_).isDirectory()) // keep only directories
          .foreach(println) // print the first-level directory names
      } else if (printlevel == "file") {
        println("-----File:")
        allFiles.filter(hdfs.getFileStatus(_).isFile()) // keep only files
          .foreach(println) // print the file names
      } else if (printlevel == "total") {
        println("-----Total:")
        allFiles.foreach(println)
      }
    } else {
      println("This path don't exist!")
    }
  }
}
defined object HdfsCheckPath
HdfsCheckPath.isPathExist(spark,"../Data")
This path(../Data) Already exist!
res10: Boolean = true
HdfsCheckPath.printPathDetail(spark,"../Data","total")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
HdfsCheckPath.printPathDetail(spark,"../Data","directory")
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
HdfsCheckPath.printPathDetail(spark,"../Data","file")
This path Already exist!
-----File:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt

Storing files

For now, only the Parquet format is covered.

Non-partitioned storage

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

The mode parameter
Specifies the behavior when data or table already exists. Options include:

  • overwrite: overwrite the existing data
  • append: append the new data to the existing data
  • ignore: if data already exists at the target, skip the write entirely (i.e. a no-op), useful for idempotent jobs
  • error or errorifexists: the default; throw an exception at runtime if the target already exists

  errorifexists is generally the safest choice, since it prevents data from being written into a directory that already exists by mistake. The saveMode argument accepts either a String or a SaveMode value (the latter requires importing org.apache.spark.sql.SaveMode).
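
For completeness, a minimal sketch of the two modes not exercised in this transcript, written against a hypothetical demo path (not run here):

// append: adds new part files next to any existing ones (hypothetical path)
df1.write.mode("append").parquet("../Data/hdfsSaveModeDemo/")
// ignore: silently does nothing if the target already exists
df1.write.mode(SaveMode.Ignore).parquet("../Data/hdfsSaveModeDemo/")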

df1.write.mode(saveMode="errorifexists").parquet(path="../Data/hdfsSaveNoPartition/")

[Image: …/Picture/Pic01.jpg]

df1.write.mode(saveMode=SaveMode.Overwrite).parquet("../Data/hdfsSaveNoPartition/")

Partitioned storage

val partitionColNames = Array("ds")
df.write.partitionBy(partitionColNames:_*).mode(saveMode="errorifexists").parquet("../Data/hdfsSavePartition/")
partitionColNames: Array[String] = Array(ds)

[Image: …/Picture/Pic02.jpg]

Likewise, if the target partition already exists, an error is raised.

Reading

Non-partitioned read

val df1New = spark.read.parquet("../Data/hdfsSaveNoPartition")
df1New: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 2 more fields]
df1New.show()
+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  2|female| 37|2019-01-02 11:55:50|
|  4|female| 44|2019-02-01 12:45:50|
|  1|  male| 18|2019-01-01 11:45:50|
|  3|  male| 21|2019-01-21 11:45:50|
|  5|  male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+

Partitioned read

Read a single, specific partition

spark.read.option("basePath", "../Data/hdfsSavePartition")
.parquet("../Data/hdfsSavePartition/ds=20190101").show()
+---+----+---+-------------------+--------+
| id| sex|age|     createtime_str|      ds|
+---+----+---+-------------------+--------+
|  1|male| 18|2019-01-01 11:45:50|20190101|
+---+----+---+-------------------+--------+

Read multiple partitions

spark.read.option("basePath", "../Data/hdfsSavePartition")
.parquet("../Data/hdfsSavePartition/ds=20190101","../Data/hdfsSavePartition/ds=20190102").show()
+---+------+---+-------------------+--------+
| id|   sex|age|     createtime_str|      ds|
+---+------+---+-------------------+--------+
|  2|female| 37|2019-01-02 11:55:50|20190102|
|  1|  male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+

A shorthand for specifying multiple partitions

  `: _*` tells the compiler to treat a single argument as a sequence of arguments. For example, given a varargs function sum, `val s = sum(1 to 5: _*)` passes the range 1 to 5 as individual arguments.
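
A minimal, self-contained sketch of that expansion (the sum function below is hypothetical, defined only to illustrate the syntax):

// Hypothetical varargs function, used only to illustrate `: _*`
def sum(xs: Int*): Int = xs.foldLeft(0)(_ + _)
val s1 = sum(1, 2, 3)     // ordinary varargs call: 6
val s2 = sum(1 to 5: _*)  // expand the Range into individual arguments: 15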

val partitionPathArray = Array(20190101,20190102).map(x=>"../Data/hdfsSavePartition/" + s"ds=${x}")
partitionPathArray: Array[String] = Array(../Data/hdfsSavePartition/ds=20190101, ../Data/hdfsSavePartition/ds=20190102)
partitionPathArray.mkString("\n")
res20: String =
../Data/hdfsSavePartition/ds=20190101
../Data/hdfsSavePartition/ds=20190102
spark.read.option("basePath", "../Data/hdfsSavePartition")
.parquet(partitionPathArray:_*).show()
+---+------+---+-------------------+--------+
| id|   sex|age|     createtime_str|      ds|
+---+------+---+-------------------+--------+
|  2|female| 37|2019-01-02 11:55:50|20190102|
|  1|  male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+

When some of the specified partitions are missing

val partitionPathArray1 = Array(20190101,20190102,20191231).map(x=>"../Data/hdfsSavePartition/" + s"ds=${x}")
partitionPathArray1: Array[String] = Array(../Data/hdfsSavePartition/ds=20190101, ../Data/hdfsSavePartition/ds=20190102, ../Data/hdfsSavePartition/ds=20191231)
partitionPathArray1.mkString("\n")
res22: String =
../Data/hdfsSavePartition/ds=20190101
../Data/hdfsSavePartition/ds=20190102
../Data/hdfsSavePartition/ds=20191231
partitionPathArray1.map(x=>HdfsCheckPath.isPathExist(spark,x))
This path(../Data/hdfsSavePartition/ds=20190101) Already exist!
This path(../Data/hdfsSavePartition/ds=20190102) Already exist!
This path(../Data/hdfsSavePartition/ds=20191231) don't exist!
res23: Array[Boolean] = Array(true, true, false)
spark.read.option("basePath", "../Data/hdfsSavePartition")
.parquet(partitionPathArray1.filter(x=>HdfsCheckPath.isPathExist(spark,x)):_*).show()
This path(../Data/hdfsSavePartition/ds=20190101) Already exist!
This path(../Data/hdfsSavePartition/ds=20190102) Already exist!
This path(../Data/hdfsSavePartition/ds=20191231) don't exist!
+---+------+---+-------------------+--------+
| id|   sex|age|     createtime_str|      ds|
+---+------+---+-------------------+--------+
|  2|female| 37|2019-01-02 11:55:50|20190102|
|  1|  male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+

Deleting files

Deletion here is not tied to individual files; it is an operation on an entire directory.

Deleting non-partitioned data

The second argument enables recursive deletion, i.e. the path itself along with all of its files and subdirectories is removed.

Returns false when the path does not exist

hdfs.delete(new Path("../Data/hdfsSaveNoPartition555/"),true)
res25: Boolean = false

Returns true when the path exists

hdfs.delete(new Path("../Data/hdfsSaveNoPartition/"),true)
res26: Boolean = true

Deleting partitioned data

  The logic for deleting partitions is straightforward: build the full path of each partition to delete, and delete them in a loop.
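
A minimal sketch of that loop, reusing the hdfs handle from above (the partition values here are just examples):

// Example partition values to drop (illustrative only)
val partitionsToDrop = Array("20190101", "20190102")
partitionsToDrop
  .map(ds => new Path(s"../Data/hdfsSavePartition/ds=$ds"))
  .foreach(p => hdfs.delete(p, true)) // recursive delete; returns false if the path is missing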

Inspect the partition data

HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition","directory")
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201

Delete one specific partition

hdfs.delete(new Path("../Data/hdfsSavePartition/ds=20190101"),true)
res28: Boolean = true
HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition","directory")
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201

Delete multiple partitions

val partitionPathArray2 = Array("20190102","20200101").map(x=>"../Data/hdfsSavePartition"+"/ds="+x)
partitionPathArray2: Array[String] = Array(../Data/hdfsSavePartition/ds=20190102, ../Data/hdfsSavePartition/ds=20200101)
partitionPathArray2.mkString("\n")
res30: String =
../Data/hdfsSavePartition/ds=20190102
../Data/hdfsSavePartition/ds=20200101
partitionPathArray2.foreach(x=>hdfs.delete(new Path(x),true))

Check the current directory: the partitions above are no longer present.

HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition/","total")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS

As shown above, when some of the specified partitions do not exist, the program does not raise an error; a print statement can be added in the code as a hint.
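
A small sketch of such a hint, assuming the hdfs handle from earlier (the helper name is made up for illustration):

// Hypothetical helper: print a hint instead of silently returning false when a path is missing
def deleteWithHint(pathStr: String): Boolean = {
  val target = new Path(pathStr)
  if (hdfs.exists(target)) {
    println(s"Deleting: $pathStr")
    hdfs.delete(target, true) // recursive delete
  } else {
    println(s"Path does not exist, skip: $pathStr")
    false
  }
}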

hdfs.delete(new Path("../Data/hdfsSavePartition"),true)
res33: Boolean = true

Encapsulation

Object encapsulation

Goals:

  • Wrap the common HDFS operations so they are easy to call
  • Organize them as an object, so the functions can call each other internally
  • object scala2Hdfs
object scala2Hdfs {
  import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, FileUtil}
  import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, Dataset}

  /**
   * Get the FileSystem object
   *
   * @param spark SparkSession
   * @return
   */
  def getFileSystem(spark: SparkSession): FileSystem = {
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    FileSystem.get(hadoopConf)
  }

  /**
   * @param spark SparkSession
   * @param path  the target path, as a String
   */
  def isPathExist(spark: SparkSession, path: String) = {
    // Get the HDFS handle
    val hdfs = getFileSystem(spark)
    // Build the target path
    val outputPath = new Path(path)
    hdfs.exists(outputPath)
    //    if (hdfs.exists(outputPath)) {
    //      println("This path Already exist!")
    //    } else {
    //      println("This path don't exist!")
    //    }
  }

  /**
   * Check whether the path exists and is a directory
   *
   * @param spark SparkSession
   * @param path  the target path, as a String
   * @return
   */
  def isExistsDirectory(spark: SparkSession, path: String): Boolean = {
    val hdfs = getFileSystem(spark)
    val outputPath = new Path(path)
    if (!(hdfs.exists(outputPath) && hdfs.getFileStatus(outputPath).isDirectory)) {
      println(s"This path(${path}) don't exist!")
    }
    hdfs.exists(outputPath) && hdfs.getFileStatus(outputPath).isDirectory
  }

  /**
   * @param spark SparkSession
   * @param path  the target path, as a String
   * @return
   */
  def isExistsFile(spark: SparkSession, path: String): Boolean = {
    val hdfs = getFileSystem(spark)
    val outputPath = new Path(path)
    hdfs.exists(outputPath) && hdfs.getFileStatus(outputPath).isFile
  }

  /**
   * @param spark      SparkSession
   * @param path       the target path
   * @param printlevel print level, one of: directory, file, total
   */
  def printPathDetail(spark: SparkSession, path: String, printlevel: String = "total"): Unit = {
    val hdfs = getFileSystem(spark)
    val isExists = isPathExist(spark, path)
    // If the path does not exist there is nothing more to do
    if (isExists) {
      println("This path Already exist!")
      // The path exists: print its first-level directories and files
      val allFiles = FileUtil.stat2Paths(hdfs.listStatus(new Path(path)))
      if (printlevel == "directory") {
        println("-----Directory:")
        allFiles.filter(hdfs.getFileStatus(_).isDirectory()) // keep only directories
          .foreach(println) // print the first-level directory names
      } else if (printlevel == "file") {
        println("-----File:")
        allFiles.filter(hdfs.getFileStatus(_).isFile()) // keep only files
          .foreach(println) // print the file names
      } else if (printlevel == "total") {
        println("-----Total:")
        allFiles.foreach(println)
      }
    } else {
      println("This path don't exist!")
    }
  }

  /**
   * @param spark SparkSession
   * @param path  the target path;
   *              if it exists, check for subdirectories: skip when there are any, delete otherwise
   */
  def deleteSinglePath(spark: SparkSession, path: String): Unit = {
    val hdfs = getFileSystem(spark)
    val outputPath = new Path(path)
    // Only an existing path can be deleted
    if (hdfs.exists(outputPath)) {
      // Check whether the path contains subdirectories
      val isDirectoryExists = hdfs.listStatus(outputPath).exists(_.isDirectory)
      // No subdirectories: delete the data
      if (!isDirectoryExists) {
        // Recursive delete
        hdfs.delete(outputPath, true)
        println("Clean this path: " + path)
      } else {
        // Subdirectories exist: skip, to avoid deleting something by mistake
        println("Contains sub path, Skip clean")
      }
    } else {
      println(s"This path(${path}) don't exist!")
    }
  }

  /**
   * @param spark           SparkSession
   * @param path            the target path, without a trailing /
   * @param partitionsArray list of partition values, in the same format as the partition column
   * @param isPartition     whether the data is partitioned
   */
  def deletePartitionPath(spark: SparkSession, path: String, partitionsArray: Array[String], isPartition: Boolean = true): Unit = {
    if (!isPartition) {
      // Non-partitioned: delete the path directly
      deleteSinglePath(spark, path)
    } else {
      if (partitionsArray.length == 0) {
        println("partitionsArray has no items")
      } else {
        // Build the partition paths
        val partitionPathArray = partitionsArray.map(x => path + "/" + s"ds=${x}")
        // Delete the partitions in a loop; deleteSinglePath prints a hint when a path does not exist
        partitionPathArray.foreach(x => deleteSinglePath(spark, x))
      }
    }
  }

  /**
   * Save
   *
   * @param df          the DataFrame to save
   * @param path        target path
   * @param coalesceNum number of partitions after coalesce, meant to improve efficiency; exact usage not fully explored yet
   * @param saveType    save format
   * @param saveMode    save mode
   */
  def saveSinglePath(df: DataFrame, path: String, coalesceNum: Int = 0, saveType: String = "parquet", saveMode: String = "errorifexists"): Unit = {
    var tempDf = df
    if (coalesceNum >= 1) {
      tempDf = df.coalesce(coalesceNum)
    }
    val write = tempDf.write.mode(saveMode)
    saveType match {
      case "csv"     => write.option("header", "true").csv(path); println("Save this path: " + path)
      case "parquet" => write.parquet(path); println("Save this path: " + path)
      case _         => println(s"Not Support this savetype:${saveType}")
    }
  }

  /**
   * @param dataFrameWithDs   DataFrame that already contains the partition column(s)
   * @param path              target path
   * @param saveMode          save mode, defaults to append
   * @param coalesceNum       number of partitions after coalesce; exact usage not fully explored yet
   * @param partitionColNames partition column names, as an Array;
   *                          the partition columns determine the directory layout on disk
   */
  def savePartitionPath(dataFrameWithDs: DataFrame, path: String, saveType: String = "parquet", saveMode: String = "append", coalesceNum: Int = 0, partitionColNames: Array[String] = Array("ds")): Unit = {
    var tempDf = dataFrameWithDs
    if (coalesceNum >= 1) {
      tempDf = dataFrameWithDs.coalesce(coalesceNum)
    }
    val write = tempDf.write.partitionBy(partitionColNames: _*).mode(saveMode)
    saveType match {
      case "csv" => write.option("header", "true").csv(path); println("Save this path: " + path)
      case _     => write.parquet(path); println("Save this path: " + path)
    }
  }

  /**
   * Clean and save
   *
   * @param df          the DataFrame to save
   * @param path        target path
   * @param coalesceNum number of partitions after coalesce
   * @param saveType    save format
   * @param saveMode    save mode
   */
  def cleanAndSaveSinglePath(spark: SparkSession, df: DataFrame, path: String, coalesceNum: Int = 0, saveType: String = "parquet", saveMode: String = "errorifexists"): Unit = {
    // Delete the target path first
    deleteSinglePath(spark, path)
    // Then save the data
    saveSinglePath(df, path, coalesceNum, saveType, saveMode)
  }

  /**
   * @param dataFrameWithDs   the DataFrame to save
   * @param path              target path
   * @param partitionsArray   list of partition values
   * @param coalesceNum       number of partitions after coalesce
   * @param partitionColNames partition columns
   */
  def cleanAndSavePartitionPath(dataFrameWithDs: DataFrame, path: String, saveMode: String = "append", partitionsArray: Array[String], coalesceNum: Int = 0, partitionColNames: Array[String] = Array("ds")): Unit = {
    val spark = dataFrameWithDs.sparkSession
    // Delete the data of the target partitions first
    deletePartitionPath(spark, path, partitionsArray)
    // Then save the data of those partitions
    savePartitionPath(dataFrameWithDs = dataFrameWithDs, path = path, saveMode = saveMode, partitionColNames = partitionColNames)
  }

  def readSinglePath(spark: SparkSession, path: String): DataFrame = {
    if (isExistsDirectory(spark, path)) {
      spark.read.parquet(path)
    } else {
      println("This path don't exist!")
      // Return an empty DataFrame; not sure whether there is a better way!!
      spark.emptyDataFrame
    }
  }

  /**
   * @param spark           SparkSession
   * @param path            target path
   * @param readType        file format, only parquet is supported for now
   * @param partitionsArray partitions to read
   * @return
   */
  def readPartitionPath(spark: SparkSession, path: String, readType: String = "parquet", partitionsArray: Array[String]) = {
    if (readType != "parquet") {
      println(s"Not Support this readType:${readType}")
      spark.emptyDataFrame
    } else {
      if (partitionsArray.length == 0) {
        println("PartitionsArray is null")
        spark.emptyDataFrame
      } else {
        val partitionPathArray = partitionsArray.map(x => path + "/" + s"ds=${x}")
        // Filter out partition directories that do not exist
        spark.read.option("basePath", path).parquet(partitionPathArray.filter(x => isExistsDirectory(spark, x)): _*)
      }
    }
  }
}
defined object scala2Hdfs
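
The readSinglePath helper above returns an empty DataFrame when the path is missing, and its inline comment wonders whether there is a better way. One common alternative, sketched here as a hypothetical variant (not part of the object above), is to return Option[DataFrame] so the caller decides how to handle the missing case:

// Hypothetical variant of readSinglePath, not part of scala2Hdfs
import org.apache.spark.sql.{DataFrame, SparkSession}
def readSinglePathOpt(spark: SparkSession, path: String): Option[DataFrame] =
  if (scala2Hdfs.isExistsDirectory(spark, path)) Some(spark.read.parquet(path))
  else None
// Usage: fall back to an empty DataFrame only where the caller chooses to
// readSinglePathOpt(spark, "../Data/hdfsSaveNoPartition").getOrElse(spark.emptyDataFrame)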

Inspection operations

Check whether a path exists, and whether it is a file or a directory

var path ="../Data"
path: String = ../Data
scala2Hdfs.isExistsDirectory(spark,path)
res34: Boolean = true
scala2Hdfs.isPathExist(spark,path)
res35: Boolean = true
scala2Hdfs.isExistsFile(spark,path)
res36: Boolean = false
scala2Hdfs.isExistsFile(spark,"../Data/test.txt")
res37: Boolean = true
scala2Hdfs.isPathExist(spark,"../Data/test.txt")
res38: Boolean = true

Print path details

scala2Hdfs.printPathDetail(spark,path,"total")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
scala2Hdfs.printPathDetail(spark,path,"directory")
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
scala2Hdfs.printPathDetail(spark,path,"file")
This path Already exist!
-----File:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt

Save operations

Non-partitioned save

With saveMode="errorifexists" the write effectively expects to create the directory from scratch; if the path already exists, it throws an error.
In that case, either write with overwrite mode or use the cleanAndSaveSinglePath method, which deletes the path first and then writes.

scala2Hdfs.saveSinglePath(df=df1,path="../Data/hdfsSaveNoPartition",saveMode="errorifexists")
Save this path: ../Data/hdfsSaveNoPartition
scala2Hdfs.cleanAndSaveSinglePath(spark=spark,df=df1,path="../Data/hdfsSaveNoPartition",saveMode="overwrite")
Clean this path: ../Data/hdfsSaveNoPartition
Save this path: ../Data/hdfsSaveNoPartition

An unsupported save format

scala2Hdfs.saveSinglePath(df=df,path="../Data/hdfsSaveTest",saveMode="append",saveType="dd")
Not Support this savetype:dd

Partitioned save

scala2Hdfs.savePartitionPath(dataFrameWithDs=df,path="../Data/hdfsSavePartition",partitionColNames=Array("ds"))
Save this path: ../Data/hdfsSavePartition

Inspect the result of the partitioned save

scala2Hdfs.printPathDetail(spark=spark,path="../Data/hdfsSavePartition")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS

cleanAndSavePartitionPath deletes the specified partitions first and then saves. In this example, partitions other than 20190101 are appended to without being cleaned, so they end up with duplicate data.
The usual scenario is writing data into brand-new partitions, but adapt this to the problem at hand.

scala2Hdfs.cleanAndSavePartitionPath(dataFrameWithDs=df,path="../Data/hdfsSavePartition",partitionsArray=Array("20190101"),partitionColNames=Array("ds"))
Clean this path: ../Data/hdfsSavePartition/ds=20190101
Save this path: ../Data/hdfsSavePartition
scala2Hdfs.printPathDetail(spark=spark,path="../Data/hdfsSavePartition/ds=20190101")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101/part-00000-5771f2d8-7713-4fc3-9b5d-06df065bd3b8.c000.snappy.parquet

Read operations

Reading non-partitioned data

spark.read.parquet("../Data/hdfsSaveNoPartition").show()
+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  2|female| 37|2019-01-02 11:55:50|
|  4|female| 44|2019-02-01 12:45:50|
|  1|  male| 18|2019-01-01 11:45:50|
|  3|  male| 21|2019-01-21 11:45:50|
|  5|  male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+
scala2Hdfs.readSinglePath(spark, "../Data/hdfsSaveNoPartition").show()
+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  2|female| 37|2019-01-02 11:55:50|
|  4|female| 44|2019-02-01 12:45:50|
|  1|  male| 18|2019-01-01 11:45:50|
|  3|  male| 21|2019-01-21 11:45:50|
|  5|  male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+

Reading partitioned data

scala2Hdfs.readPartitionPath(spark=spark, path="../Data/hdfsSavePartition",partitionsArray=Array("20191101","20190101","20190102")).show()
This path(../Data/hdfsSavePartition/ds=20191101) don't exist!
+---+------+---+-------------------+--------+
| id|   sex|age|     createtime_str|      ds|
+---+------+---+-------------------+--------+
|  2|female| 37|2019-01-02 11:55:50|20190102|
|  2|female| 37|2019-01-02 11:55:50|20190102|
|  1|  male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+

Delete operations

Deleting a path that does not exist

scala2Hdfs.deleteSinglePath(spark,"../data1")
This path(../data1) don't exist!

Deleting non-partitioned data

scala2Hdfs.deleteSinglePath(spark,"../Data/hdfsSaveNoPartition")
Clean this path: ../Data/hdfsSaveNoPartition
scala2Hdfs.printPathDetail(spark,"../Data/hdfsSaveNoPartition")
This path don't exist!

Deleting partitioned data

scala2Hdfs.printPathDetail(spark,"../Data/hdfsSavePartition")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS
scala2Hdfs.deletePartitionPath(spark=spark,path="../Data/hdfsSavePartition",partitionsArray=Array("20190101","20190102","20200101"))
Clean this path: ../Data/hdfsSavePartition/ds=20190101
Clean this path: ../Data/hdfsSavePartition/ds=20190102
This path(../Data/hdfsSavePartition/ds=20200101) don't exist!
scala2Hdfs.printPathDetail(spark,"../Data/hdfsSavePartition")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS

Clean up all the data

hdfs.delete(new Path("../Data/hdfsSavePartition"),true)
res58: Boolean = true
scala2Hdfs.printPathDetail(spark,"../Data")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt

                        2020-01-02, at Jiulonghu, Jiangning District, Nanjing
