Rdd的缓存有两种意义上的缓存。

当在SparkContext中常创建输入流的时候,将会注册一个InputDStream流到DStreamGraph当中。

当对该流进行transform操作,比如map,flatmap等操作的时候,将会以一开始的InputDStream生成MappedDStream和FlatMappedDStream。

在所有stream的超类DStream实现了map()方法。

def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {new MappedDStream(this, context.sparkContext.clean(mapFunc))
}class MappedDStream[T: ClassTag, U: ClassTag] (parent: DStream[T],mapFunc: T => U) extends DStream[U](parent.ssc) {override def dependencies: List[DStream[_]] = List(parent)override def slideDuration: Duration = parent.slideDurationoverride def compute(validTime: Time): Option[RDD[U]] = {parent.getOrCompute(validTime).map(_.map[U](mapFunc))}
}

从MappedDStream的初始化方法可以看到,被调用的DStream将会被作为parent记录到MappedDStream当中,并作为dependencies的一员记录,当该MappedDStream的compute()方法被调用的时候,将会首先调用parent的getOrCompute()方法。

回到DStream流的实现,当不断延伸DStream的处理流程,当到输出流的时候,将会作为OutputDStream向DStreamGraph。

private[streaming] def register(): DStream[T] = {ssc.graph.addOutputStream(this)this
}

比如print()操作,将会生成一个ForEachDStream并调用register()方法向DStreamGraph注册成为一个OutputDStream。

当然在生成ForEachDStream也记录了上游操作的DStream作为parent。

回到DStreamGraph生成job时候的generateJobs()方法。

def generateJobs(time: Time): Seq[Job] = {logDebug("Generating jobs for time " + time)val jobs = this.synchronized {outputStreams.flatMap { outputStream =>val jobOption = outputStream.generateJob(time)jobOption.foreach(_.setCallSite(outputStream.creationSite))jobOption}}logDebug("Generated " + jobs.length + " jobs for time " + time)jobs
}

DStreamGraph的generateJobs()实则是遍历所有的OutputDStream去实现其generateJob()方法,在这里,将会从输出流OutputDStream开始,不断从其parent开始逐级往上调用compute()方法,直到到最初的输入流InputDStream正式定义rdd为止。

在这个过程中,DStream维护了一个generatedRDDs,当一个上游的DStream已经被调用过compute()生成该时间对应的rdd之后,将会缓存在这个集合中,直接进行返回。

上述是第一种缓存,当rdd在DStream中被定义时候的缓存。

另一种,是rdd在executor具体进行计算时候对于中间结果的缓存。

在流的定义过程中,可以显示调用cache()方法。

def persist(level: StorageLevel): DStream[T] = {if (this.isInitialized) {throw new UnsupportedOperationException("Cannot change storage level of a DStream after streaming context has started")}this.storageLevel = levelthis
}/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): DStream[T] = persist()

其实cache()方法只是简单的改变了DStream中的储存级别。

真正的缓存步骤在上述的DStream的compute()方法中,在此处,将会根据DStream存储级别判断,如果是调用过上述cache()方法的存储级别,将会调用RDD的persist()方法。

private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {// TODO: Handle changes of StorageLevelif (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")}// If this is the first time this RDD is marked for persisting, register it// with the SparkContext for cleanups and accounting. Do this only once.if (storageLevel == StorageLevel.NONE) {sc.cleaner.foreach(_.registerRDDForCleanup(this))sc.persistRDD(this)}storageLevel = newLevelthis
}

在这里,将会在SparkContext中,将该rdd纳入到缓存范围内。

再回到当任务的执行流程还在DAGSchdeuler当中时,当准备向executor具体拆分成task的时候,将会调用getPreferredLocs()方法,来定位做合适的executor位置。

if (!cacheLocs.contains(rdd.id)) {// Note: if the storage level is NONE, we don't need to get locations from block manager.val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {IndexedSeq.fill(rdd.partitions.length)(Nil)} else {val blockIds =rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]blockManagerMaster.getLocations(blockIds).map { bms =>bms.map(bm => TaskLocation(bm.host, bm.executorId))}}cacheLocs(rdd.id) = locs
}

在这个方法中,如果上游采用了窄依赖,将会根据rdd及对应分区得到对应的RDDBlockId,从各个BlockManager寻址得到缓存了上游rdd的位置,优先作为目标调度。

当Rdd在executor上具体进行处理的时候,将会调用iterator()方法返回对应分区下该rdd的所有数据,此处在这里调用了其getOrCompute()方法。

private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {val blockId = RDDBlockId(id, partition.index)var readCachedBlock = true// This method is called on executors, so we need call SparkEnv.get instead of sc.env.SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {readCachedBlock = falsecomputeOrReadCheckpoint(partition, context)}) match {case Left(blockResult) =>if (readCachedBlock) {val existingMetrics = context.taskMetrics().inputMetricsexistingMetrics.incBytesRead(blockResult.bytes)new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {override def next(): T = {existingMetrics.incRecordsRead(1)delegate.next()}}} else {new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])}case Right(iter) =>new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])}
}

