kafka 心跳和 reblance
kafka 的心跳是 kafka consumer 和 broker 之间的健康检查,只有当 broker coordinator 正常时,consumer 才会发送心跳。
consumer 和 reblance 相关的 2 个配置参数:
参数名 --> MemberMetadata 字段 session.timeout.ms --> MemberMetadata.sessionTimeoutMs max.poll.interval.ms --> MemberMetadata.rebalanceTimeoutMs
broker 端,sessionTimeoutMs 参数
broker 处理心跳的逻辑在 GroupCoordinator 类中:如果心跳超期, broker coordinator 会把消费者从 group 中移除,并触发 reblance。
1 private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) { 2 // complete current heartbeat expectation 3 member.latestHeartbeat = time.milliseconds() 4 val memberKey = MemberKey(member.groupId, member.memberId) 5 heartbeatPurgatory.checkAndComplete(memberKey) 6 7 // reschedule the next heartbeat expiration deadline 8 // 计算心跳截止时刻 9 val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs 10 val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs) 11 heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey)) 12 } 13 14 // 心跳过期 15 def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) { 16 group.inLock { 17 if (!shouldKeepMemberAlive(member, heartbeatDeadline)) { 18 info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group") 19 removeMemberAndUpdateGroup(group, member) 20 } 21 } 22 } 23 24 private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) = 25 member.awaitingJoinCallback != null || 26 member.awaitingSyncCallback != null || 27 member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
consumer 端:sessionTimeoutMs,rebalanceTimeoutMs 参数
如果客户端发现心跳超期,客户端会标记 coordinator 为不可用,并阻塞心跳线程;如果超过了 poll 消息的间隔超过了 rebalanceTimeoutMs,则 consumer 告知 broker 主动离开消费组,也会触发 reblance
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread 代码片段:
if (coordinatorUnknown()) {if (findCoordinatorFuture != null || lookupCoordinator().failed())// the immediate future check ensures that we backoff properly in the case that no// brokers are available to connect to.AbstractCoordinator.this.wait(retryBackoffMs); } else if (heartbeat.sessionTimeoutExpired(now)) {// the session timeout has expired without seeing a successful heartbeat, so we should// probably make sure the coordinator is still healthy. markCoordinatorUnknown(); } else if (heartbeat.pollTimeoutExpired(now)) {// the poll timeout has expired, which means that the foreground thread has stalled// in between calls to poll(), so we explicitly leave the group. maybeLeaveGroup(); } else if (!heartbeat.shouldHeartbeat(now)) {// poll again after waiting for the retry backoff in case the heartbeat failed or the// coordinator disconnectedAbstractCoordinator.this.wait(retryBackoffMs); } else {heartbeat.sentHeartbeat(now);sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {@Overridepublic void onSuccess(Void value) {synchronized (AbstractCoordinator.this) {heartbeat.receiveHeartbeat(time.milliseconds());}}@Overridepublic void onFailure(RuntimeException e) {synchronized (AbstractCoordinator.this) {if (e instanceof RebalanceInProgressException) {// it is valid to continue heartbeating while the group is rebalancing. This// ensures that the coordinator keeps the member in the group for as long// as the duration of the rebalance timeout. If we stop sending heartbeats,// however, then the session timeout may expire before we can rejoin. heartbeat.receiveHeartbeat(time.milliseconds());} else {heartbeat.failHeartbeat();// wake up the thread if it's sleeping to reschedule the heartbeatAbstractCoordinator.this.notify();}}}}); }
org.apache.kafka.clients.consumer.internals.Heartbeat#pollTimeoutExpired:
//maxPollInterval 即 rebalanceTimeoutMs public boolean pollTimeoutExpired(long now) {return now - lastPoll > maxPollInterval; }
join group 的处理逻辑:kafka.coordinator.group.GroupCoordinator#onCompleteJoin
转载于:https://www.cnblogs.com/allenwas3/p/10278998.html
kafka 心跳和 reblance相关推荐
- Kafka 心跳机制 重复消费
kafka 心跳机制 Kafka是通过心跳机制来控制消费超时,心跳机制对于消费者客户端来说是无感的,它是一个异步线程,当我们启动一个消费者实例时,心跳线程就开始工作了.心跳超时会导致消息重复消费. 在 ...
- 6张图阐述Kafka心跳机制(时间轮算法的具体运用)
Broker端与客户端的心跳在Kafka中非常的重要,因为一旦在一个心跳过期周期内(默认10s),Broker端的消费组组协调器(GroupCoordinator)会把消费者从消费组中移除,从而触发重 ...
- kafka 心跳参数
概念 consumer 向 broker 发送心跳,表明自己还活着,不用被踢出消费者组 参数 session.timeout.ms 表示 consumer 向 broker 发送心跳的超时时间.例如 ...
- Kafka consumer频繁reblance
转载 李亚飞 大佬的文章:https://www.lyafei.com/ 其实文章名可以叫 记一次线上 Kafka 问题排查,但觉得稀松平常,弄些术词显得硬核点,hhh,言归正传,线上一个 Go 服务 ...
- 你了解kafka的Reblance机制吗?
在面试的过程中是不是经常被面试官问到类似的问题?读完这篇文章后,相信你能给出一个让面试官满意的答案. Reblance是什么 Reblance就像他的名称一样,意思是再平衡,平衡什么?平衡消费者和分区 ...
- Kafka知识总结及面试题
文章目录 概念 Kafka基础概念 命令行 Kafka 数据存储设计 kafka在zookeeper中存储结构 生产者 生产者设计 消费者 消费者设计 面试题 kafka设计 请说明什么是Apache ...
- 《Kafka权威指南》记录
生产者 生产流程 32页 生产者创建 Kafka生产者需要三个必须参数:broker地址清单,key和value的序列化方式 (如StringSerializer) 生产者发送 ACKS acks 参 ...
- 中国民生银行天眼日志平台架构演进的平凡之路
本文由 [AI前线]原创,原文链接:t.cn/RYgJ8hD AI 前线导读: "随着中国民生银行的 IT 业务系统的迅速发展,主机.设备.系统.应用软件数量不断增多,业务资源访问.操作量不 ...
- 《中国民生银行天眼日志平台架构演进的平凡之路》阅读有感
<中国民生银行天眼日志平台架构演进的平凡之路>阅读有感 随着中国民生银行的 IT 业务系统的迅速发展,主机.设备.系统.应用软件数量不断增多,业务资源访问.操作量不断增加,对于应用整体系统 ...
最新文章
- 很安逸的离线API文档查询工具Dash和Zeal
- Spark SQL 源代码分析系列
- (转)Linux下MatlabCompilerRuntime的安装和使用
- jsp页面其本质就是一个servlet
- ios关于相机访问权限设置
- 2017/National _Java_C/2/数字划分
- 你的灯亮着吗?阅读笔记之一
- asp.net 导出Excel 设置格式
- Python sys模块的使用
- 三家快递公司涨派费:9月1日起每票上调0.1元
- java第一阶段面面试题_java基础阶段几个必会面试题
- 一步一步写算法(之查找)
- c语言源程序最多可能由组成,一个c语言源程序是由什么组成_后端开发
- MATLAB 2017b 安装教程 (推荐)
- C 语言中结构体中成员所占内存的大小
- 全国哀悼日 一段css让全站变灰
- 超分辨率重建 matlab,图像超分辨率重建软件
- 地震产生的原因和征兆
- 计算机培训考试内容,计算机等级考试的科目和内容解析
- 不错的讲解业务架构,应用架构,数据架构的图
热门文章
- Python高级特性与网络爬虫(一):使用Ajax请求爬取用户微博内容和python多进程爬取用户图片
- 通过短信猫发送手机短信
- 【C++要笑着学】虚函数表(VBTL) | 观察虚表指针 | 运行时决议与编译时决议 | 动态绑定与静态绑定 | 静态多态与动态多态 | 单继承与多继承关系的虚表
- 通过 Colab 下载 Google Driver 上的大文件到内网服务器
- 基于Java的物流公司管理系统项目记录
- EMAS移动DevOps解决方案-Mobile DevOps
- 梦想是什么,梦想在哪里,IT梦!
- 微信小程序----团购或秒杀的批量倒计时实现
- 手机手电筒功能的实现
- 牛逼!阿里程序员双十一神器!