Scala102-操作Hdfs
Info
先生成DataFrame,再把数据储存在HDFS上。
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
Intitializing Scala interpreter ...SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1577952043881)
SparkSession available as 'spark'import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
val builder = SparkSession.builder().appName("learningScala").config("spark.executor.heartbeatInterval","60s").config("spark.network.timeout","120s").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").config("spark.kryoserializer.buffer.max","512m").config("spark.dynamicAllocation.enabled", false).config("spark.sql.inMemoryColumnarStorage.compressed", true).config("spark.sql.inMemoryColumnarStorage.batchSize", 10000).config("spark.sql.broadcastTimeout", 600).config("spark.sql.autoBroadcastJoinThreshold", -1).config("spark.sql.crossJoin.enabled", true).master("local[*]")
val spark = builder.appName("OperateHdfs").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
builder: org.apache.spark.sql.SparkSession.Builder = org.apache.spark.sql.SparkSession$Builder@679eb75a
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@551aa019
val df1 = Seq((1, "male", "18" ,"2019-01-01 11:45:50"),(2, "female", "37" ,"2019-01-02 11:55:50"),(3, "male", "21" ,"2019-01-21 11:45:50"),(4, "female", "44" ,"2019-02-01 12:45:50"),(5, "male", "39" ,"2019-01-15 10:40:50")).toDF("id","sex","age", "createtime_str")
df1: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 2 more fields]
val df=df1.withColumn("ds",date_format($"createtime_str","yyyyMMdd"))
df: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 3 more fields]
df.show()
+---+------+---+-------------------+--------+
| id| sex|age| createtime_str| ds|
+---+------+---+-------------------+--------+
| 1| male| 18|2019-01-01 11:45:50|20190101|
| 2|female| 37|2019-01-02 11:55:50|20190102|
| 3| male| 21|2019-01-21 11:45:50|20190121|
| 4|female| 44|2019-02-01 12:45:50|20190201|
| 5| male| 39|2019-01-15 10:40:50|20190115|
+---+------+---+-------------------+--------+
查看hdfs文件
是否存在对应目录
import相关方法
import org.apache.hadoop.fs.{FileSystem, Path,FileStatus,FileUtil}
import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, FileUtil}
获取配置信息
var path="../Data"
val hadoopConf = spark.sparkContext.hadoopConfiguration
path: String = ../Data
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, __spark_hadoop_conf__.xml
val hdfs = FileSystem.get(hadoopConf)
hdfs: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem@25866b28
设置路径
val outputPath = new Path("../Data")
outputPath: org.apache.hadoop.fs.Path = ../Data
hdfs上是否存在这个路径
hdfs.exists(outputPath)
res2: Boolean = true
hdfs上是否存在这个文件
hdfs.exists(new Path("../Data/test.txt"))
res3: Boolean = true
判断该path是否为文件夹?
hdfs.getFileStatus(outputPath).isDirectory()
res4: Boolean = true
判断该path是否为文件?
hdfs.getFileStatus(outputPath).isFile()
res5: Boolean = false
hdfs.getFileStatus(new Path("../Data/test.txt")).isFile()
res6: Boolean = true
获取路径下所有文件
val allFiles = FileUtil.stat2Paths(hdfs.listStatus(outputPath))
allFiles: Array[org.apache.hadoop.fs.Path] = Array(file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest, file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv, file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt)
打印一级目录名和文件名
allFiles.foreach(println)
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
打印一级目录名
逻辑:
- 获取路径下所有文件
- 循环遍历,判断每一个文件是否为Directory
- 是则打印出来
allFiles.filter(hdfs.getFileStatus(_).isDirectory()) //判断哪些是目录
.foreach(println)//打印对应一级目录名
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
打印对应路径下文件
allFiles.filter(hdfs.getFileStatus(_).isFile()) //判断哪些是目录
.foreach(println)//打印对应一级目录名
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
封装成Object
object HdfsCheckPath {import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, FileUtil}import org.apache.spark.sql.{SparkSession}/**** @param spark SparkSession* @param path 字符串格式,指定的路径*/def isPathExist(spark: SparkSession, path: String) = {// 获取hdfs配置信息val hadoopConf = spark.sparkContext.hadoopConfigurationval hdfs = FileSystem.get(hadoopConf)// 设置输入的路径val outputPath = new Path(path)if (hdfs.exists(outputPath)) {println(s"This path(${path}) Already exist!")} else {println(s"This path(${path}) don't exist!")}hdfs.exists(outputPath)}/**** @param spark SparkSession* @param path 对应路径* @param printlevel 打印的级别,枚举:directory、file、total*/def printPathDetail(spark: SparkSession, path: String, printlevel: String="total"): Unit = {val hadoopConf = spark.sparkContext.hadoopConfigurationval hdfs = FileSystem.get(hadoopConf)val isExists = hdfs.exists(new Path(path))// 路径不存在无需继续,上一步中直接print出信息if (isExists) {println("This path Already exist!")// 如果路径存在,打印出对应的一级目录和文件名val allFiles = FileUtil.stat2Paths(hdfs.listStatus(new Path(path)))if (printlevel == "directory") {println("-----Directory:")allFiles.filter(hdfs.getFileStatus(_).isDirectory()) //判断哪些是目录.foreach(println) //打印对应一级目录名} else if (printlevel == "file") {println("-----File:")allFiles.filter(hdfs.getFileStatus(_).isFile()) //判断哪些是目录.foreach(println)//打印对应一级目录名} else if (printlevel == "total") {println("-----Total:")allFiles.foreach(println)}}else{println("This path don't exist!")}}
}
defined object HdfsCheckPath
HdfsCheckPath.isPathExist(spark,"../Data")
This path(../Data) Already exist!res10: Boolean = true
HdfsCheckPath.printPathDetail(spark,"../Data","total")
This path Already exist!
-----Total:
HdfsCheckPath.printPathDetail(spark,"../Data","directory")
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
HdfsCheckPath.printPathDetail(spark,"../Data","file")
This path Already exist!
-----File:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
储存文件
储存暂时只介绍parquet格式
非分区储存
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
mode参数
Specifies the behavior when data or table already exists. Options include:
- overwrite: overwrite the existing data//覆盖写入
- append: append the data//追加写入
- ignore: ignore the operation (i.e. no-op)//忽略写入操作?那搞这个干啥?
- error or errorifexists: default option, throw an exception at runtime//如果写入目录存在则报错
一般选择errorifexists,防止错误地把数据插入在已经存在的目录。参数saveMode的值可以是String
或者SaveMode
(需要import spark.sql.SaveMode)
df1.write.mode(saveMode="errorifexists").parquet(path="../Data/hdfsSaveNoPartition/")
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3zNCZ30J-1577952595406)(…/Picture/Pic01.jpg)]
df1.write.mode(saveMode=SaveMode.Overwrite).parquet("../Data/hdfsSaveNoPartition/")
分区储存
val partitionColNames = Array("ds")
df.write.partitionBy(partitionColNames:_*).mode(saveMode="errorifexists").parquet("../Data/hdfsSavePartition/")
partitionColNames: Array[String] = Array(ds)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yu50KUcn-1577952595407)(…/Picture/Pic02.jpg)]
同样的,如果对应分区已经存在,也会报错。
读取
非分区读取
val df1New = spark.read.parquet("../Data/hdfsSaveNoPartition")
df1New: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 2 more fields]
df1New.show()
+---+------+---+-------------------+
| id| sex|age| createtime_str|
+---+------+---+-------------------+
| 2|female| 37|2019-01-02 11:55:50|
| 4|female| 44|2019-02-01 12:45:50|
| 1| male| 18|2019-01-01 11:45:50|
| 3| male| 21|2019-01-21 11:45:50|
| 5| male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+
分区读取
指定某一个固定的分区
spark.read.option("basePath", "../Data/hdfsSavePartition")
.parquet("../Data/hdfsSavePartition/ds=20190101").show()
+---+----+---+-------------------+--------+
| id| sex|age| createtime_str| ds|
+---+----+---+-------------------+--------+
| 1|male| 18|2019-01-01 11:45:50|20190101|
+---+----+---+-------------------+--------+
指定多个分区
spark.read.option("basePath", "../Data/hdfsSavePartition")
.parquet("../Data/hdfsSavePartition/ds=20190101","../Data/hdfsSavePartition/ds=20190102").show()
+---+------+---+-------------------+--------+
| id| sex|age| createtime_str| ds|
+---+------+---+-------------------+--------+
| 2|female| 37|2019-01-02 11:55:50|20190102|
| 1| male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+
指定多分区的简略写法
:*作为一个整体,告诉编译器你希望将某个参数当作参数序列处理!例如val s = sum(1 to 5:*)就是将1 to 5当作参数序列处理
val partitionPathArray = Array(20190101,20190102).map(x=>"../Data/hdfsSavePartition/" + s"ds=${x}")
partitionPathArray: Array[String] = Array(../Data/hdfsSavePartition/ds=20190101, ../Data/hdfsSavePartition/ds=20190102)
partitionPathArray.mkString("\n")
res20: String =
../Data/hdfsSavePartition/ds=20190101
../Data/hdfsSavePartition/ds=20190102
spark.read.option("basePath", "../Data/hdfsSavePartition")
.parquet(partitionPathArray:_*).show()
+---+------+---+-------------------+--------+
| id| sex|age| createtime_str| ds|
+---+------+---+-------------------+--------+
| 2|female| 37|2019-01-02 11:55:50|20190102|
| 1| male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+
如果指定分区中部分缺失
val partitionPathArray1 = Array(20190101,20190102,20191231).map(x=>"../Data/hdfsSavePartition/" + s"ds=${x}")
partitionPathArray1: Array[String] = Array(../Data/hdfsSavePartition/ds=20190101, ../Data/hdfsSavePartition/ds=20190102, ../Data/hdfsSavePartition/ds=20191231)
partitionPathArray1.mkString("\n")
res22: String =
../Data/hdfsSavePartition/ds=20190101
../Data/hdfsSavePartition/ds=20190102
../Data/hdfsSavePartition/ds=20191231
partitionPathArray1.map(x=>HdfsCheckPath.isPathExist(spark,x))
This path(../Data/hdfsSavePartition/ds=20190101) Already exist!
This path(../Data/hdfsSavePartition/ds=20190102) Already exist!
This path(../Data/hdfsSavePartition/ds=20191231) don't exist!res23: Array[Boolean] = Array(true, true, false)
spark.read.option("basePath", "../Data/hdfsSavePartition")
.parquet(partitionPathArray1.filter(x=>HdfsCheckPath.isPathExist(spark,x)):_*).show()
This path(../Data/hdfsSavePartition/ds=20190101) Already exist!
This path(../Data/hdfsSavePartition/ds=20190102) Already exist!
This path(../Data/hdfsSavePartition/ds=20191231) don't exist!
+---+------+---+-------------------+--------+
| id| sex|age| createtime_str| ds|
+---+------+---+-------------------+--------+
| 2|female| 37|2019-01-02 11:55:50|20190102|
| 1| male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+
删除文件
删除与文件本身无关,是对整个目录的删除操作
删除非分区数据
第二个参数表示递归删除,即删除当前路径以及其子文件和子文件夹
路径不存在时,返回false
hdfs.delete(new Path("../Data/hdfsSaveNoPartition555/"),true)
res25: Boolean = false
路径存在返回true
hdfs.delete(new Path("../Data/hdfsSaveNoPartition/"),true)
res26: Boolean = true
删除分区数据
删除分区的逻辑比较简单,即把需删除分区的完整路径循环删除即可
查看分区数据
HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition","directory")
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
删除指定某个分区
hdfs.delete(new Path("../Data/hdfsSavePartition/ds=20190101"),true)
res28: Boolean = true
HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition","directory")
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
删除多个分区
val partitionPathArray2 = Array("20190102","20200101").map(x=>"../Data/hdfsSavePartition"+"/"+x)
partitionPathArray2: Array[String] = Array(../Data/hdfsSavePartition/20190102, ../Data/hdfsSavePartition/20200101)
partitionPathArray2.mkString("\n")
res30: String =
../Data/hdfsSavePartition/20190102
../Data/hdfsSavePartition/20200101
partitionPathArray.foreach(x=>hdfs.delete(new Path(x),true))
查看当前目录,上述两个分区已经被删除
HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition/","total")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS
如上所示,如果有的分区不存在,程序并不报错,可以在代码里加上print语句,作为提示。
hdfs.delete(new Path("../Data/hdfsSavePartition"),true)
res33: Boolean = true
封装
object封装
目的:
- 封装常用的hdfs操作,方便调用
- object方式组织,方便函数的内部调用
- obejct scala2Hdfs
object scala2Hdfs {import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, FileUtil}import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession,Dataset}/*** 获取文件系统对象** @param spark* @return*/def getFileSystem(spark: SparkSession): FileSystem = {val hadoopConf = spark.sparkContext.hadoopConfigurationFileSystem.get(hadoopConf)}/**** @param spark SparkSession* @param path 字符串格式,指定的路径*/def isPathExist(spark: SparkSession, path: String) = {// 获取hdfs配置信息val hdfs = getFileSystem(spark)// 设置输入的路径val outputPath = new Path(path)hdfs.exists(outputPath)// if (hdfs.exists(outputPath)) {// println("This path Already exist!")// } else {// println("This path don't exist!")// }}/*** 判断文件夹是否存在,以及是否为文件夹** @param spark SparkSession* @param path 字符串格式,指定的路径* @return*/def isExistsDirectory(spark: SparkSession, path: String): Boolean = {val hdfs = getFileSystem(spark)val outputPath = new Path(path)if(!(hdfs.exists(outputPath) && hdfs.getFileStatus(outputPath).isDirectory)){println(s"This path(${path}) don't exist!")}hdfs.exists(outputPath) && hdfs.getFileStatus(outputPath).isDirectory}/**** @param spark SparkSession* @param path 字符串格式,指定的路径* @return*/def isExistsFile(spark: SparkSession, path: String): Boolean = {val hdfs = getFileSystem(spark)val outputPath = new Path(path)hdfs.exists(outputPath) && hdfs.getFileStatus(outputPath).isFile}/**** @param spark SparkSession* @param path 对应路径* @param printlevel 打印的级别,枚举:directory、file、total*/def printPathDetail(spark: SparkSession, path: String, printlevel: String = "total"): Unit = {val hdfs = getFileSystem(spark)val isExists = isPathExist(spark, path)// 路径不存在无需继续,上一步中直接print出信息if (isExists) {println("This path Already exist!")// 如果路径存在,打印出对应的一级目录和文件名val allFiles = FileUtil.stat2Paths(hdfs.listStatus(new Path(path)))if (printlevel == "directory") {println("-----Directory:")allFiles.filter(hdfs.getFileStatus(_).isDirectory()) //判断哪些是目录.foreach(println) //打印对应一级目录名} else if (printlevel == "file") {println("-----File:")allFiles.filter(hdfs.getFileStatus(_).isFile()) //判断哪些是目录.foreach(println) //打印对应一级目录名} else if (printlevel == "total") {println("-----Total:")allFiles.foreach(println)}} else {println("This path don't exist!")}}/**** @param spark SparkSession* @param path 对应路径* path存在,检查是否有子文件夹,有break,没有delete*/def deleteSinglePath(spark: SparkSession, path: String): Unit = {val hdfs = getFileSystem(spark)val outputPath = new Path(path)// 先判断是否有对应路径,没有直接写,有删除if (hdfs.exists(outputPath)) {// 判断对应路径下是否有子文件夹val isDirectoryExists = hdfs.listStatus(outputPath).exists(_.isDirectory)// 没有子文件夹,删除数据if (!isDirectoryExists) {// 递归删除hdfs.delete(outputPath, true)println("Clean this path: " + path)} else {// 如果有子文件,防止错删,跳过println("Contains sub path, Skip clean")}} else {println(s"This path(${path}) don't exist!")}}/**** @param spark SparkSession* @param path 对应路径,要求最后不用带/* @param partitionsArray 对应分区列表,格式和表中实际分区字段一致* @param isPartition 是否分区数据*/def deletePartitionPath(spark: SparkSession, path: String, partitionsArray: Array[String], isPartition: Boolean = true): Unit = {if (!isPartition) {// 如果非分区,直接删除deleteSinglePath(spark, path)} else {if (partitionsArray.length == 0) {println("partitionsArray has no items")} else {// 拼接分区pathval partitionPathArray = partitionsArray.map(x => path + "/" + s"ds=${x}")// 循环删除分区,如果路径不存在,报错partitionPathArray.foreach(x => deleteSinglePath(spark, x))}}}/*** 存储** @param df 存储的df* @param path 路径* @param coalesceNum 合并后分区数,为了提高关联的效率,具体用法暂不清楚* @param saveType 存储类型* @param saveMode 模式*/def saveSinglePath(df: DataFrame, path: String, coalesceNum: Int = 0, saveType: String = "parquet", saveMode: String = "errorifexists"): Unit = {var tempDf = dfif (coalesceNum >= 1) {tempDf = df.coalesce(coalesceNum)}val write = tempDf.write.mode(saveMode)saveType match {case "csv" => write.option("header", "true").csv(path);println("Save this path: "+ path)case "parquet" => write.parquet(path);println("Save this path: "+ path)case _ => println(s"Not Support this savetype:${saveType}")}}/**** @param dataFrameWithDs 待分区字段的dataframe* @param path 储存的path* @param saveMode 保存方式,默认为append* @param coalesceNum 合并后分区数,为了提高关联的效率,具体用法暂不清楚* @param partitionColNames 分区列名,array形式* 得到分区列数据,存储即可*/def savePartitionPath(dataFrameWithDs: DataFrame, path: String, saveType: String = "parquet", saveMode: String = "append", coalesceNum: Int = 0, partitionColNames: Array[String] = Array("ds")): Unit = {var tempDf = dataFrameWithDsif (coalesceNum >= 1) {tempDf = dataFrameWithDs.coalesce(coalesceNum)}val write = tempDf.write.partitionBy(partitionColNames: _*).mode(saveMode)saveType match {case "csv" => write.option("header", "true").csv(path);println("Save this path: "+ path)case _ => write.parquet(path);println("Save this path: "+ path)}}/*** 清空并保存** @param df 存储的df* @param path 路径* @param coalesceNum 合并后分区数* @param saveType 存储类型* @param saveMode 模式*/def cleanAndSaveSinglePath(spark: SparkSession, df: DataFrame, path: String, coalesceNum: Int = 0, saveType: String = "parquet", saveMode: String = "errorifexists"): Unit = {// 先删除对应路径deleteSinglePath(spark, path)// 再保存数据saveSinglePath(df, path, coalesceNum, saveType, saveMode)}/**** @param dataFrameWithDs 存储的df* @param path 路径* @param partitionsArray 分区list* @param coalesceNum 合并后分区数* @param partitionColNames 分区列*/def cleanAndSavePartitionPath(dataFrameWithDs: DataFrame, path: String, saveMode: String = "append", partitionsArray: Array[String], coalesceNum: Int = 0, partitionColNames: Array[String] = Array("ds")): Unit = {val spark = dataFrameWithDs.sparkSession// 先删除对应分区的数据deletePartitionPath(spark, path, partitionsArray)// 保存对应分区的数据savePartitionPath(dataFrameWithDs = dataFrameWithDs, path = path,saveMode=saveMode, partitionColNames = partitionColNames)}def readSinglePath(spark: SparkSession, path: String): DataFrame = {if (isExistsDirectory (spark, path) ) {spark.read.parquet(path)}else{println ("This path don't exist!")// 返回个空数据框,不知道有没有别的方式!!spark.emptyDataFrame}
}/**** @param spark SparkSession* @param path 路径* @param readType 文件类型,暂只支持parquet* @param partitionsArray 所读的分区* @return*/def readPartitionPath(spark: SparkSession, path: String, readType: String = "parquet", partitionsArray: Array[String])={if(readType!="parquet"){println(s"Not Support this readType:${readType}")spark.emptyDataFrame}else{if(partitionsArray.length==0){println("PartitionsArray is null")spark.emptyDataFrame}else{val partitionPathArray = partitionsArray.map(x=>path +"/"+ s"ds=${x}")//过滤掉不存在的分区目录spark.read.option("basePath", path).parquet(partitionPathArray.filter(x=> isExistsDirectory(spark, x)):_*)}}
}
}
defined object scala2Hdfs
查看操作
查看路径是否存在,是否为文件、为目录
var path ="../Data"
path: String = ../Data
scala2Hdfs.isExistsDirectory(spark,path)
res34: Boolean = true
scala2Hdfs.isPathExist(spark,path)
res35: Boolean = true
scala2Hdfs.isExistsFile(spark,path)
res36: Boolean = false
scala2Hdfs.isExistsFile(spark,"../Data/test.txt")
res37: Boolean = true
scala2Hdfs.isPathExist(spark,"../Data/test.txt")
res38: Boolean = true
打印路径信息
scala2Hdfs.printPathDetail(spark,path,"total")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
scala2Hdfs.printPathDetail(spark,path,"directory")
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
scala2Hdfs.printPathDetail(spark,path,"file")
This path Already exist!
-----File:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
保存操作
非分区保存
saveMode="errorifexists"相当于直接新建,再保存。如果路径存在,则会报错。
此时可以采用覆盖写入的方式或者用saveSinglePath方法,先删除再写入
scala2Hdfs.saveSinglePath(df=df1,path="../Data/hdfsSaveNoPartition",saveMode="errorifexists")
Save this path: ../Data/hdfsSaveNoPartition
scala2Hdfs.cleanAndSaveSinglePath(spark=spark,df=df1,path="../Data/hdfsSaveNoPartition",saveMode="overwrite")
Clean this path: ../Data/hdfsSaveNoPartition
Save this path: ../Data/hdfsSaveNoPartition
错误的储存格式
scala2Hdfs.saveSinglePath(df=df,path="../Data/hdfsSaveTest",saveMode="append",saveType="dd")
Not Support this savetype:dd
分区储存
scala2Hdfs.savePartitionPath(dataFrameWithDs=df,path="../Data/hdfsSavePartition",partitionColNames=Array("ds"))
Save this path: ../Data/hdfsSavePartition
查看分区保存的结果
scala2Hdfs.printPathDetail(spark=spark,path="../Data/hdfsSavePartition")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS
cleanAndSavePartitionPath,可以先对分区删除,再保存。此时,非20190101分区会有重复数据
常规的应用场景,都是往新分区里查数据,可具体问题具体分析
scala2Hdfs.cleanAndSavePartitionPath(dataFrameWithDs=df,path="../Data/hdfsSavePartition",partitionsArray=Array("20190101"),partitionColNames=Array("ds"))
Clean this path: ../Data/hdfsSavePartition/ds=20190101
Save this path: ../Data/hdfsSavePartition
scala2Hdfs.printPathDetail(spark=spark,path="../Data/hdfsSavePartition/ds=20190101")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101/part-00000-5771f2d8-7713-4fc3-9b5d-06df065bd3b8.c000.snappy.parquet
读取操作
非分区数据读取
spark.read.parquet("../Data/hdfsSaveNoPartition").show()
+---+------+---+-------------------+
| id| sex|age| createtime_str|
+---+------+---+-------------------+
| 2|female| 37|2019-01-02 11:55:50|
| 4|female| 44|2019-02-01 12:45:50|
| 1| male| 18|2019-01-01 11:45:50|
| 3| male| 21|2019-01-21 11:45:50|
| 5| male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+
scala2Hdfs.readSinglePath(spark, "../Data/hdfsSaveNoPartition").show()
+---+------+---+-------------------+
| id| sex|age| createtime_str|
+---+------+---+-------------------+
| 2|female| 37|2019-01-02 11:55:50|
| 4|female| 44|2019-02-01 12:45:50|
| 1| male| 18|2019-01-01 11:45:50|
| 3| male| 21|2019-01-21 11:45:50|
| 5| male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+
读取分区数据
scala2Hdfs.readPartitionPath(spark=spark, path="../Data/hdfsSavePartition",partitionsArray=Array("20191101","20190101","20190102")).show()
This path(../Data/hdfsSavePartition/ds=20191101) don't exist!
+---+------+---+-------------------+--------+
| id| sex|age| createtime_str| ds|
+---+------+---+-------------------+--------+
| 2|female| 37|2019-01-02 11:55:50|20190102|
| 2|female| 37|2019-01-02 11:55:50|20190102|
| 1| male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+
删除操作
删除操作错误路径
scala2Hdfs.deleteSinglePath(spark,"../data1")
This path(../data1) don't exist!
删除非分区数据
scala2Hdfs.deleteSinglePath(spark,"../Data/hdfsSaveNoPartition")
Clean this path: ../Data/hdfsSaveNoPartition
scala2Hdfs.printPathDetail(spark,"../Data/hdfsSaveNoPartition")
This path don't exist!
删除分区数据
scala2Hdfs.printPathDetail(spark,"../Data/hdfsSavePartition")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS
scala2Hdfs.deletePartitionPath(spark=spark,path="../Data/hdfsSavePartition",partitionsArray=Array("20190101","20190102","20200101"))
Clean this path: ../Data/hdfsSavePartition/ds=20190101
Clean this path: ../Data/hdfsSavePartition/ds=20190102
This path(../Data/hdfsSavePartition/ds=20200101) don't exist!
scala2Hdfs.printPathDetail(spark,"../Data/hdfsSavePartition")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS
清空所有数据
hdfs.delete(new Path("../Data/hdfsSavePartition"),true)
res58: Boolean = true
scala2Hdfs.printPathDetail(spark,"../Data")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
2020-01-02 于南京江宁区九龙湖
Scala102-操作Hdfs相关推荐
- hadoop java操作hdfs
hfds 是一种文件系统,用于存储hadoop将要处理的数据.适用于大规模分布式数据处理,是一个可扩展行的文件分布式系统: 优点 1.如果出现节点宕机,hdfs,可以持续监视,错误检查,容错处理,文档 ...
- python 新建文件 hdfs_大数据学习(六):Python操作hdfs(包括追加数据文件到hdfs文件)...
#!coding:utf-8 import sys from hdfs.client import Client #设置utf-8模式 reload(sys) sys.setdefaultencodi ...
- python操作hdfs_python 操作hdfs
from hdfs.client importClient#关于python操作hdfs的API可以查看官网:#https://hdfscli.readthedocs.io/en/latest/api ...
- hdfs user 连接_Python入门操作HDFS
点击上方蓝色字体,关注我们 读完需 7 分钟 速读需 3 分钟 在了解了Hadoop的基本使用后,需要通过编程语言进一步操作.对于没有Java基础的运维人,Python如何操作hdfs也就成了我们优先 ...
- Windows下使用Java API操作HDFS的常用方法
场景 Windows下配置Hadoop的Java开发环境以及用Java API操作HDFS: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/det ...
- Windows下配置Hadoop的Java开发环境以及用Java API操作HDFS
场景 HDFS的访问方式之HDFS shell的常用命令: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119351218 在上 ...
- Java操作HDFS文件系统
对于操作HDFS文件系统,需要有一个入口,对于Hadoop来说,编程入口就是FileSystem. 例如我们要使用API创建一个文件夹: /*** @author vincent* @time 201 ...
- 通过java api操作hdfs(kerberos认证)
参考代码如下 import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs ...
- Java 操作 HDFS
HDFS 作为开源界比较成熟的分布式文件存储系统,适用于海量文件存储,本文介绍了如何使用 Java 操作 HDFS,采用 Maven 管理包. pom.xml <dependency>&l ...
- java向hdfs提交命令_Java语言操作HDFS常用命令测试代码
本文主要向大家介绍了Java语言操作HDFS常用命令测试代码,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助. package com.yxc.hdfs; import org.apac ...
最新文章
- msclass 文字滚动_文字无缝循环滚动(标题向上滚动)
- 【ASP.NET】 【防止连续多次点击提交按钮 导致页面重复提交】
- 从统计局抓取2016年最新的全国区县数据!!
- json对象数组按对象属性排序
- 22届腾讯暑期实习三轮面试面经(已oc)
- Linux多线程——使用信号量同步线程
- mysql proxy 读写分离_mysql-proxy 实现读写分离
- vue中v-model的使用
- Tomcat报错:ERROR:transport error 202: gethostbyname: unknown host
- mybatis逆向工程利用mybatis-generator-core自动生成代码
- 进阶系列(11)—— C#多线程
- python 根据条件输出_python数据类型、输入输出、运算符、条件判断、循环
- 利用Travis IC实现Hexo博客自动化部署
- 使用stream报错:stream has already been operated upon or closed
- 数据分析python面试题_10道Python常见面试题
- nginx的下载与安装
- java与javax有什么区别?
- 单片机奇偶交替闪烁_单片机控制继电器 使LED灯交替闪烁
- svg 内部元素scale 缩放不在原位置解决办法
- linux aarch JBR with JCEF
热门文章
- Mysql8 CentOS7 Compressed TAR Archive安装
- 李丰杰老师 实战生产管理专家
- 手机一键还原数据还能恢复吗
- 在google应用商店下载的Vue.js Devtools在控制台(开发者模式)没有vue选项
- 微信小程序小说阅读器+后台管理系统|前后分离VUE
- s8 android调用相机,改善画质无需硬件:当Galaxy S8遇上谷歌Pixel相机
- 多视角3D重建技术市场现状研究分析报告-
- php面试题之一,PHP的面试题集
- getUserMedia录制视频有噪音解决方法
- 如何将MacBook Pro投屏到电视上(相同局域网、免安装)(1分钟)