spark-streaming series ------- 3. Receiving Data via Kafka DirectDStream

How the number of KafkaRDD partitions is determined, and how each partition's data reception is computed
The DirectDStream is created in KafkaUtils.createDirectStream; the code is as follows:
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] = {
  val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
  val kc = new KafkaCluster(kafkaParams)
  val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
  val result = for {
    /* Talk to the Kafka cluster to obtain the partition information of the topics.
     * topicPartitions has one element per partition of the Kafka topics;
     * each element carries the topic name and the partition index. */
    topicPartitions <- kc.getPartitions(topics).right
    leaderOffsets <- (if (reset == Some("smallest")) {
      kc.getEarliestLeaderOffsets(topicPartitions)
    } else {
      kc.getLatestLeaderOffsets(topicPartitions)
    }).right
  } yield {
    // Compute the starting offset of every partition of the Kafka topics
    val fromOffsets = leaderOffsets.map { case (tp, lo) =>
        (tp, lo.offset)
    }
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }
  KafkaCluster.checkErrors(result)
}

Here, by communicating with the Kafka cluster, the message offset of every partition of the Kafka topic is obtained and then passed as a parameter when creating the DirectKafkaInputDStream.
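For context, here is a minimal usage sketch of the API above, assuming Spark 1.x with the spark-streaming-kafka artifact on the classpath; the broker list and topic name are placeholders, not values from this article:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("direct-stream-example")
    val ssc = new StreamingContext(conf, Seconds(10))

    // "metadata.broker.list" points at Kafka brokers (not ZooKeeper);
    // "auto.offset.reset" -> "smallest" makes the first batch start from the earliest offsets
    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092,broker2:9092",
      "auto.offset.reset" -> "smallest")
    val topics = Set("test-topic")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}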

Part of the DirectKafkaInputDStream code is shown below:

class DirectKafkaInputDStream[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[K]: ClassTag,
    T <: Decoder[V]: ClassTag,
    R: ClassTag](
      @transient ssc_ : StreamingContext,
      val kafkaParams: Map[String, String],
      val fromOffsets: Map[TopicAndPartition, Long],
      messageHandler: MessageAndMetadata[K, V] => R
  ) extends InputDStream[R](ssc_) with Logging {

  val maxRetries = context.sparkContext.getConf.getInt(
    "spark.streaming.kafka.maxRetries", 1)

  // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
  private[streaming] override def name: String = s"Kafka direct stream [$id]"

  protected[streaming] override val checkpointData =
    new DirectKafkaInputDStreamCheckpointData

  protected val kc = new KafkaCluster(kafkaParams)

  protected val maxMessagesPerPartition: Option[Long] = {
    val ratePerSec = context.sparkContext.getConf.getInt(
      "spark.streaming.kafka.maxRatePerPartition", 0)
    if (ratePerSec > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some((secsPerBatch * ratePerSec).toLong)
    } else {
      None
    }
  }

  // Keep the topic's partition and offset information in currentOffsets
  protected var currentOffsets = fromOffsets

  @tailrec
  protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
    val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
    // Either.fold would confuse @tailrec, do it manually
    if (o.isLeft) {
      val err = o.left.get.toString
      if (retries <= 0) {
        throw new SparkException(err)
      } else {
        log.error(err)
        Thread.sleep(kc.config.refreshLeaderBackoffMs)
        latestLeaderOffsets(retries - 1)
      }
    } else {
      o.right.get
    }
  }

  // limits the maximum number of messages per partition
  /* When no maximum reception rate is configured, reception for this batch ends at
   * each partition's latest leader offset at the current time. */
  protected def clamp(
      leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
    maxMessagesPerPartition.map { mmp =>
      leaderOffsets.map { case (tp, lo) =>
        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
      }
    }.getOrElse(leaderOffsets)
  }

  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    // Compute the ending offset of every partition for this round of data reception
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    // Report the record number of this batch interval to InputInfoTracker.
    val inputInfo = InputInfo(id, rdd.count)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }

Conclusion: with the spark-streaming DirectDStream reception mode, if no maximum reception rate is configured, the amount of data received in each batch is exactly what the Kafka topic received during one batch interval.
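As a small illustration of the clamp arithmetic above (the numbers are made up): with a 10-second batch interval and spark.streaming.kafka.maxRatePerPartition set to 1000, each partition is capped at 10 * 1000 = 10000 messages per batch; with the setting left at its default of 0, the batch simply runs up to the latest leader offset.

import org.apache.spark.SparkConf

// Illustration only, not Spark source: the arithmetic behind maxMessagesPerPartition/clamp.
val conf = new SparkConf()
  .setAppName("rate-limited-direct-stream")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000") // messages per partition per second

val batchSeconds = 10.0                            // batch interval of the StreamingContext
val maxPerPartition = (batchSeconds * 1000).toLong // 10000 messages per partition per batch
// untilOffset(partition) = min(currentOffset(partition) + maxPerPartition, latestLeaderOffset(partition))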

The Kafka partition information is passed into DirectKafkaInputDStream's currentOffsets member via the fromOffsets constructor parameter. When the KafkaRDD is created, this member is handed over as a constructor argument and becomes the basis for generating the KafkaRDD partitions.

object KafkaRDD {
  import KafkaCluster.LeaderOffset

  /**
   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
   * configuration parameters</a>.
   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
   *  starting point of the batch
   * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
   *  ending point of the batch
   * @param messageHandler function for translating each message into the desired type
   */
  def apply[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag,
    R: ClassTag](
      sc: SparkContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      untilOffsets: Map[TopicAndPartition, LeaderOffset],
      messageHandler: MessageAndMetadata[K, V] => R
    ): KafkaRDD[K, V, U, T, R] = {
    val leaders = untilOffsets.map { case (tp, lo) =>
        tp -> (lo.host, lo.port)
    }.toMap

    // Build the OffsetRange data structures that describe the data to receive,
    // from each Kafka partition's starting and ending offsets
    val offsetRanges = fromOffsets.map { case (tp, fo) =>
        val uo = untilOffsets(tp)
        OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }.toArray

    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
  }
}

class KafkaRDD[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag,
  R: ClassTag] private[spark] (
    sc: SparkContext,
    kafkaParams: Map[String, String],
    val offsetRanges: Array[OffsetRange],
    leaders: Map[TopicAndPartition, (String, Int)],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {

  // Generate the RDD partitions from the OffsetRanges
  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        // host is the Kafka broker's IP address, port is the Kafka broker's port
        val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }

When the RDD is created, getPartitions is eventually called, which pins down the IP address and port of every KafkaRDD partition: the address recorded for each KafkaRDD partition is that of a Kafka broker, as the sketch below illustrates.
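This is also why the scheduler later treats the broker host as the task's preferred location: KafkaRDD overrides getPreferredLocations to return the broker host stored in each partition. The snippet below is paraphrased from memory of the Spark 1.x KafkaRDD source, so treat it as an assumption rather than a verbatim quote.

  // Inside the KafkaRDD class shown above (paraphrased sketch):
  override def getPreferredLocations(thePart: Partition): Seq[String] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    // The preferred location is simply the Kafka broker host recorded in the partition
    Seq(part.host)
  }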

From the earlier article spark-streaming series ------- 2. spark-streaming Job Scheduling (part 2),

we know that DirectKafkaInputDStream.compute is invoked periodically by the spark-streaming scheduling module to produce the DStream's RDDs.

From the code analysis above we know that the number of Kafka partitions equals the number of RDD partitions, and that each RDD partition corresponds one-to-one to a Kafka partition.
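This one-to-one mapping can be observed from user code through the HasOffsetRanges interface that KafkaRDD implements. A minimal sketch, reusing the hypothetical stream from the first example:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // One OffsetRange per Kafka partition, and one RDD partition per OffsetRange
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} " +
      s"from=${o.fromOffset} until=${o.untilOffset}")
  }
  println(s"rdd partitions = ${rdd.partitions.length}, kafka partitions = ${ranges.length}")
}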

Data Reception in KafkaRDD

After the spark-streaming job starts, SparkContext.runJob submits the data reception and processing work to Spark's task scheduling system. After walking the chain of RDD dependencies, the task scheduler finds that the root RDD is the KafkaRDD. It then adds a processing task for each KafkaRDD partition to the pending-task HashMaps. This is implemented in TaskSetManager.addPendingTask:

private def addPendingTask(index: Int, readding: Boolean = false) {
  // Utility method that adds `index` to a list only if readding=false or it's not already there
  def addTo(list: ArrayBuffer[Int]) {
    if (!readding || !list.contains(index)) {
      list += index
    }
  }

  // preferredLocations returns the IP addresses where the partition's data resides
  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
      case e: HDFSCacheTaskLocation => {
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        exe match {
          case Some(set) => {
            for (e <- set) {
              addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          }
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
            ", but there are no executors alive there.")
        }
      }
      case _ => Unit
    }
    // With DirectDStream, loc.host is a Kafka broker address that belongs to neither the
    // Spark cluster nor an HDFS cluster, so the task ends up in this per-host HashMap
    addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
    for (rack <- sched.getRackForHost(loc.host)) {
      addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
    }
  }

  if (tasks(index).preferredLocations == Nil) {
    addTo(pendingTasksWithNoPrefs)
  }

  if (!readding) {
    // No point scanning this whole list to find the old task there.
    // Every task is added to this list, including tasks in the DirectDStream case.
    allPendingTasks += index
  }
}

In this method, the KafkaRDD processing tasks are added to two pending-task HashMaps: pendingTasksForHost and allPendingTasks.

After the tasks are queued in the pending HashMaps, a ReviveOffers message is sent, and CoarseGrainedSchedulerBackend.makeOffers decides which executors the tasks run on and launches them.

CoarseGrainedSchedulerBackend.makeOffers eventually calls TaskSchedulerImpl.resourceOfferSingleTaskSet to allocate resources for a single TaskSet:

// Each call to this method walks the (shuffled) executors and assigns one task to each of them.
// When the TaskSet has more tasks than there are executors, the remaining tasks are not launched
// in this call. Once a task on an executor finishes, a StatusUpdate event is sent and the driver
// calls this method again, handing further tasks from the TaskSet to that executor.
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      // Assign tasks according to the number of available CPU cores
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task // Put this task on the i-th worker (the worker order has been shuffled)
          val tid = task.taskId
          taskIdToTaskSetId(tid) = taskSet.taskSet.id // Record which TaskSet the task belongs to
          taskIdToExecutorId(tid) = execId            // Record which executor the task runs on
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}

In the resourceOfferSingleTaskSet method above, the generated tasks are distributed round-robin across the executors.
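As a rough illustration of that round-robin behaviour (a simplified sketch, not Spark source): one pass over the shuffled offers hands at most one task to each executor with a free CPU slot, and the next pass, triggered by the next round of offers, continues with the remaining tasks.

// Simplified sketch of the round-robin idea in resourceOfferSingleTaskSet (illustration only).
case class Offer(execId: String, freeCores: Int)

def offerOnePass(pendingTasks: List[Int], offers: Seq[Offer], cpusPerTask: Int = 1)
  : (Seq[(String, Int)], List[Int]) = {
  val usable = offers.filter(_.freeCores >= cpusPerTask)
  // Pair the i-th usable (shuffled) executor with the i-th pending task: one task per executor per pass
  val launched = usable.map(_.execId).zip(pendingTasks)
  (launched, pendingTasks.drop(launched.size))
}

// With 3 executors and 5 KafkaRDD partitions, the first pass launches tasks 0..2,
// and tasks 3 and 4 wait for the next round of offers.
val (first, rest) =
  offerOnePass(List(0, 1, 2, 3, 4), Seq(Offer("e1", 2), Offer("e2", 2), Offer("e3", 2)))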

Now let's look at how the tasks themselves are produced.

TaskSetManager.resourceOffer is defined as follows:

def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality): Option[TaskDescription] =
{
  if (!isZombie) {
    val curTime = clock.getTimeMillis()

    var allowedLocality = maxLocality

    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }

    dequeueTask(execId, host, allowedLocality) match {
      case Some((index, taskLocality, speculative)) => {
        // Found a task; do some bookkeeping and return a task description
        val task = tasks(index)
        val taskId = sched.newTaskId()
        // Do various bookkeeping
        copiesRunning(index) += 1
        val attemptNum = taskAttempts(index).size
        val info = new TaskInfo(taskId, index, attemptNum, curTime,
          execId, host, taskLocality, speculative)
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)
        // Update our locality level for delay scheduling
        // NO_PREF will not affect the variables related to delay scheduling
        if (maxLocality != TaskLocality.NO_PREF) {
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime
        }
        // Serialize and return the task
        val startTime = clock.getTimeMillis()
        val serializedTask: ByteBuffer = try {
          Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
        } catch {
          // If the task cannot be serialized, then there's no point to re-attempt the task,
          // as it will always fail. So just abort the whole task-set.
          case NonFatal(e) =>
            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
            logError(msg, e)
            abort(s"$msg Exception during serialization: $e")
            throw new TaskNotSerializableException(e)
        }
        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
            !emittedTaskSizeWarning) {
          emittedTaskSizeWarning = true
          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
        }
        addRunningTask(taskId)

        // We used to log the time it takes to serialize the task, but task size is already
        // a good proxy to task serialization time.
        // val timeTaken = clock.getTime() - startTime
        val taskName = s"task ${info.id} in stage ${taskSet.id}"
        logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
            taskName, taskId, host, taskLocality, serializedTask.limit))

        sched.dagScheduler.taskStarted(task, info)
        return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
          taskName, index, serializedTask))
      }
      case _ =>
    }
  }
  None
}

From the method above we can see that the task is actually picked in TaskSetManager.dequeueTask, defined as follows:

// Prefer to return the task with the best data locality
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value, Boolean)] =
{
  // If this executor has pending tasks of its own, take one off that queue and return it
  for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL, false))
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
    // The IP address of a KafkaRDD partition (a Kafka broker) differs from the executor's
    // IP address, so the task cannot be taken from this HashMap
    for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL, false))
    }
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
    // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
    for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
    for {
      rack <- sched.getRackForHost(host)
      index <- dequeueTaskFromList(execId, getPendingTasksForRack(rack))
    } {
      return Some((index, TaskLocality.RACK_LOCAL, false))
    }
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
    // KafkaRDD processing tasks are taken from the allPendingTasks HashMap
    for (index <- dequeueTaskFromList(execId, allPendingTasks)) {
      return Some((index, TaskLocality.ANY, false))
    }
  }

  // find a speculative task if all others tasks have been scheduled
  dequeueSpeculativeTask(execId, host, maxLocality).map {
    case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)
  }
}

When producing tasks, the scheduler prefers those with the best data locality. Because the IP addresses of the KafkaRDD partitions (the Kafka brokers) differ from the Spark executors' IP addresses, the tasks can only be taken from the allPendingTasks HashMap.
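A tiny sketch of that observation, assuming no Spark executor is co-located with a Kafka broker (the host names below are hypothetical):

// Illustration only: why a DirectDStream task is launched at locality ANY.
def localityFor(brokerHost: String, executorHosts: Set[String]): String =
  if (executorHosts.contains(brokerHost)) "NODE_LOCAL" // only when an executor runs on the broker host
  else "ANY"                                           // otherwise the task comes from allPendingTasks

val locality = localityFor("kafka-broker-1", Set("spark-worker-1", "spark-worker-2")) // "ANY"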

Based on the analysis of the three methods above, we conclude that the number of KafkaRDD reception tasks equals the number of KafkaRDD partitions, and that all KafkaRDD processing tasks are distributed round-robin across the executors.

The actual processing of the KafkaRDD begins in ShuffleMapTask.runTask, whose source is as follows:

override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // rdd.iterator reads and processes the data; the result is written out via the shuffle writer
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    return writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}

Following the RDD dependency chain, this method ends up calling KafkaRDD.compute. Since KafkaRDD is the root RDD, KafkaRDD.compute executes first among the dependent RDDs and returns an Iterator over the messages received from the Kafka broker. When Spark processes an RDD partition, the rawest form in which the partition's data is organized is exactly such an Iterator.
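A hedged sketch of what this means for user code, reusing the hypothetical stream from the first example: every transformation downstream of the DirectDStream sees the partition as an Iterator, and records are pulled from the broker only as that iterator is consumed.

stream.foreachRDD { rdd =>
  val lineLengths = rdd.mapPartitions { iter =>
    // `iter` is backed by the per-partition iterator produced by KafkaRDD.compute;
    // records are fetched from the Kafka broker lazily as the iterator is consumed
    iter.map { case (_, value) => value.length }
  }
  println(s"records in this batch: ${lineLengths.count()}")
}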

Conclusion: when spark-streaming receives data with a DirectDStream, the received data is organized directly into an RDD and processed.
