Table of Contents

  • 1. Source code analysis of when job1 is triggered
    • 1. DataSource.getOrInferFileFormatSchema()
    • 2. ParquetFileFormat.inferSchema
      • 1. Simplified code
      • 2. parquetOptions.mergeSchema is false
      • 3. isParquetSchemaRespectSummaries defaults to false
      • 4. filesByType contents
    • 3. ParquetFileFormat.mergeSchemasInParallel
  • 2. Summary of the job1 call
  • 3. spark read summary

This article follows on from the previous one and focuses on the source code analysis of job1.

1. Source code analysis of when job1 is triggered

job1 is produced when the driver continues execution after job0 has finished, so let's first review how job0 came about.


DataFrameReader.load()
DataFrameReader.loadV1Source()
DataSource.resolveRelation()
DataSource.getOrInferFileFormatSchema()
new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
InMemoryFileIndex.refresh0()
InMemoryFileIndex.listLeafFiles()
InMemoryFileIndex.bulkListLeafFiles()

The call that produces job1 lives in the DataSource.getOrInferFileFormatSchema() method.
Here is that method again:

1. DataSource.getOrInferFileFormatSchema()

private def getOrInferFileFormatSchema(
    format: FileFormat,
    fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
  // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
  // in streaming mode, we have already inferred and registered partition columns, we will
  // never have to materialize the lazy val below
  // job0: this is declared as a lazy val, so it is only initialized when it is actually used
  lazy val tempFileIndex = {
    val allPaths = caseInsensitiveOptions.get("path") ++ paths
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    val globbedPaths = allPaths.toSeq.flatMap { path =>
      val hdfsPath = new Path(path)
      val fs = hdfsPath.getFileSystem(hadoopConf)
      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
      SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
    }.toArray
    // job0: the InMemoryFileIndex object is constructed here, and this is what produces the first job
    new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
  }
  val partitionSchema = if (partitionColumns.isEmpty) {
    // Try to infer partitioning, because no DataSource in the read path provides the partitioning
    // columns properly unless it is a Hive DataSource
    // job0: the lazy tempFileIndex is used for the first time here, which forces the
    // InMemoryFileIndex to be initialized
    combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex)
  } else {
    // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
    // partitioning
    if (userSpecifiedSchema.isEmpty) {
      val inferredPartitions = tempFileIndex.partitionSchema
      inferredPartitions
    } else {
      val partitionFields = partitionColumns.map { partitionColumn =>
        userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
          val inferredPartitions = tempFileIndex.partitionSchema
          val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
          if (inferredOpt.isDefined) {
            logDebug(
              s"""Type of partition column: $partitionColumn not found in specified schema
                 |for $format.
                 |User Specified Schema
                 |=====================
                 |${userSpecifiedSchema.orNull}
                 |
                 |Falling back to inferred dataType if it exists.
               """.stripMargin)
          }
          inferredOpt
        }.getOrElse {
          throw new AnalysisException(s"Failed to resolve the schema for $format for " +
            s"the partition column: $partitionColumn. It must be specified manually.")
        }
      }
      StructType(partitionFields)
    }
  }
  // by this point job0 has already finished and returned
  val dataSchema = userSpecifiedSchema.map { schema =>
    StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
  }.orElse {
    // job1 starts from this call
    format.inferSchema(
      sparkSession,
      caseInsensitiveOptions,
      tempFileIndex.allFiles())
  }.getOrElse {
    throw new AnalysisException(
      s"Unable to infer schema for $format. It must be specified manually.")
  }
  // We just print a waring message if the data schema and partition schema have the duplicate
  // columns. This is because we allow users to do so in the previous Spark releases and
  // we have the existing tests for the cases (e.g., `ParquetHadoopFsRelationSuite`).
  // See SPARK-18108 and SPARK-21144 for related discussions.
  try {
    SchemaUtils.checkColumnNameDuplication(
      (dataSchema ++ partitionSchema).map(_.name),
      "in the data schema and the partition schema",
      equality)
  } catch {
    case e: AnalysisException => logWarning(e.getMessage)
  }
  (dataSchema, partitionSchema)
}

This method first triggers job0; once job0 has returned, execution continues with the steps below. job0 is mainly file listing: it collects how many files there are, each file's block information, and so on.
job1's main purpose is to extract the data schema. It is triggered by the snippet below, where execution falls into the orElse {} branch:

    // by this point job0 has already finished and returned
    val dataSchema = userSpecifiedSchema.map { schema =>
      StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
    }.orElse {
      // job1 starts from this call
      format.inferSchema(
        sparkSession,
        caseInsensitiveOptions,
        tempFileIndex.allFiles())
    }.getOrElse {
      throw new AnalysisException(
        s"Unable to infer schema for $format. It must be specified manually.")
    }

The format here is a ParquetFileFormat, so this ends up calling ParquetFileFormat.inferSchema().

2. ParquetFileFormat.inferSchema

  /**
   * When possible, this method should return the schema of the given `files`.  When the format
   * does not support inference, or no valid files are given should return None.  In these cases
   * Spark will require that user specify the schema manually.
   */
  override def inferSchema(
      sparkSession: SparkSession,
      parameters: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = {
    val parquetOptions = new ParquetOptions(parameters, sparkSession.sessionState.conf)

    // Should we merge schemas from all Parquet part-files?
    val shouldMergeSchemas = parquetOptions.mergeSchema

    val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries

    val filesByType = splitFiles(files)

    // Sees which file(s) we need to touch in order to figure out the schema.
    //
    // Always tries the summary files first if users don't require a merged schema.  In this case,
    // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
    // groups information, and could be much smaller for large Parquet files with lots of row
    // groups.  If no summary file is available, falls back to some random part-file.
    //
    // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
    // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
    // how to merge them correctly if some key is associated with different values in different
    // part-files.  When this happens, Parquet simply gives up generating the summary file.  This
    // implies that if a summary file presents, then:
    //
    //   1. Either all part-files have exactly the same Spark SQL schema, or
    //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
    //      their schemas may differ from each other).
    //
    // Here we tend to be pessimistic and take the second case into account.  Basically this means
    // we can't trust the summary files if users require a merged schema, and must touch all part-
    // files to do the merge.
    val filesToTouch =
      if (shouldMergeSchemas) {
        // Also includes summary files, 'cause there might be empty partition directories.

        // If mergeRespectSummaries config is true, we assume that all part-files are the same for
        // their schema with summary files, so we ignore them when merging schema.
        // If the config is disabled, which is the default setting, we merge all part-files.
        // In this mode, we only need to merge schemas contained in all those summary files.
        // You should enable this configuration only if you are very sure that for the parquet
        // part-files to read there are corresponding summary files containing correct schema.

        // As filed in SPARK-11500, the order of files to touch is a matter, which might affect
        // the ordering of the output columns. There are several things to mention here.
        //
        //  1. If mergeRespectSummaries config is false, then it merges schemas by reducing from
        //     the first part-file so that the columns of the lexicographically first file show
        //     first.
        //
        //  2. If mergeRespectSummaries config is true, then there should be, at least,
        //     "_metadata"s for all given files, so that we can ensure the columns of
        //     the lexicographically first file show first.
        //
        //  3. If shouldMergeSchemas is false, but when multiple files are given, there is
        //     no guarantee of the output order, since there might not be a summary file for the
        //     lexicographically first file, which ends up putting ahead the columns of
        //     the other files. However, this should be okay since not enabling
        //     shouldMergeSchemas means (assumes) all the files have the same schemas.

        val needMerged: Seq[FileStatus] =
          if (mergeRespectSummaries) {
            Seq.empty
          } else {
            filesByType.data
          }
        needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
      } else {
        // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
        // don't have this.
        filesByType.commonMetadata.headOption
            // Falls back to "_metadata".
            .orElse(filesByType.metadata.headOption)
            // Summary file(s) not found, the Parquet file is either corrupted, or different part-
            // files contain conflicting user defined metadata (two or more values are associated
            // with a same key in different files).  In either case, we fall back to any of the
            // first part-file, and just assume all schemas are consistent.
            .orElse(filesByType.data.headOption)
            .toSeq
      }
    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
  }

The method looks long, but most of it is comments. It essentially decides whether the schema of every parquet file needs to be read in order to work out the final schema, and how that decision is made.
Let's simplify the code.

1. Simplified code

override def inferSchema(
    sparkSession: SparkSession,
    parameters: Map[String, String],
    files: Seq[FileStatus]): Option[StructType] = {
  // Should schemas from all the parquet part-files be merged? Defaults to false.
  val parquetOptions = new ParquetOptions(parameters, sparkSession.sessionState.conf)
  val shouldMergeSchemas = parquetOptions.mergeSchema
  // When merging, can the summary files be assumed consistent with all parquet files?
  // Defaults to false (i.e. not consistent); only relevant when shouldMergeSchemas is true.
  val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries
  val filesByType = splitFiles(files)
  val filesToTouch =
    // false here
    if (shouldMergeSchemas) {
      val needMerged: Seq[FileStatus] =
        if (mergeRespectSummaries) {
          Seq.empty
        } else {
          filesByType.data
        }
      needMerged ++ filesByType.metadata ++ filesByType.commonMetadata
    } else {
      // execution takes this branch
      filesByType.commonMetadata.headOption
          .orElse(filesByType.metadata.headOption)
          .orElse(filesByType.data.headOption)
          .toSeq
    }
  // and finally ends up calling this
  ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
}

2. parquetOptions.mergeSchema is false

The default value of parquetOptions.mergeSchema is false:

  val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
    .doc("When true, the Parquet data source merges schemas collected from all data files, " +
      "otherwise the schema is picked from the summary file or a random data file " +
      "if no summary file is available.")
    .booleanConf
    .createWithDefault(false)
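
For contrast, here is a minimal spark-shell style sketch of turning the merge on for a single read via the mergeSchema data source option; this is only an illustration and not part of the original test program, and the path is the same placeholder pattern used by that program.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("merge_schema_demo").getOrCreate()

// Per-read override of spark.sql.parquet.mergeSchema: with this option set, shouldMergeSchemas
// becomes true and inferSchema will touch every part-file instead of just the first one.
val df = spark.read
  .option("mergeSchema", "true")
  .parquet("hdfs:///user/daily/20200828/*.parquet")
df.printSchema()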

3. isParquetSchemaRespectSummaries defaults to false

The default value of sparkSession.sessionState.conf.isParquetSchemaRespectSummaries is false:

  val PARQUET_SCHEMA_RESPECT_SUMMARIES = buildConf("spark.sql.parquet.respectSummaryFiles")
    .doc("When true, we make assumption that all part-files of Parquet are consistent with " +
      "summary files and we will ignore them when merging schema. Otherwise, if this is " +
      "false, which is the default, we will merge all part-files. This should be considered " +
      "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
    .booleanConf
    .createWithDefault(false)
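
If you really do trust the summary files, both flags can be flipped on a session before reading; this is a hedged illustration only (the session setup is assumed, not taken from the original program).

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("respect_summaries_demo").getOrCreate()

// respectSummaryFiles only matters when mergeSchema is also true; with both enabled, the merge
// reads only the _metadata / _common_metadata summary files and skips the data part-files.
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
spark.conf.set("spark.sql.parquet.respectSummaryFiles", "true")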

4. filesByType contents

filesByType is a wrapper around all of the files. In this run, the resulting filesToTouch contains:

LocatedFileStatus{path=hdfs://test.com:9000/user/daily/20200828/part-00000-0e0dc5b5-5061-41ca-9fa6-9fb7b3e09e98-c000.snappy.parquet; isDirectory=false; length=193154555; replication=3; blocksize=134217728; modification_time=1599052749676; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}

That is, the first of all the files held in filesByType; this single file is then passed as the argument to ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession).
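
To make the filesByType grouping more concrete, below is an illustrative Scala reconstruction of the classification that splitFiles performs; the case class and helper are simplified sketches based on the behaviour described above, not the verbatim Spark source.

import org.apache.hadoop.fs.FileStatus

// Sketch: summary files are recognised purely by name; everything else counts as a data part-file.
case class FileTypes(
    data: Seq[FileStatus],
    metadata: Seq[FileStatus],
    commonMetadata: Seq[FileStatus])

def splitFilesSketch(allFiles: Seq[FileStatus]): FileTypes = {
  // Sorting keeps the lexicographically first part-file at the head, which matters because the
  // non-merging branch of inferSchema falls back to filesByType.data.headOption.
  val leaves = allFiles.sortBy(_.getPath.toString)
  FileTypes(
    data = leaves.filterNot { f =>
      val name = f.getPath.getName
      name == "_metadata" || name == "_common_metadata"
    },
    metadata = leaves.filter(_.getPath.getName == "_metadata"),
    commonMetadata = leaves.filter(_.getPath.getName == "_common_metadata"))
}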

3. ParquetFileFormat.mergeSchemasInParallel

  /**
   * Figures out a merged Parquet schema with a distributed Spark job.
   *
   * Note that locality is not taken into consideration here because:
   *
   *  1. For a single Parquet part-file, in most cases the footer only resides in the last block of
   *     that file.  Thus we only need to retrieve the location of the last block.  However, Hadoop
   *     `FileSystem` only provides API to retrieve locations of all blocks, which can be
   *     potentially expensive.
   *
   *  2. This optimization is mainly useful for S3, where file metadata operations can be pretty
   *     slow.  And basically locality is not available when using S3 (you can't run computation on
   *     S3 nodes).
   */
  def mergeSchemasInParallel(
      filesToTouch: Seq[FileStatus],
      sparkSession: SparkSession): Option[StructType] = {
    val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
    val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp

    val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())

    // !! HACK ALERT !!
    //
    // Parquet requires `FileStatus`es to read footers.  Here we try to send cached `FileStatus`es
    // to executor side to avoid fetching them again.  However, `FileStatus` is not `Serializable`
    // but only `Writable`.  What makes it worse, for some reason, `FileStatus` doesn't play well
    // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`.  These
    // facts virtually prevents us to serialize `FileStatus`es.
    //
    // Since Parquet only relies on path and length information of those `FileStatus`es to read
    // footers, here we just extract them (which can be easily serialized), send them to executor
    // side, and resemble fake `FileStatus`es there.
    val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))

    // Set the number of partitions to prevent following schema reads from generating many tasks
    // in case of a small number of parquet files.
    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
      sparkSession.sparkContext.defaultParallelism)

    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles

    // Issues a Spark job to read Parquet schema in parallel.
    val partiallyMergedSchemas =
      sparkSession
        .sparkContext
        .parallelize(partialFileStatusInfo, numParallelism)
        .mapPartitions { iterator =>
          // Resembles fake `FileStatus`es with serialized path and length information.
          val fakeFileStatuses = iterator.map { case (path, length) =>
            new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
          }.toSeq

          // Reads footers in multi-threaded manner within each task
          val footers =
            ParquetFileFormat.readParquetFootersInParallel(
              serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)

          // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
          val converter = new ParquetToSparkSchemaConverter(
            assumeBinaryIsString = assumeBinaryIsString,
            assumeInt96IsTimestamp = assumeInt96IsTimestamp)

          if (footers.isEmpty) {
            Iterator.empty
          } else {
            var mergedSchema = ParquetFileFormat.readSchemaFromFooter(footers.head, converter)
            footers.tail.foreach { footer =>
              val schema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
              try {
                mergedSchema = mergedSchema.merge(schema)
              } catch { case cause: SparkException =>
                throw new SparkException(
                  s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause)
              }
            }
            Iterator.single(mergedSchema)
          }
        }.collect()

    if (partiallyMergedSchemas.isEmpty) {
      None
    } else {
      var finalSchema = partiallyMergedSchemas.head
      partiallyMergedSchemas.tail.foreach { schema =>
        try {
          finalSchema = finalSchema.merge(schema)
        } catch { case cause: SparkException =>
          throw new SparkException(
            s"Failed merging schema:\n${schema.treeString}", cause)
        }
      }
      Some(finalSchema)
    }
  }

ParquetFileFormat.mergeSchemasInParallel takes two parameters: the SparkSession and filesToTouch, a list of files. As mentioned above, by default only one file is actually passed in.
The purpose of this method is to create job1, which parses the schema from the files passed in. The part of the method that becomes job1 is:

    val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))

    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
      sparkSession.sparkContext.defaultParallelism)

    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles

    val partiallyMergedSchemas =
      sparkSession
        .sparkContext
        .parallelize(partialFileStatusInfo, numParallelism)
        .mapPartitions { iterator =>
          ......
          Iterator.single(mergedSchema)
        }.collect()

The parallelism of job1, numParallelism:

    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
      sparkSession.sparkContext.defaultParallelism)

partialFileStatusInfo.size is the size of the argument passed in, which is 1 here.
sparkSession.sparkContext.defaultParallelism returns 1 here; the actual value varies by environment, but it is never less than 1.

So numParallelism is 1, which means job1 has exactly one partition.
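
A quick worked example of that formula; the defaultParallelism of 8 in the second line is an assumed value for a bigger setup, not from this run.

// numParallelism = min(max(number of files to touch, 1), defaultParallelism)
val defaultCase = math.min(math.max(1, 1), 1)    // this article's run: 1 file, local master => 1 partition
val mergedCase  = math.min(math.max(100, 1), 8)  // hypothetical: mergeSchema with 100 part-files => 8 footer-reading tasks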

2. Summary of the job1 call

The method call chain is:


DataFrameReader.load()
DataFrameReader.loadV1Source()
DataSource.resolveRelation()
DataSource.getOrInferFileFormatSchema()
ParquetFileFormat.inferSchema()
ParquetFileFormat.mergeSchemasInParallel()

3. spark read summary

Let's look at our code again.

public class UserProfileTest {
    static String filePath = "hdfs:///user/daily/20200828/*.parquet";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("user_profile_test")
                .set(ConfigurationOptions.ES_NODES, "")
                .set(ConfigurationOptions.ES_PORT, "")
                .set(ConfigurationOptions.ES_MAPPING_ID, "uid");

        // The main question: why does this part produce extra jobs?
        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
        Dataset<Row> userProfileSource = sparkSession.read().parquet(filePath);
        userProfileSource.count();
        userProfileSource.write().parquet("hdfs:///user/daily/result2020082808/");
    }
}

When the number of paths matched by filePath is greater than 32, a separate job is produced to recursively list the files and find all matching file information (including block sizes and so on, which is later used for schema inference and for partitioning the RDD operations).
Once all the files have been found, if the files being read are parquet, a new job is created to parse the corresponding schema.

Based on the analysis above, if we modify the filePath in the code above, for example setting it to:

static String filePath = "hdfs://bj3-dev-search-01.tencn:9000/user/daily/20200828/part-00057-0e0dc5b5-5061-41ca-9fa6-9fb7b3e09e98-c000.snappy.parquet";

then job0 will no longer appear when the program runs.
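
Similarly, job1 only exists because userSpecifiedSchema is empty; if the schema is supplied up front, the userSpecifiedSchema.map {...} branch in getOrInferFileFormatSchema is taken and format.inferSchema is never called. A minimal Scala sketch with invented column names, reusing the same placeholder path:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local").appName("user_profile_test").getOrCreate()

// Invented columns purely for illustration; the real data schema is whatever the parquet files hold.
val userSchema = StructType(Seq(
  StructField("uid", LongType, nullable = true),
  StructField("name", StringType, nullable = true)))

// With .schema(...) provided, no parquet footer has to be read up front,
// so the schema-inference job (job1) does not show up.
val df = spark.read
  .schema(userSchema)
  .parquet("hdfs:///user/daily/20200828/*.parquet")
println(df.count())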

