DelayedProduce是DelayedOperation在生产者向服务器提交消息的时候的一个延迟操作的实现

一 核心字段

delayMs: Long 延迟的时间

produceMetadata:ProduceMetadata 为一个ProduceRequest中所有相关分区记录一些追加消息后返回结果,主要用于判断DelayedProduces是否满足执行条件

replicaManager:ReplicaManager

responseCallback: 任务满足条件或者到期执行时,在DelayedProduce.conComplete()方法中调用的回调函数,主要向RequestChannel中对应的responseQueue添加ProducerResponse

二 核心方法

# tryComplete方法,主要就是检测是否满足DepayedProduce的执行条件,并在满足知心条件的时候调用forceComplete方法完成该延迟操作。满足下列条件之一,即表示此分区已经满足DelayedProduce的执行条件,只有ProduceRequest中涉及的所有分区都满足条件,DelayedProduce才能最终执行

# 该分区出现leader副本迁移。该分区leader副本不再位于此节点,此时会更新对应ProducePartitionStatus中错误码

# 正常情况下,ISR集合所有副本都完成同步以后,该分区的leader副本HighWatermark位置已经大于对应的ProduceStatus.requiredOffset

此时会清空所有设置的超时错误码

# 如果出现异常,则更新分区对应的ProducePartitionStatus中记录的错误码

override def tryComplete(): Boolean = {
  // 遍历ProduceMetaData所有的分区状态
 
produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
    trace(s"Checking produce satisfaction for ${topicAndPartition}, currentstatus $status")
    // 检查此分区是否已经满足DeplayedProduce执行条件
   
if (status.acksPending) {// 获取对应的Partition对象
     
val (hasEnough, error) = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) match {
        case Some(partition) =>
          // 检查此分区HW位置是否大于requiredOffset
         
partition.checkEnoughReplicasReachOffset(status.requiredOffset)
        case None =>
          // 找不到leader
         
(false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
      }
      // 出现异常
     
if (error != Errors.NONE || hasEnough) {
        status.acksPending = false
        status.responseStatus.errorCode = error.code
      }
    }
  }

// 检查全都分区是否都已经符合DeplayedProduce的执行条件
 
if (!produceMetadata.produceStatus.values.exists(_.acksPending))
    forceComplete()
  else
    false
}

#onComplete:体的业务逻辑,向客户端返回ProduceResponse,该方法只能被forceComplete方法中被调用,且在DelayedOperation生命周期中只能被调用一次
override def onComplete() {val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)responseCallback(responseStatus)
}

另外我们已经知道responseCallback回调它会向RequestChannel对应responseQueue队列添加ProduceResponse,最终Processor线程会将ProducerResponse返回给生产者,该回调函数是在KafkaApis#handle

ProducerRequest方法中定义的sendResponseCallback函数