在这里,尝试从BlockManager中获取该RDD分区下的缓存数据,避免再从头将数据计算一遍,如果本地缓存中找不到对应的缓存数据,将会从头开始对rdd进行计算,并缓存,以便下游算子可以在缓存中获取到。如果在上述的调度过程中,将该task直接调度到了上游rdd缓存所在的executor,将可以直接从本地的缓存中读取,完成了rdd处理结果的高效缓存。

spark 上游rdd的缓存相关推荐

  1. Spark RDD的缓存

    Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或者缓存数据集.当持久化某个RDD后,每一个节点都将把计算分区结果保存在内存中,对此RDD或衍生出的RDD进行的其他动作中重用.这使得 ...

  2. Spark之RDD理论篇

    Spark的基石RDD: RDD与MapReduce Spark的编程模型是弹性分布式数据集(Resilient Distributed Dataset,RDD),它是MapReduce的扩展和延申, ...

  3. Spark:RDD编程总结(概述、算子、分区、共享变量)

    目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...

  4. 什么是RDD?带你快速了解Spark中RDD的概念!

    看了前面的几篇Spark博客,相信大家对于Spark的基本概念以及不同模式下的环境部署问题已经搞明白了.但其中,我们曾提到过Spark程序的核心,也就是弹性分布式数据集(RDD).但到底什么是RDD, ...

  5. Spark的RDD持久化

    RDD持久化 1. RDD Cache 缓存 说明 RDD 通过Cache 或者Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM 的堆内存中.但是并不是这两个方法被调用时立即 ...

  6. Spark的RDD依赖关系

    RDD依赖关系 RDD 血缘关系 RDD 只支持粗粒度转换,即在大量记录上执行的单个操作.将创建 RDD 的一系列Lineage(血统)记录下来,以便恢复丢失的分区.RDD 的Lineage 会记录R ...

  7. Spark之RDD实战篇

    Spark RDD创建.转换.行动算子.RDD的持久化: RDD编程 在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换.经过一系列的transformations定义RDD之 ...

  8. spark输出rdd数据_Spark中RDD的详解

    (collect 收集 //将rdd分布式存储在集群中不同分区的数据 获取到一起 组成一个数组返回 //要注意 这个方法将会把所有数据搞到一个机器内 容易造成内存的溢出 在生产环境下千万慎用 rdd. ...

  9. spark数据处理-RDD

    文章目录 spark数据处理笔记 spark核心介绍 RDD编程 RDD介绍-弹性分布式数据集 创建RDD两种方式 函数传递 常见RDD转化操作和行动操作 常用的转化操作: 类集合操作 行动操作 不同 ...

最新文章

  1. 无法访问linux mysql_远程无法访问linux Mysql解决方案(转)
  2. python 杂记(二)
  3. [ JavaScript ] 数据结构与算法 —— 链表
  4. JAVA怎么实现网页退出系统_java后台实现js关闭本页面,父页面指定跳转或刷新操作...
  5. 【原创】tarjan算法初步(强连通子图缩点)
  6. scut协议配置工具初始化的一些问题
  7. java生成txt_Java ThreadDump 生成解析
  8. VScode下载安装及使用教程
  9. 线性代数知识点整理(自用)
  10. 计算机应用基础周记,2800字计算机应用基础实习报告范文.doc
  11. html图片轮播效果加链接,HTML首页怎么加图片轮播?
  12. 系统设计题面试八股文背诵版
  13. Excel根据手机号区分运营商
  14. 黑苹果音频卡顿_手机耗电大、卡顿怎么办?只需关掉这个按键轻松解决,去试试...
  15. C语言rot90的头文件,python – np.rot90()破坏了opencv图像
  16. 浏览器输入www.baidu.com之后,发生了什么?
  17. 漫谈深度学习时代点击率预估技术进展
  18. 4.1 Member 的各种调用方式
  19. 20万、50万、100万年薪的算法工程师能力上有哪些差距?
  20. 反激式开关电源芯片是什么?如何对反激开关电源mos管选型?

热门文章

  1. 删除以x为根节点的子树并释放☆
  2. Windows下Maven 环境配置
  3. 诗与远方:无题(八十七)
  4. django之Layui界面点击弹出个对话框并请求逻辑生成分页的动态表格
  5. Hbase单点安装Version1.1.5
  6. Java中常见的排序算法代码演示
  7. 睡前小故事之Html
  8. java 中的 Math.round(-1.5) 等于多少?
  9. java ee junit_JavaEE——Junit
  10. c++使用单向链表存储一组有序数据_初试攻略丨计算机考研中数据结构知识点总结,硬核!...