kafka 的心跳是 kafka consumer 和 broker 之间的健康检查,只有当 broker coordinator 正常时,consumer 才会发送心跳。

consumer 和 reblance 相关的 2 个配置参数:

参数名                --> MemberMetadata 字段
session.timeout.ms   --> MemberMetadata.sessionTimeoutMs
max.poll.interval.ms --> MemberMetadata.rebalanceTimeoutMs

broker 端,sessionTimeoutMs 参数

broker 处理心跳的逻辑在 GroupCoordinator 类中:如果心跳超期, broker coordinator 会把消费者从 group 中移除,并触发 reblance。

 1   private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
 2     // complete current heartbeat expectation
 3     member.latestHeartbeat = time.milliseconds()
 4     val memberKey = MemberKey(member.groupId, member.memberId)
 5     heartbeatPurgatory.checkAndComplete(memberKey)
 6
 7     // reschedule the next heartbeat expiration deadline
 8     // 计算心跳截止时刻
 9     val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
10     val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
11     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
12   }
13
14   // 心跳过期
15   def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
16     group.inLock {
17       if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
18         info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
19         removeMemberAndUpdateGroup(group, member)
20       }
21     }
22   }
23
24   private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
25     member.awaitingJoinCallback != null ||
26       member.awaitingSyncCallback != null ||
27       member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline

consumer 端:sessionTimeoutMs,rebalanceTimeoutMs 参数

如果客户端发现心跳超期,客户端会标记 coordinator 为不可用,并阻塞心跳线程;如果超过了 poll 消息的间隔超过了 rebalanceTimeoutMs,则 consumer 告知 broker 主动离开消费组,也会触发 reblance

org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread 代码片段:

if (coordinatorUnknown()) {if (findCoordinatorFuture != null || lookupCoordinator().failed())// the immediate future check ensures that we backoff properly in the case that no// brokers are available to connect to.AbstractCoordinator.this.wait(retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) {// the session timeout has expired without seeing a successful heartbeat, so we should// probably make sure the coordinator is still healthy.
    markCoordinatorUnknown();
} else if (heartbeat.pollTimeoutExpired(now)) {// the poll timeout has expired, which means that the foreground thread has stalled// in between calls to poll(), so we explicitly leave the group.
    maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {// poll again after waiting for the retry backoff in case the heartbeat failed or the// coordinator disconnectedAbstractCoordinator.this.wait(retryBackoffMs);
} else {heartbeat.sentHeartbeat(now);sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {@Overridepublic void onSuccess(Void value) {synchronized (AbstractCoordinator.this) {heartbeat.receiveHeartbeat(time.milliseconds());}}@Overridepublic void onFailure(RuntimeException e) {synchronized (AbstractCoordinator.this) {if (e instanceof RebalanceInProgressException) {// it is valid to continue heartbeating while the group is rebalancing. This// ensures that the coordinator keeps the member in the group for as long// as the duration of the rebalance timeout. If we stop sending heartbeats,// however, then the session timeout may expire before we can rejoin.
                    heartbeat.receiveHeartbeat(time.milliseconds());} else {heartbeat.failHeartbeat();// wake up the thread if it's sleeping to reschedule the heartbeatAbstractCoordinator.this.notify();}}}});
}

org.apache.kafka.clients.consumer.internals.Heartbeat#pollTimeoutExpired:

//maxPollInterval 即 rebalanceTimeoutMs
public boolean pollTimeoutExpired(long now) {return now - lastPoll > maxPollInterval;
}

join group 的处理逻辑:kafka.coordinator.group.GroupCoordinator#onCompleteJoin

转载于:https://www.cnblogs.com/allenwas3/p/10278998.html

kafka 心跳和 reblance相关推荐

  1. Kafka 心跳机制 重复消费

    kafka 心跳机制 Kafka是通过心跳机制来控制消费超时,心跳机制对于消费者客户端来说是无感的,它是一个异步线程,当我们启动一个消费者实例时,心跳线程就开始工作了.心跳超时会导致消息重复消费. 在 ...

  2. 6张图阐述Kafka心跳机制(时间轮算法的具体运用)

    Broker端与客户端的心跳在Kafka中非常的重要,因为一旦在一个心跳过期周期内(默认10s),Broker端的消费组组协调器(GroupCoordinator)会把消费者从消费组中移除,从而触发重 ...

  3. kafka 心跳参数

    概念 consumer 向 broker 发送心跳,表明自己还活着,不用被踢出消费者组 参数 session.timeout.ms 表示 consumer 向 broker 发送心跳的超时时间.例如 ...

  4. Kafka consumer频繁reblance

    转载 李亚飞 大佬的文章:https://www.lyafei.com/ 其实文章名可以叫 记一次线上 Kafka 问题排查,但觉得稀松平常,弄些术词显得硬核点,hhh,言归正传,线上一个 Go 服务 ...

  5. 你了解kafka的Reblance机制吗?

    在面试的过程中是不是经常被面试官问到类似的问题?读完这篇文章后,相信你能给出一个让面试官满意的答案. Reblance是什么 Reblance就像他的名称一样,意思是再平衡,平衡什么?平衡消费者和分区 ...

  6. Kafka知识总结及面试题

    文章目录 概念 Kafka基础概念 命令行 Kafka 数据存储设计 kafka在zookeeper中存储结构 生产者 生产者设计 消费者 消费者设计 面试题 kafka设计 请说明什么是Apache ...

  7. 《Kafka权威指南》记录

    生产者 生产流程 32页 生产者创建 Kafka生产者需要三个必须参数:broker地址清单,key和value的序列化方式 (如StringSerializer) 生产者发送 ACKS acks 参 ...

  8. 中国民生银行天眼日志平台架构演进的平凡之路

    本文由 [AI前线]原创,原文链接:t.cn/RYgJ8hD AI 前线导读: "随着中国民生银行的 IT 业务系统的迅速发展,主机.设备.系统.应用软件数量不断增多,业务资源访问.操作量不 ...

  9. 《中国民生银行天眼日志平台架构演进的平凡之路》阅读有感

    <中国民生银行天眼日志平台架构演进的平凡之路>阅读有感 随着中国民生银行的 IT 业务系统的迅速发展,主机.设备.系统.应用软件数量不断增多,业务资源访问.操作量不断增加,对于应用整体系统 ...

最新文章

  1. 很安逸的离线API文档查询工具Dash和Zeal
  2. Spark SQL 源代码分析系列
  3. (转)Linux下MatlabCompilerRuntime的安装和使用
  4. jsp页面其本质就是一个servlet
  5. ios关于相机访问权限设置
  6. 2017/National _Java_C/2/数字划分
  7. 你的灯亮着吗?阅读笔记之一
  8. asp.net 导出Excel 设置格式
  9. Python sys模块的使用
  10. 三家快递公司涨派费:9月1日起每票上调0.1元
  11. java第一阶段面面试题_java基础阶段几个必会面试题
  12. 一步一步写算法(之查找)
  13. c语言源程序最多可能由组成,一个c语言源程序是由什么组成_后端开发
  14. MATLAB 2017b 安装教程 (推荐)
  15. C 语言中结构体中成员所占内存的大小
  16. 全国哀悼日 一段css让全站变灰
  17. 超分辨率重建 matlab,图像超分辨率重建软件
  18. 地震产生的原因和征兆
  19. 计算机培训考试内容,计算机等级考试的科目和内容解析
  20. 不错的讲解业务架构,应用架构,数据架构的图

热门文章

  1. Python高级特性与网络爬虫(一):使用Ajax请求爬取用户微博内容和python多进程爬取用户图片
  2. 通过短信猫发送手机短信
  3. 【C++要笑着学】虚函数表(VBTL) | 观察虚表指针 | 运行时决议与编译时决议 | 动态绑定与静态绑定 | 静态多态与动态多态 | 单继承与多继承关系的虚表
  4. 通过 Colab 下载 Google Driver 上的大文件到内网服务器
  5. 基于Java的物流公司管理系统项目记录
  6. EMAS移动DevOps解决方案-Mobile DevOps
  7. 梦想是什么,梦想在哪里,IT梦!
  8. 微信小程序----团购或秒杀的批量倒计时实现
  9. 手机手电筒功能的实现
  10. 牛逼!阿里程序员双十一神器!