def handleProducerRequest(request: RequestChannel.Request) {// 将请求转换成ProduceRequestval produceRequest = request.body.asInstanceOf[ProduceRequest]// 获取请求字节数val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf/** 过配置的authorize的实例,执行对Write操作对应此Topic的认证,* 通过把认证成功的消息与认证不成功的消息分别存储到两个不同的集合中* 如果authorize函数返回的值是true表示认证成功,放到authorizedRequestInfo集合中.* 如果authorize函数返回的值是false表示认证失败,放到unauthorizedRequestInfo集合中.*/val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecords.asScala.partition {case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)}val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic))}/** 这是发给Produce客户端的一个响应回调函数* 如果写请求未授权,那么返回未授权错误* 如果topic不存在或者没有授权,返回未知topic错误*/def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {// 生成响应集合,其中包括通过授权并处理完成的以及未通过授权的状态val mergedResponseStatus = responseStatus ++ unauthorizedForWriteRequestInfo.mapValues(_ =>new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp)) ++ nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, Message.NoTimestamp))var errorInResponse = false // 标记处理ProducerRequest的过程是否出现异常mergedResponseStatus.foreach { case (topicPartition, status) =>if (status.errorCode != Errors.NONE.code) {errorInResponse = truedebug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(request.header.correlationId,request.header.clientId,topicPartition,Errors.forCode(status.errorCode).exceptionName))}}// 定义处理响应回调函数def produceResponseCallback(delayTimeMs: Int) {// 生产者不需要等待来自broker的消息确认而接着发送下一条,传输消息最高,但是数据可靠性最低if (produceRequest.acks == 0) {// 如果request.required.acks = 0,那么我们这里没必要干什么,因为客户端不会等服务端确认,就继续发送下一条消息// 但是如果有错误,就会关闭socket server,然后客户端就知道发生了一些错误,将会刷新元数据if (errorInResponse) {val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>topicPartition -> Errors.forCode(status.errorCode).exceptionName}.mkString(", ")info(s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +s"from client id ${request.header.clientId} with ack=0\n" +s"Topic and partition to exceptions: $exceptionsSummary")// 如果有错误,关闭连接requestChannel.closeConnection(request.processor, request)} else {//否则什么也不干requestChannel.noOperation(request.processor, request)}} else {//  如果其他情况 ack = 1或者-1// 创建一个响应头val respHeader = new ResponseHeader(request.header.correlationId)// 创建响应bodyval respBody = request.header.apiVersion match {case 0 => new ProduceResponse(mergedResponseStatus.asJava)case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)// This case shouldn't happen unless a new version of ProducerRequest is added without// updating this part of the code to handle it properly.case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.")}// 将响应发给RequestChannelrequestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody)))}}// When this callback is triggered, the remote API call has completedrequest.apiRemoteCompleteTimeMs = SystemTime.millisecondsquotas.produce.recordAndMaybeThrottle(request.session.sanitizedUser,request.header.clientId,numBytesAppended,produceResponseCallback)}

DelayedProduce分析相关推荐

  1. ReplicaManager分析

    一个broker可能分布多个Partition的副本信息,ReplicaManager主要负责管理一个broker范围内的Partition信息,然后它还根据Kafka Controller发送过来的 ...

  2. kafka源码愫读(5)、ReplicaManager模块源码分析

    1.ReplicaManager模块简介 replicaManager主要用来管理topic在本broker上的副本信息.并且读写日志的请求都是通过replicaManager进行处理的. 每个rep ...

  3. kafka源码分析之副本管理-ReplicaManager

    原文地址:https://blog.csdn.net/u014393917/article/details/52043040 ReplicaManager 说明,此组件用于管理kafka中各parti ...

  4. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  5. 2022-2028年中国自动驾驶系统行业现状调研分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国自动驾驶系统行业市场行业相关概述.中国自 ...

  6. 2022-2028年中国阻尼涂料市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻尼涂料行业市场行业相关概述.中国阻尼涂 ...

  7. 2021-2028年中国阻燃装饰行业市场需求与投资规划分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了中国阻燃装饰行业市场行业相关概述.中国阻燃装 ...

  8. 2022-2028年全球与中国漂白吸水棉市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国漂白吸水棉行业市场行业相关概述.全 ...

  9. 2022-2028年全球与中国青苔清洗剂市场研究及前瞻分析报告

    [报告类型]产业研究 [报告价格]4500起 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了全球与中国青苔清洗剂行业市场行业相关概述.全 ...

最新文章

  1. wgs84坐标格式转换度分秒_一起爬山吗?寻找GIS坐标系统中“隐秘的角落”
  2. Docker 调试技巧
  3. python数据分析笔记——数据加载与整理
  4. 自动化测试用例设计原则
  5. Spring Data JPA教程
  6. python methodtype_Python的实例定属性和方法或类绑定方法
  7. 介绍MFSideMenu左右滑动控件的使用
  8. 【机器学习技术】高斯过程初探
  9. FFmpeg 软编码H.264与H.265
  10. R语言 如何生成彩色柱状图
  11. 梦幻西游脚本开发教学
  12. 方正璞华研发的社保股权管理系统初见成效
  13. 1160. 不容易系列之二
  14. 2021年安全生产模拟考试(建筑起重机司机-施工电梯升降机司机模拟考试题库)安考星
  15. 计算机网络调试记录表,计算机网络管理员中级操作技能考核评分记录表.doc
  16. B站自定义视频播放速度
  17. 自动驾驶专题介绍 ———— 转向系统
  18. 大数据专业毕业后前景如何?能做什么职位?
  19. Centos 7创建软连接,硬连接的方法
  20. 每月分享之兰迪·波许教授的最后一课

热门文章

  1. java转换字符集_Java字符集转换解释
  2. html链接sqlserver,js连接sqlserver进行查询
  3. java http请求 工具类_java模拟http请求调用远程接口工具类
  4. java语音jvm_java环境中基于jvm的两大语言:scala,groovy
  5. php类的实例化方法,php中类的定义和实例化方法
  6. SpringBoot 2.x 集成Redis
  7. Java 蓝桥杯 矩阵乘法
  8. html表单的首要标记是form,关于html中表单form标记的介绍
  9. 盘启动盘_小白教你ULTRAISO制作U盘启动盘
  10. 解决pandas读取parquet报错ImportError:Unable to find a usable engine;tried using: ‘pyarrow‘, ‘fastparquet‘