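11. Kafka Consumer: joinGroup
In this section we analyze the joinGroup code. The main flow is shown in Figure 1.
Flow diagram
Dissecting the JoinGroup protocol
We won't walk through the client code yet; we'll bring it in when we reach the key points later. For now, all you need to know is that the client sends a JoinGroupRequest to the server. Sample request and response data are shown below; the protocol structures are shown in Figures 2 and 3.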
```
JoinGroupRequestData(groupId='mykafka-group', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000,
memberId='consumer-mykafka-group-1-e767a2e9-ac9d-4e61-af95-e50894101de9', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='cooperative-sticky', metadata=[0, 1, 0, 0, 0, 1, 0, 6, 116, 101, 115, 116, 95, 50, -1, -1, -1, -1, 0, 0, 0, 0])])

JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=1, protocolType='consumer', protocolName='sticky', leader='mykafka-group_4_1-c3c6047e-bbfe-48fb-8eba-cbb55d97ada8', memberId='mykafka-group_4_1-c3c6047e-bbfe-48fb-8eba-cbb55d97ada8', members=[JoinGroupResponseMember(memberId='mykafka-group_4_1-c3c6047e-bbfe-48fb-8eba-cbb55d97ada8', groupInstanceId='mykafka-group_4_1', metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, -1, -1, -1, -1, 0, 0, 0, 0])])
```
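The `metadata` byte arrays in these samples are the serialized consumer subscription. As a sketch, assuming the `ConsumerProtocolSubscription` v1 layout (an int16 version, an int32-counted array of int16-length-prefixed topic names, nullable int32-length user data, then an int32-counted array of owned partitions), the request bytes decode to topic `test_2` and the response bytes to `topic_1`. `SubscriptionDecoder` is a hypothetical helper written for illustration, not a Kafka class.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical decoder, assuming the ConsumerProtocolSubscription v1 wire layout (big-endian).
object SubscriptionDecoder {
  final case class Subscription(
      version: Short,
      topics: List[String],
      userData: Option[Array[Byte]],
      ownedPartitions: List[(String, List[Int])])

  // int16 length prefix followed by UTF-8 bytes
  private def readString(buf: ByteBuffer): String = {
    val bytes = new Array[Byte](buf.getShort().toInt)
    buf.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  def decode(raw: Array[Byte]): Subscription = {
    val buf = ByteBuffer.wrap(raw) // ByteBuffer is big-endian by default
    val version = buf.getShort()
    val topics = List.fill(buf.getInt())(readString(buf))
    // A length of -1 encodes null user data
    val userDataLen = buf.getInt()
    val userData =
      if (userDataLen < 0) None
      else {
        val b = new Array[Byte](userDataLen)
        buf.get(b)
        Some(b)
      }
    // v1 appends owned partitions: a topic name plus an int32-counted array of partition ids
    val owned =
      if (version >= 1 && buf.hasRemaining)
        List.fill(buf.getInt()) {
          val topic = readString(buf)
          (topic, List.fill(buf.getInt())(buf.getInt()))
        }
      else Nil
    Subscription(version, topics, userData, owned)
  }
}
```

Under this assumed layout, the request's 22 bytes break down as 2 (version 1) + 4 (one topic) + 2 + 6 ("test_2") + 4 (null user data) + 4 (no owned partitions).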
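Server-side handling
On the server, JoinGroupRequest is handled in kafka.server.KafkaApis#handleJoinGroupRequest. The flowchart in Figure 1 shows that whether memberId and groupInstanceId are present sends the request down different code branches, so below we dissect the source code case by case.
When memberId and groupInstanceId are both empty in the JoinGroupRequest
The corresponding branch of the flowchart is shown in Figure 4.
At the entry point there is first a check of whether a memberId is required (code below); it exists for compatibility with older consumers. In Kafka 2.5, joinGroupRequest.version is 7, and comparing the source across versions shows that from Kafka 2.3 onward joinGroupRequest.version is at least 4. In other words, when groupInstanceId is empty, a memberId is required.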
```scala
val requireKnownMemberId = joinGroupRequest.version >= 4 && groupInstanceId.isEmpty
```
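Next comes kafka.coordinator.group.GroupCoordinator#doUnknownJoinGroup (code below). Because both memberId and groupInstanceId are empty here, group.hasStaticMember(groupInstanceId) is false and execution takes the requireKnownMemberId == true branch. As you can see, when a memberId is required, the coordinator generates one, records it as a pending member, and immediately returns a response with the MEMBER_ID_REQUIRED error. Taking this one step further: when a consumer without a groupInstanceId starts consuming, it sends two requests. The first carries an empty memberId and the server returns the assigned memberId; the second carries that memberId and performs the actual joinGroup.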
```scala
private def doUnknownJoinGroup(group: GroupMetadata,
                               groupInstanceId: Option[String],
                               requireKnownMemberId: Boolean,
                               clientId: String,
                               clientHost: String,
                               rebalanceTimeoutMs: Int,
                               sessionTimeoutMs: Int,
                               protocolType: String,
                               protocols: List[(String, Array[Byte])],
                               responseCallback: JoinCallback): Unit = {
  group.inLock {
    // 3.1.1 Generate the memberId, preferring groupInstanceId and falling back to clientId
    val newMemberId = group.generateMemberId(clientId, groupInstanceId)

    if (group.hasStaticMember(groupInstanceId)) {
      // The groupInstanceId is already known: give the existing static member the new memberId
      updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, protocols, responseCallback)
    } else if (requireKnownMemberId) {
      // 3.1.2 A memberId is required: record it as pending and return it right away
      debug(s"Dynamic member with unknown member id joins group ${group.groupId} in " +
        s"${group.currentState} state. Created a new member id $newMemberId and request the member to rejoin with this id.")
      group.addPendingMember(newMemberId)
      addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
      responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
    } else {
      // No memberId round trip needed (new static member or older client): add the member directly
      info(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " +
        s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
      addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
        clientId, clientHost, protocolType, protocols, group, responseCallback)
    }
  }
}
```
kafka.coordinator.group.GroupMetadata#generateMemberId
The code that generates the memberId is very simple: clientId or groupInstanceId, followed by a delimiter and a UUID.
```scala
def generateMemberId(clientId: String,
                     groupInstanceId: Option[String]): String = {
  // Prefix with groupInstanceId if present, otherwise clientId, then the delimiter and a random UUID
  groupInstanceId match {
    case None =>
      clientId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
    case Some(instanceId) =>
      instanceId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
  }
}
```
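To make the two-request handshake concrete, here is a minimal sketch. It is a simplified, hypothetical model rather than Kafka's GroupCoordinator: `JoinHandshake`, `Coordinator`, `join` and `clientJoin` are invented names, the request version is fixed at 7 (the Kafka 2.5 value mentioned above), the delimiter is assumed to be `-` as in the sample memberIds, and the hasStaticMember lookup and rebalancing are left out. A dynamic member needs two requests, while a member with a groupInstanceId joins in one.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical, simplified model of the unknown-member handshake; not Kafka's GroupCoordinator.
object JoinHandshake {
  sealed trait JoinResult
  final case class MemberIdRequired(memberId: String) extends JoinResult
  final case class Joined(memberId: String) extends JoinResult
  case object UnknownMemberId extends JoinResult

  final class Coordinator {
    val pendingMembers: mutable.Set[String] = mutable.Set.empty
    val members: mutable.Set[String] = mutable.Set.empty

    def join(memberId: String, groupInstanceId: Option[String], clientId: String, version: Int): JoinResult = {
      // Same condition as requireKnownMemberId in KafkaApis#handleJoinGroupRequest
      val requireKnownMemberId = version >= 4 && groupInstanceId.isEmpty
      if (memberId.isEmpty) {
        // Like generateMemberId: prefer groupInstanceId, fall back to clientId ("-" assumed as delimiter)
        val newMemberId = groupInstanceId.getOrElse(clientId) + "-" + UUID.randomUUID().toString
        if (requireKnownMemberId) {
          pendingMembers += newMemberId // remembered until the member rejoins with it
          MemberIdRequired(newMemberId)
        } else {
          members += newMemberId // static member (or old client) joins right away
          Joined(newMemberId)
        }
      } else if (pendingMembers.remove(memberId)) {
        members += memberId // second request of the handshake
        Joined(memberId)
      } else if (members.contains(memberId)) {
        Joined(memberId)
      } else {
        UnknownMemberId
      }
    }
  }

  // Client side: rejoin with the id from MEMBER_ID_REQUIRED; returns (memberId, number of requests sent)
  def clientJoin(coordinator: Coordinator, groupInstanceId: Option[String], clientId: String,
                 memberId: String = "", requests: Int = 1): (String, Int) =
    coordinator.join(memberId, groupInstanceId, clientId, version = 7) match {
      case MemberIdRequired(id) => clientJoin(coordinator, groupInstanceId, clientId, id, requests + 1)
      case Joined(id)           => (id, requests)
      case UnknownMemberId      => throw new IllegalStateException(s"unknown member id $memberId")
    }
}
```

In this model a client without a groupInstanceId ends up sending two requests, and the pending set is empty again once it has rejoined.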
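When memberId is not empty and groupInstanceId is empty in the JoinGroupRequest
The corresponding branch of the flowchart is shown in Figure 5.
In this case the request goes through kafka.coordinator.group.GroupCoordinator#doJoinGroup. The method looks long, but apart from the error responses and the cases that simply return the current generation's information, it comes down to two outcomes: after the various checks, it calls either addMemberAndRebalance or updateMemberAndRebalance.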
```scala
private def doJoinGroup(group: GroupMetadata,
                        memberId: String,
                        groupInstanceId: Option[String],
                        clientId: String,
                        clientHost: String,
                        rebalanceTimeoutMs: Int,
                        sessionTimeoutMs: Int,
                        protocolType: String,
                        protocols: List[(String, Array[Byte])],
                        responseCallback: JoinCallback): Unit = {
  group.inLock {
    if (group.is(Dead)) {
      responseCallback(JoinGroupResult(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
    } else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
      responseCallback(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
    } else if (group.isPendingMember(memberId)) {
      // A rejoining pending member will be accepted. Note that pending member will never be a static member.
      if (groupInstanceId.isDefined) {
        throw new IllegalStateException(s"the static member $groupInstanceId was not expected to be assigned " +
          s"into pending member bucket with member id $memberId")
      } else {
        debug(s"Dynamic Member with specific member id $memberId joins group ${group.groupId} in " +
          s"${group.currentState} state. Adding to the group now.")
        addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
          clientId, clientHost, protocolType, protocols, group, responseCallback)
      }
    } else {
      val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)
      if (group.isStaticMemberFenced(memberId, groupInstanceId, "join-group")) {
        // given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately.
        responseCallback(JoinGroupResult(memberId, Errors.FENCED_INSTANCE_ID))
      } else if (!group.has(memberId) || groupInstanceIdNotFound) {
        responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
      } else {
        val member = group.get(memberId)

        group.currentState match {
          case PreparingRebalance =>
            updateMemberAndRebalance(group, member, protocols, responseCallback)

          case CompletingRebalance =>
            if (member.matches(protocols)) {
              // member is joining with the same metadata (which could be because it failed to
              // receive the initial JoinGroup response), so just return current group information
              // for the current generation.
              responseCallback(JoinGroupResult(
                members = if (group.isLeader(memberId)) {
                  group.currentMemberMetadata
                } else {
                  List.empty
                },
                memberId = memberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                error = Errors.NONE))
            } else {
              // member has changed metadata, so force a rebalance
              updateMemberAndRebalance(group, member, protocols, responseCallback)
            }

          case Stable =>
            val member = group.get(memberId)
            if (group.isLeader(memberId) || !member.matches(protocols)) {
              // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
              // The latter allows the leader to trigger rebalances for changes affecting assignment
              // which do not affect the member metadata (such as topic metadata changes for the consumer)
              updateMemberAndRebalance(group, member, protocols, responseCallback)
            } else {
              // for followers with no actual change to their metadata, just return group information
              // for the current generation which will allow them to issue SyncGroup
              responseCallback(JoinGroupResult(
                members = List.empty,
                memberId = memberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                error = Errors.NONE))
            }

          case Empty | Dead =>
            // Group reaches unexpected state. Let the joining member reset their generation and rejoin.
            warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
              s"unexpected group state ${group.currentState}")
            responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
        }
      }
    }
  }
}
```
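So when exactly is addMemberAndRebalance called, and when is updateMemberAndRebalance called?
1. addMemberAndRebalance:
- When the member is in pendingMembers. From the code analyzed earlier, when there is neither a memberId nor a groupInstanceId, the coordinator generates a memberId and returns it, and at that point it puts the generated memberId into pendingMembers to mark it as a member waiting to join. In other words, when the client sends its second request carrying that memberId, it goes through addMemberAndRebalance.
- Or when no memberId is required (for compatibility with older clients), which is the last branch of doUnknownJoinGroup.
2. updateMemberAndRebalance:
- When the group is in PreparingRebalance, i.e. a joinGroup has been triggered and the group is waiting for the other members to join, updateMemberAndRebalance is called directly.
- When the group is in CompletingRebalance, i.e. all members have joined, the consumer leader has been elected, and the group is waiting for the assignment to be synced. The incoming protocol metadata is compared with the stored one; if it doesn't match, updateMemberAndRebalance is called, which triggers another rebalance.
- When the group is in Stable, i.e. the reassignment has completed and the group is steady. If the member is the consumer leader or its protocol metadata doesn't match, updateMemberAndRebalance is called.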
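These rules can be condensed into a small decision function. This is a hypothetical sketch, assuming the request has already passed the Dead, protocol, fencing and unknown-member checks at the top of doJoinGroup; `JoinDecision` and its `Action` values are invented names, not Kafka types.

```scala
// Hypothetical model of doJoinGroup's branching for a member that passed the earlier checks.
object JoinDecision {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object CompletingRebalance extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  sealed trait Action
  case object AddMemberAndRebalance extends Action
  case object UpdateMemberAndRebalance extends Action
  case object ReturnCurrentGeneration extends Action
  case object UnknownMemberIdError extends Action

  def decide(isPending: Boolean, state: GroupState, isLeader: Boolean, protocolsMatch: Boolean): Action =
    if (isPending) AddMemberAndRebalance // second request of the memberId handshake
    else state match {
      case PreparingRebalance => UpdateMemberAndRebalance
      case CompletingRebalance =>
        // same metadata: the member probably missed the JoinGroup response, so resend it
        if (protocolsMatch) ReturnCurrentGeneration else UpdateMemberAndRebalance
      case Stable =>
        // the leader may rejoin to force a rebalance even with unchanged metadata
        if (isLeader || !protocolsMatch) UpdateMemberAndRebalance else ReturnCurrentGeneration
      case Empty | Dead => UnknownMemberIdError
    }
}
```

The asymmetry between CompletingRebalance and Stable is the leader check: only in Stable does a JoinGroup from the leader alone force a new rebalance.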
kafka.coordinator.group.GroupCoordinator#addMemberAndRebalance
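The main extra step here is electing the consumer leader, and the code for it is simple: if leaderId is empty, the current member becomes the leader (this happens inside group.add). After that, maybePrepareRebalance is called.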
```scala
private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                  sessionTimeoutMs: Int,
                                  memberId: String,
                                  groupInstanceId: Option[String],
                                  clientId: String,
                                  clientHost: String,
                                  protocolType: String,
                                  protocols: List[(String, Array[Byte])],
                                  group: GroupMetadata,
                                  callback: JoinCallback): Unit = {
  val member = new MemberMetadata(memberId, group.groupId, groupInstanceId,
    clientId, clientHost, rebalanceTimeoutMs,
    sessionTimeoutMs, protocolType, protocols)

  member.isNew = true
  info(s"group.generationId:${group.generationId}")

  // update the newMemberAdded flag to indicate that the join group can be further delayed
  if (group.is(PreparingRebalance) && group.generationId == 0)
    group.newMemberAdded = true

  // group.add elects this member as leader if the group has no leader yet
  group.add(member, callback)

  completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)

  if (member.isStaticMember) {
    info(s"Adding new static member $groupInstanceId to group ${group.groupId} with member id $memberId.")
    group.addStaticMember(groupInstanceId, memberId)
  } else {
    group.removePendingMember(memberId)
  }
  maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
}
```
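A minimal sketch of that leader election, assuming a hypothetical `MiniGroup` class that only stores member ids (Kafka's GroupMetadata also validates protocols and keeps join callbacks): the first member added while leaderId is empty becomes the leader, and members added later do not change it.

```scala
import scala.collection.mutable

object LeaderElection {
  // Hypothetical stand-in for GroupMetadata that only tracks member ids and the leader
  final class MiniGroup {
    private val members = mutable.LinkedHashMap.empty[String, String] // memberId -> clientId
    private var leaderId: Option[String] = None

    // Like GroupMetadata#add: the first member added while leaderId is empty becomes the leader
    def add(memberId: String, clientId: String): Unit = {
      if (leaderId.isEmpty) leaderId = Some(memberId)
      members.put(memberId, clientId)
    }

    def isLeader(memberId: String): Boolean = leaderId.contains(memberId)
    def leaderOrNull: String = leaderId.orNull
    def size: Int = members.size
  }
}
```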
kafka.coordinator.group.GroupCoordinator#updateMemberAndRebalance
updateMemberAndRebalance is even simpler: it updates the member's information in the group and then calls maybePrepareRebalance.
```scala
private def updateMemberAndRebalance(group: GroupMetadata,
                                     member: MemberMetadata,
                                     protocols: List[(String, Array[Byte])],
                                     callback: JoinCallback): Unit = {
  // Replace the member's protocols and join callback, then try to start a rebalance
  group.updateMember(member, protocols, callback)
  maybePrepareRebalance(group, s"Updating metadata for member ${member.memberId}")
}
```
kafka.coordinator.group.GroupCoordinator#maybePrepareRebalance
Group state comes into play here as well: prepareRebalance is only called when the group is in one of three states: Stable, CompletingRebalance or Empty.
```scala
private def maybePrepareRebalance(group: GroupMetadata, reason: String): Unit = {
  group.inLock {
    // Check the group state: no rebalance is started in PreparingRebalance or Dead
    if (group.canRebalance)
      prepareRebalance(group, reason)
  }
}
```
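The state check can be modeled in a few lines. In this hypothetical sketch (`RebalanceGate` is an invented name), PreparingRebalance can only be entered from Stable, CompletingRebalance or Empty, so calling maybePrepareRebalance on a group that is already PreparingRebalance, or is Dead, leaves its state unchanged.

```scala
// Hypothetical model of group.canRebalance and maybePrepareRebalance; not Kafka's GroupState classes.
object RebalanceGate {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object CompletingRebalance extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  // States from which PreparingRebalance may be entered
  private val validPreviousStates: Set[GroupState] = Set(Stable, CompletingRebalance, Empty)

  def canRebalance(state: GroupState): Boolean = validPreviousStates.contains(state)

  // Like maybePrepareRebalance: move to PreparingRebalance only when allowed, otherwise keep the state
  def maybePrepareRebalance(state: GroupState): GroupState =
    if (canRebalance(state)) PreparingRebalance else state
}
```

This is presumably why calling updateMemberAndRebalance while the group is already in PreparingRebalance does not start a second rebalance: the member is updated and the rebalance already in progress carries on.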
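When memberId is empty and groupInstanceId is not empty in the JoinGroupRequest
The corresponding branch of the flowchart is shown in Figure 6.
In this case a new memberId is still generated, but requireKnownMemberId evaluates to false.
- On the first request there is no record of the groupInstanceId in memory, so addMemberAndRebalance is called. Compared with a consumer without a groupInstanceId, this saves one JoinGroupRequest.
- If the groupInstanceId is already in memory, updateStaticMemberAndRebalance is called (code below). It essentially removes the old member information and maps the new memberId to the member. One thing here is very odd: oldProtocols is assigned after group.updateMember, and oldProtocols is only used when groupManager.storeGroup reports an error. Judging from the code comment, the intent is to roll back the member and protocol updates, but because oldProtocols is assigned after group.updateMember, it always holds the already-updated protocol information. I'm not sure whether this is a Kafka bug, so I have filed an issue for it: https://issues.apache.org/jira/browse/KAFKA-13581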
```scala
private def updateStaticMemberAndRebalance(group: GroupMetadata,
                                           newMemberId: String,
                                           groupInstanceId: Option[String],
                                           protocols: List[(String, Array[Byte])],
                                           responseCallback: JoinCallback): Unit = {
  // Look up the old memberId of this static member
  val oldMemberId = group.getStaticMemberId(groupInstanceId)
  val currentLeader = group.leaderOrNull
  // Map the new memberId to the existing member in memory
  val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
  completeAndScheduleNextHeartbeatExpiration(group, member)

  val knownStaticMember = group.get(newMemberId)
  group.updateMember(knownStaticMember, protocols, responseCallback)
  // Assigned after updateMember, so this already holds the new protocols (see KAFKA-13581)
  val oldProtocols = knownStaticMember.supportedProtocols

  group.currentState match {
    case Stable =>
      // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
      // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
      val selectedProtocolOfNextGeneration = group.selectProtocol
      if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
        info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
        val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
        groupManager.storeGroup(group, groupAssignment, error => {
          if (error != Errors.NONE) {
            warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")

            // Failed to persist member.id of the given static member, revert the update of the static member in the group.
            group.updateMember(knownStaticMember, oldProtocols, null)
            val oldMember = group.replaceGroupInstance(newMemberId, oldMemberId, groupInstanceId)
            completeAndScheduleNextHeartbeatExpiration(group, oldMember)
            responseCallback(JoinGroupResult(
              List.empty,
              memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
              generationId = group.generationId,
              protocolType = group.protocolType,
              protocolName = group.protocolName,
              leaderId = currentLeader,
              error = error))
          } else {
            group.maybeInvokeJoinCallback(member, JoinGroupResult(
              members = List.empty,
              memberId = newMemberId,
              generationId = group.generationId,
              protocolType = group.protocolType,
              protocolName = group.protocolName,
              // We want to avoid current leader performing trivial assignment while the group
              // is in stable stage, because the new assignment in leader's next sync call
              // won't be broadcast by a stable group. This could be guaranteed by
              // always returning the old leader id so that the current leader won't assume itself
              // as a leader based on the returned message, since the new member.id won't match
              // returned leader id, therefore no assignment will be performed.
              leaderId = currentLeader,
              error = Errors.NONE))
          }
        })
      } else {
        maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol")
      }

    case CompletingRebalance =>
      // if the group is in after-sync stage, upon getting a new join-group of a known static member
      // we should still trigger a new rebalance, since the old member may already be sent to the leader
      // for assignment, and hence when the assignment gets back there would be a mismatch of the old member id
      // with the new replaced member id. As a result the new member id would not get any assignment.
      prepareRebalance(group, s"Updating metadata for static member ${member.memberId} with instance id $groupInstanceId")

    case Empty | Dead =>
      throw new IllegalStateException(s"Group ${group.groupId} was not supposed to be " +
        s"in the state ${group.currentState} when the unknown static member $groupInstanceId rejoins.")

    case PreparingRebalance =>
      // No extra action: the group is already preparing a rebalance
  }
}
```
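The ordering problem with oldProtocols can be reproduced in isolation. In this hypothetical sketch, `Member` stands in for MemberMetadata and keeps only protocol names: reading the value after the update returns the new protocols, so a later "revert" restores nothing, whereas reading it before the update keeps the previous value. It only illustrates the ordering and is not Kafka's fix for KAFKA-13581.

```scala
// Hypothetical illustration of capture-after-update vs capture-before-update; not Kafka code.
object RevertOrdering {
  // Stand-in for MemberMetadata: only the protocol names are kept
  final class Member(var supportedProtocols: List[String])

  // Order used in the quoted code: update first, then read the "old" protocols
  def oldProtocolsCapturedAfterUpdate(member: Member, newProtocols: List[String]): List[String] = {
    member.supportedProtocols = newProtocols
    member.supportedProtocols
  }

  // Read before updating, so a later revert has the previous value to restore
  def oldProtocolsCapturedBeforeUpdate(member: Member, newProtocols: List[String]): List[String] = {
    val oldProtocols = member.supportedProtocols
    member.supportedProtocols = newProtocols
    oldProtocols
  }
}
```

Reverting with the value returned by the first function would leave the member on the new protocols.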
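When memberId is not empty and groupInstanceId is not empty in the JoinGroupRequest
This case means a consumer with a groupInstanceId failed its heartbeat and is rejoining the group. After the fencing check (FENCED_INSTANCE_ID) and the unknown-member check (UNKNOWN_MEMBER_ID) in doJoinGroup, it is handled like the case where memberId is present and groupInstanceId is empty: the group state decides whether updateMemberAndRebalance is called.
Summary
In this article we have only gained a rough understanding of joinGroup. The main points are:
- If the JoinGroupRequest has no memberId, a new memberId is generated. If groupInstanceId is also empty, the response is returned immediately and the client must send the request again with that memberId.
- The group also goes through state transitions (Empty, PreparingRebalance, CompletingRebalance, Stable, Dead).
- The server's election of the consumer-side leader is simple: if leaderId is empty, the current memberId is taken.
- Kafka handles groupInstanceId by keeping an in-memory mapping from groupInstanceId to memberId, which saves one JoinGroupRequest when such a member joins without a memberId.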
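The checks that run before the state-based branching in doJoinGroup can be sketched as follows. This is a hypothetical model (`StaticMemberChecks`, `check` and the `Outcome` values are invented names) in which `staticMembers` maps groupInstanceId to memberId: a known instance id registered under a different memberId is fenced, an unknown memberId or instance id gets UNKNOWN_MEMBER_ID, and everything else proceeds to the per-state logic.

```scala
// Hypothetical model of isStaticMemberFenced and the unknown-member check in doJoinGroup.
object StaticMemberChecks {
  sealed trait Outcome
  case object FencedInstanceId extends Outcome
  case object UnknownMemberId extends Outcome
  case object ProceedByGroupState extends Outcome

  def check(staticMembers: Map[String, String], members: Set[String],
            memberId: String, groupInstanceId: Option[String]): Outcome = {
    // Fenced: the instance id is known but registered under a different memberId
    val fenced = groupInstanceId.exists(id => staticMembers.get(id).exists(_ != memberId))
    // Instance id supplied but not registered as a static member
    val instanceNotFound = groupInstanceId.exists(id => !staticMembers.contains(id))
    if (fenced) FencedInstanceId
    else if (!members.contains(memberId) || instanceNotFound) UnknownMemberId
    else ProceedByGroupState
  }
}
```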