spark 上游rdd的缓存
Rdd的缓存有两种意义上的缓存。
当在SparkContext中常创建输入流的时候,将会注册一个InputDStream流到DStreamGraph当中。
当对该流进行transform操作,比如map,flatmap等操作的时候,将会以一开始的InputDStream生成MappedDStream和FlatMappedDStream。
在所有stream的超类DStream实现了map()方法。
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {new MappedDStream(this, context.sparkContext.clean(mapFunc))
}class MappedDStream[T: ClassTag, U: ClassTag] (parent: DStream[T],mapFunc: T => U) extends DStream[U](parent.ssc) {override def dependencies: List[DStream[_]] = List(parent)override def slideDuration: Duration = parent.slideDurationoverride def compute(validTime: Time): Option[RDD[U]] = {parent.getOrCompute(validTime).map(_.map[U](mapFunc))}
}
从MappedDStream的初始化方法可以看到,被调用的DStream将会被作为parent记录到MappedDStream当中,并作为dependencies的一员记录,当该MappedDStream的compute()方法被调用的时候,将会首先调用parent的getOrCompute()方法。
回到DStream流的实现,当不断延伸DStream的处理流程,当到输出流的时候,将会作为OutputDStream向DStreamGraph。
private[streaming] def register(): DStream[T] = {ssc.graph.addOutputStream(this)this
}
比如print()操作,将会生成一个ForEachDStream并调用register()方法向DStreamGraph注册成为一个OutputDStream。
当然在生成ForEachDStream也记录了上游操作的DStream作为parent。
回到DStreamGraph生成job时候的generateJobs()方法。
def generateJobs(time: Time): Seq[Job] = {logDebug("Generating jobs for time " + time)val jobs = this.synchronized {outputStreams.flatMap { outputStream =>val jobOption = outputStream.generateJob(time)jobOption.foreach(_.setCallSite(outputStream.creationSite))jobOption}}logDebug("Generated " + jobs.length + " jobs for time " + time)jobs
}
DStreamGraph的generateJobs()实则是遍历所有的OutputDStream去实现其generateJob()方法,在这里,将会从输出流OutputDStream开始,不断从其parent开始逐级往上调用compute()方法,直到到最初的输入流InputDStream正式定义rdd为止。
在这个过程中,DStream维护了一个generatedRDDs,当一个上游的DStream已经被调用过compute()生成该时间对应的rdd之后,将会缓存在这个集合中,直接进行返回。
上述是第一种缓存,当rdd在DStream中被定义时候的缓存。
另一种,是rdd在executor具体进行计算时候对于中间结果的缓存。
在流的定义过程中,可以显示调用cache()方法。
def persist(level: StorageLevel): DStream[T] = {if (this.isInitialized) {throw new UnsupportedOperationException("Cannot change storage level of a DStream after streaming context has started")}this.storageLevel = levelthis
}/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): DStream[T] = persist()
其实cache()方法只是简单的改变了DStream中的储存级别。
真正的缓存步骤在上述的DStream的compute()方法中,在此处,将会根据DStream存储级别判断,如果是调用过上述cache()方法的存储级别,将会调用RDD的persist()方法。
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {// TODO: Handle changes of StorageLevelif (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")}// If this is the first time this RDD is marked for persisting, register it// with the SparkContext for cleanups and accounting. Do this only once.if (storageLevel == StorageLevel.NONE) {sc.cleaner.foreach(_.registerRDDForCleanup(this))sc.persistRDD(this)}storageLevel = newLevelthis
}
在这里,将会在SparkContext中,将该rdd纳入到缓存范围内。
再回到当任务的执行流程还在DAGSchdeuler当中时,当准备向executor具体拆分成task的时候,将会调用getPreferredLocs()方法,来定位做合适的executor位置。
if (!cacheLocs.contains(rdd.id)) {// Note: if the storage level is NONE, we don't need to get locations from block manager.val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {IndexedSeq.fill(rdd.partitions.length)(Nil)} else {val blockIds =rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]blockManagerMaster.getLocations(blockIds).map { bms =>bms.map(bm => TaskLocation(bm.host, bm.executorId))}}cacheLocs(rdd.id) = locs
}
在这个方法中,如果上游采用了窄依赖,将会根据rdd及对应分区得到对应的RDDBlockId,从各个BlockManager寻址得到缓存了上游rdd的位置,优先作为目标调度。
当Rdd在executor上具体进行处理的时候,将会调用iterator()方法返回对应分区下该rdd的所有数据,此处在这里调用了其getOrCompute()方法。
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {val blockId = RDDBlockId(id, partition.index)var readCachedBlock = true// This method is called on executors, so we need call SparkEnv.get instead of sc.env.SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {readCachedBlock = falsecomputeOrReadCheckpoint(partition, context)}) match {case Left(blockResult) =>if (readCachedBlock) {val existingMetrics = context.taskMetrics().inputMetricsexistingMetrics.incBytesRead(blockResult.bytes)new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {override def next(): T = {existingMetrics.incRecordsRead(1)delegate.next()}}} else {new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])}case Right(iter) =>new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])}
}
在这里,尝试从BlockManager中获取该RDD分区下的缓存数据,避免再从头将数据计算一遍,如果本地缓存中找不到对应的缓存数据,将会从头开始对rdd进行计算,并缓存,以便下游算子可以在缓存中获取到。如果在上述的调度过程中,将该task直接调度到了上游rdd缓存所在的executor,将可以直接从本地的缓存中读取,完成了rdd处理结果的高效缓存。
spark 上游rdd的缓存相关推荐
- Spark RDD的缓存
Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或者缓存数据集.当持久化某个RDD后,每一个节点都将把计算分区结果保存在内存中,对此RDD或衍生出的RDD进行的其他动作中重用.这使得 ...
- Spark之RDD理论篇
Spark的基石RDD: RDD与MapReduce Spark的编程模型是弹性分布式数据集(Resilient Distributed Dataset,RDD),它是MapReduce的扩展和延申, ...
- Spark:RDD编程总结(概述、算子、分区、共享变量)
目录 1.RDD概述 1.1.RDD是什么 1.2.RDD的弹性 1.3.RDD的特点 1.3.1.分区 1.3.2.只读 1.3.3.依赖 1.3.4.缓存 1.3.5.检查点 2.RDD编程 2. ...
- 什么是RDD?带你快速了解Spark中RDD的概念!
看了前面的几篇Spark博客,相信大家对于Spark的基本概念以及不同模式下的环境部署问题已经搞明白了.但其中,我们曾提到过Spark程序的核心,也就是弹性分布式数据集(RDD).但到底什么是RDD, ...
- Spark的RDD持久化
RDD持久化 1. RDD Cache 缓存 说明 RDD 通过Cache 或者Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM 的堆内存中.但是并不是这两个方法被调用时立即 ...
- Spark的RDD依赖关系
RDD依赖关系 RDD 血缘关系 RDD 只支持粗粒度转换,即在大量记录上执行的单个操作.将创建 RDD 的一系列Lineage(血统)记录下来,以便恢复丢失的分区.RDD 的Lineage 会记录R ...
- Spark之RDD实战篇
Spark RDD创建.转换.行动算子.RDD的持久化: RDD编程 在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换.经过一系列的transformations定义RDD之 ...
- spark输出rdd数据_Spark中RDD的详解
(collect 收集 //将rdd分布式存储在集群中不同分区的数据 获取到一起 组成一个数组返回 //要注意 这个方法将会把所有数据搞到一个机器内 容易造成内存的溢出 在生产环境下千万慎用 rdd. ...
- spark数据处理-RDD
文章目录 spark数据处理笔记 spark核心介绍 RDD编程 RDD介绍-弹性分布式数据集 创建RDD两种方式 函数传递 常见RDD转化操作和行动操作 常用的转化操作: 类集合操作 行动操作 不同 ...
最新文章
- 无法访问linux mysql_远程无法访问linux Mysql解决方案(转)
- python 杂记(二)
- [ JavaScript ] 数据结构与算法 —— 链表
- JAVA怎么实现网页退出系统_java后台实现js关闭本页面,父页面指定跳转或刷新操作...
- 【原创】tarjan算法初步(强连通子图缩点)
- scut协议配置工具初始化的一些问题
- java生成txt_Java ThreadDump 生成解析
- VScode下载安装及使用教程
- 线性代数知识点整理(自用)
- 计算机应用基础周记,2800字计算机应用基础实习报告范文.doc
- html图片轮播效果加链接,HTML首页怎么加图片轮播?
- 系统设计题面试八股文背诵版
- Excel根据手机号区分运营商
- 黑苹果音频卡顿_手机耗电大、卡顿怎么办?只需关掉这个按键轻松解决,去试试...
- C语言rot90的头文件,python – np.rot90()破坏了opencv图像
- 浏览器输入www.baidu.com之后,发生了什么?
- 漫谈深度学习时代点击率预估技术进展
- 4.1 Member 的各种调用方式
- 20万、50万、100万年薪的算法工程师能力上有哪些差距?
- 反激式开关电源芯片是什么?如何对反激开关电源mos管选型?
热门文章
- 删除以x为根节点的子树并释放☆
- Windows下Maven 环境配置
- 诗与远方:无题(八十七)
- django之Layui界面点击弹出个对话框并请求逻辑生成分页的动态表格
- Hbase单点安装Version1.1.5
- Java中常见的排序算法代码演示
- 睡前小故事之Html
- java 中的 Math.round(-1.5) 等于多少?
- java ee junit_JavaEE——Junit
- c++使用单向链表存储一组有序数据_初试攻略丨计算机考研中数据结构知识点总结,硬核!...