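This section walks through the joinGroup code path. The overall flow is shown in Figure 1.

Flow overview

Breaking down the JoinGroup protocol

We won't go through the client code here; we'll bring it in later when we reach the key points. For now it is enough to know that the client sends a JoinGroupRequest to the server. A sample request and response are shown below, and the protocol structure is shown in Figures 2 and 3.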

JoinGroupRequestData(groupId='mykafka-group', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId='consumer-mykafka-group-1-e767a2e9-ac9d-4e61-af95-e50894101de9', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='cooperative-sticky', metadata=[0, 1, 0, 0, 0, 1, 0, 6, 116, 101, 115, 116, 95, 50, -1, -1, -1, -1, 0, 0, 0, 0])])

JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=1, protocolType='consumer', protocolName='sticky', leader='mykafka-group_4_1-c3c6047e-bbfe-48fb-8eba-cbb55d97ada8', memberId='mykafka-group_4_1-c3c6047e-bbfe-48fb-8eba-cbb55d97ada8', members=[JoinGroupResponseMember(memberId='mykafka-group_4_1-c3c6047e-bbfe-48fb-8eba-cbb55d97ada8', groupInstanceId='mykafka-group_4_1', metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, -1, -1, -1, -1, 0, 0, 0, 0])])
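The metadata byte arrays in the two samples are easier to read once decoded. The sketch below assumes they follow the version 1 consumer subscription layout: an int16 version, an int32-counted array of topic names with int16 length prefixes, int32-length-prefixed user data where -1 means null, and an int32-counted owned-partitions array. `SubscriptionDecoder` and `Subscription` are illustrative names made up for this example, not Kafka classes.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object SubscriptionDecoder {
  // Hypothetical holder for the decoded fields; not a Kafka type.
  final case class Subscription(version: Short,
                                topics: List[String],
                                userDataLength: Int,
                                ownedTopicCount: Int)

  def decode(raw: Array[Byte]): Subscription = {
    val buf = ByteBuffer.wrap(raw) // ByteBuffer is big-endian by default
    val version = buf.getShort()
    val topicCount = buf.getInt()
    // List.fill evaluates its body once per element, so topics are read in order.
    val topics = List.fill(topicCount) {
      val length = buf.getShort().toInt
      val bytes = new Array[Byte](length)
      buf.get(bytes)
      new String(bytes, StandardCharsets.UTF_8)
    }
    val userDataLength = buf.getInt() // -1 encodes "no user data"
    if (userDataLength > 0) {
      buf.position(buf.position() + userDataLength)
    }
    // Assumption: version 1 appends the owned-partitions array; only its count is read here.
    val ownedTopicCount = if (version >= 1 && buf.remaining() >= 4) buf.getInt() else 0
    Subscription(version, topics, userDataLength, ownedTopicCount)
  }
}
```

Under these assumptions the request metadata decodes to a version 1 subscription for the topic test_2, and the response member metadata to a subscription for topic_1, both with null user data and no owned partitions.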

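Server-side handling

On the server, a JoinGroupRequest is handled in kafka.server.KafkaApis#handleJoinGroupRequest. As the flowchart in Figure 1 shows, the code takes different branches depending on whether memberId and groupInstanceId are present. We go through the source for each combination below.

When memberId is empty and groupInstanceId is empty in the JoinGroupRequest

The corresponding branch of the flowchart is shown in Figure 4.

At the entry point there is first a check for whether a memberId is required (code below); it exists for compatibility with older consumer clients. In Kafka 2.5, joinGroupRequest.version is 7. Comparing the source across versions, clients from Kafka 2.3 onward send joinGroupRequest.version >= 4, which means a memberId is required whenever groupInstanceId is empty.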

val requireKnownMemberId = joinGroupRequest.version >= 4 && groupInstanceId.isEmpty
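To make the condition concrete, here is a minimal stand-alone version of the same check. `KnownMemberIdCheck` is a made-up wrapper for illustration; only the boolean expression comes from the line above.

```scala
object KnownMemberIdCheck {
  // Same expression as the quoted line: a new enough request version and no static-membership id.
  def requireKnownMemberId(version: Short, groupInstanceId: Option[String]): Boolean =
    version >= 4 && groupInstanceId.isEmpty
}
```

With version 7 and no groupInstanceId the check is true; with version 3 (an older client) it is false; with version 7 and a groupInstanceId such as mykafka-group_4_1 it is also false.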

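Next comes kafka.coordinator.group.GroupCoordinator#doUnknownJoinGroup, shown below. Since memberId and groupInstanceId are both empty here, the branch where requireKnownMemberId is true is taken: the coordinator generates a memberId and returns the response right away. Thinking one step further, this means that when a consumer without a groupInstanceId first starts consuming, it sends two requests. The first has an empty memberId, and the server replies with the generated memberId (together with the MEMBER_ID_REQUIRED error). The second carries that memberId and performs the actual joinGroup.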

    private def doUnknownJoinGroup(group: GroupMetadata,
                                   groupInstanceId: Option[String],
                                   requireKnownMemberId: Boolean,
                                   clientId: String,
                                   clientHost: String,
                                   rebalanceTimeoutMs: Int,
                                   sessionTimeoutMs: Int,
                                   protocolType: String,
                                   protocols: List[(String, Array[Byte])],
                                   responseCallback: JoinCallback): Unit = {
      group.inLock {
        // 3.1.1 Generate the memberId: use groupInstanceId as the prefix if present, otherwise clientId
        val newMemberId = group.generateMemberId(clientId, groupInstanceId)
        // First check whether the groupInstanceId is already cached. If it is, call
        // updateStaticMemberAndRebalance with the new memberId. If it is not and a known
        // memberId is required, return the generated memberId to the client.
        if (group.hasStaticMember(groupInstanceId)) {
          updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, protocols, responseCallback)
        } else if (requireKnownMemberId) {
          // 3.1.2 A known memberId is required, so return the response immediately
          debug(s"Dynamic member with unknown member id joins group ${group.groupId} in " +
            s"${group.currentState} state. Created a new member id $newMemberId and request the member to rejoin with this id.")
          group.addPendingMember(newMemberId)
          addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
          responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
        } else {
          info(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " +
            s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
          addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
            clientId, clientHost, protocolType, protocols, group, responseCallback)
        }
      }
    }
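The two-request handshake for a dynamic member can be sketched with a toy in-memory coordinator. This is a simplified model under stated assumptions, not the Kafka implementation: `ToyCoordinator`, `JoinResult` and its subclasses are invented for the example, and timeouts, protocols and group state are left out.

```scala
import java.util.UUID
import scala.collection.mutable

object TwoStepJoin {
  sealed trait JoinResult
  final case class MemberIdRequired(memberId: String) extends JoinResult
  final case class Joined(memberId: String) extends JoinResult
  final case class UnknownMember(memberId: String) extends JoinResult

  // Toy model: only dynamic members (no groupInstanceId) on a new enough request version.
  final class ToyCoordinator {
    private val pending = mutable.Set.empty[String]
    private val members = mutable.Set.empty[String]

    def join(clientId: String, memberId: String): JoinResult =
      if (memberId.isEmpty) {
        // First request: hand out an id and remember it as pending.
        val id = s"$clientId-${UUID.randomUUID()}"
        pending += id
        MemberIdRequired(id)
      } else if (pending.remove(memberId)) {
        // Second request: the pending id is promoted to a real member.
        members += memberId
        Joined(memberId)
      } else if (members.contains(memberId)) {
        Joined(memberId)
      } else {
        UnknownMember(memberId)
      }
  }
}
```

Calling `join("consumer-1", "")` returns `MemberIdRequired` with a fresh id; calling `join` again with that id returns `Joined`, while an id the coordinator never issued yields `UnknownMember`.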

kafka.coordinator.group.GroupMetadata#generateMemberId

The code that generates the memberId is simple: it is the clientId or the groupInstanceId, followed by a delimiter and a UUID.

    def generateMemberId(clientId: String,
                         groupInstanceId: Option[String]): String = {
      groupInstanceId match {
        case None =>
          clientId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
        case Some(instanceId) =>
          instanceId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
      }
    }
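Because the suffix is always a 36-character UUID, a memberId can be split back into its prefix and UUID. The helper below is an illustrative sketch, not part of Kafka; it assumes the delimiter is a single "-" character, as the sample memberIds earlier in this section suggest.

```scala
import java.util.UUID
import scala.util.Try

object MemberIdParts {
  private val UuidLength = 36 // textual UUID: 32 hex digits plus 4 dashes

  // Returns (prefix, uuid) when the id ends in "-<uuid>", otherwise None.
  def split(memberId: String): Option[(String, UUID)] = {
    val cut = memberId.length - UuidLength
    if (cut < 1 || memberId.charAt(cut - 1) != '-') {
      None
    } else {
      Try(UUID.fromString(memberId.substring(cut))).toOption.map { uuid =>
        (memberId.substring(0, cut - 1), uuid)
      }
    }
  }
}
```

Applied to the sample ids above, the request's memberId yields the prefix consumer-mykafka-group-1 (a clientId), and the response's memberId yields mykafka-group_4_1 (a groupInstanceId).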

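When memberId is not empty and groupInstanceId is empty in the JoinGroupRequest

The corresponding branch of the flowchart is shown in Figure 5.

In this case the request goes through kafka.coordinator.group.GroupCoordinator#doJoinGroup. The method looks long, but it really has only two outcomes: after a series of checks it calls either addMemberAndRebalance or updateMemberAndRebalance.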

    private def doJoinGroup(group: GroupMetadata,
                            memberId: String,
                            groupInstanceId: Option[String],
                            clientId: String,
                            clientHost: String,
                            rebalanceTimeoutMs: Int,
                            sessionTimeoutMs: Int,
                            protocolType: String,
                            protocols: List[(String, Array[Byte])],
                            responseCallback: JoinCallback): Unit = {
      group.inLock {
        if (group.is(Dead)) {
          responseCallback(JoinGroupResult(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
        } else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
          responseCallback(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
        } else if (group.isPendingMember(memberId)) {
          // A rejoining pending member will be accepted. Note that pending member will never be a static member.
          if (groupInstanceId.isDefined) {
            throw new IllegalStateException(s"the static member $groupInstanceId was not expected to be assigned " +
              s"into pending member bucket with member id $memberId")
          } else {
            debug(s"Dynamic Member with specific member id $memberId joins group ${group.groupId} in " +
              s"${group.currentState} state. Adding to the group now.")
            addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
              clientId, clientHost, protocolType, protocols, group, responseCallback)
          }
        } else {
          val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)
          if (group.isStaticMemberFenced(memberId, groupInstanceId, "join-group")) {
            // given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately.
            responseCallback(JoinGroupResult(memberId, Errors.FENCED_INSTANCE_ID))
          } else if (!group.has(memberId) || groupInstanceIdNotFound) {
            responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
          } else {
            val member = group.get(memberId)
            group.currentState match {
              case PreparingRebalance =>
                updateMemberAndRebalance(group, member, protocols, responseCallback)
              case CompletingRebalance =>
                if (member.matches(protocols)) {
                  // member is joining with the same metadata (which could be because it failed to
                  // receive the initial JoinGroup response), so just return current group information
                  // for the current generation.
                  responseCallback(JoinGroupResult(
                    members = if (group.isLeader(memberId)) {
                      group.currentMemberMetadata
                    } else {
                      List.empty
                    },
                    memberId = memberId,
                    generationId = group.generationId,
                    protocolType = group.protocolType,
                    protocolName = group.protocolName,
                    leaderId = group.leaderOrNull,
                    error = Errors.NONE))
                } else {
                  // member has changed metadata, so force a rebalance
                  updateMemberAndRebalance(group, member, protocols, responseCallback)
                }
              case Stable =>
                val member = group.get(memberId)
                if (group.isLeader(memberId) || !member.matches(protocols)) {
                  // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
                  // The latter allows the leader to trigger rebalances for changes affecting assignment
                  // which do not affect the member metadata (such as topic metadata changes for the consumer)
                  updateMemberAndRebalance(group, member, protocols, responseCallback)
                } else {
                  // for followers with no actual change to their metadata, just return group information
                  // for the current generation which will allow them to issue SyncGroup
                  responseCallback(JoinGroupResult(
                    members = List.empty,
                    memberId = memberId,
                    generationId = group.generationId,
                    protocolType = group.protocolType,
                    protocolName = group.protocolName,
                    leaderId = group.leaderOrNull,
                    error = Errors.NONE))
                }
              case Empty | Dead =>
                // Group reaches unexpected state. Let the joining member reset their generation and rejoin.
                warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
                  s"unexpected group state ${group.currentState}")
                responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
            }
          }
        }
      }
    }

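So when exactly is addMemberAndRebalance called, and when is updateMemberAndRebalance called?

1. addMemberAndRebalance:

  • The member is in pendingMembers. As the earlier analysis showed, when a request arrives with neither a memberId nor a groupInstanceId, the coordinator generates a memberId, returns it, and at that moment puts it into pendingMembers to mark it as a member waiting to join. So when the client sends its second request carrying that memberId, the server takes the addMemberAndRebalance path.
  • Or a known memberId is not required (for compatibility with older clients), in which case doUnknownJoinGroup calls it directly.

2. updateMemberAndRebalance:

  • When the group is in PreparingRebalance, that is, a joinGroup has been triggered and the coordinator is waiting for the other members to join, updateMemberAndRebalance is called directly.
  • When the group is in CompletingRebalance, that is, all members have joined, the consumer leader has been elected, and the group is waiting for the assignment to be synced, the coordinator compares the protocols in the request with the stored ones. If they do not match, it calls updateMemberAndRebalance, which triggers another rebalance.
  • When the group is Stable, that is, the reassignment is complete, updateMemberAndRebalance is called if the member is the consumer leader or its protocols do not match.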
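The state-dependent rules for an already known member can be condensed into one pure function. This is a sketch that mirrors the match on group.currentState in doJoinGroup; the `Action` values are names invented here to label the three outcomes.

```scala
object KnownMemberDecision {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object CompletingRebalance extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  // Hypothetical labels for what the coordinator does next.
  sealed trait Action
  case object UpdateAndRebalance extends Action      // updateMemberAndRebalance
  case object ReturnCurrentGeneration extends Action // reply with the current generation, no rebalance
  case object RejectUnknownMember extends Action     // reply with UNKNOWN_MEMBER_ID

  def decide(state: GroupState, isLeader: Boolean, protocolsMatch: Boolean): Action =
    state match {
      case PreparingRebalance  => UpdateAndRebalance
      case CompletingRebalance => if (protocolsMatch) ReturnCurrentGeneration else UpdateAndRebalance
      case Stable              => if (isLeader || !protocolsMatch) UpdateAndRebalance else ReturnCurrentGeneration
      case Empty | Dead        => RejectUnknownMember
    }
}
```

For example, a follower rejoining a Stable group with unchanged protocols gets the current generation back, whereas the leader rejoining the same group forces a rebalance.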

kafka.coordinator.group.GroupCoordinator#addMemberAndRebalance

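The main difference here is one extra step: electing the consumer leader. The code is simple: if leaderId is empty, the current member becomes the leader, and then maybePrepareRebalance is called.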

    private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                      sessionTimeoutMs: Int,
                                      memberId: String,
                                      groupInstanceId: Option[String],
                                      clientId: String,
                                      clientHost: String,
                                      protocolType: String,
                                      protocols: List[(String, Array[Byte])],
                                      group: GroupMetadata,
                                      callback: JoinCallback): Unit = {
      val member = new MemberMetadata(memberId, group.groupId, groupInstanceId,
        clientId, clientHost, rebalanceTimeoutMs,
        sessionTimeoutMs, protocolType, protocols)
      member.isNew = true
      info(s"group.generationId:${group.generationId}")
      // update the newMemberAdded flag to indicate that the join group can be further delayed
      if (group.is(PreparingRebalance) && group.generationId == 0)
        group.newMemberAdded = true
      // group.add also elects the leader
      group.add(member, callback)
      completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
      if (member.isStaticMember) {
        info(s"Adding new static member $groupInstanceId to group ${group.groupId} with member id $memberId.")
        group.addStaticMember(groupInstanceId, memberId)
      } else {
        group.removePendingMember(memberId)
      }
      maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
    }
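The "first member in becomes leader" rule described above can be shown with a toy group. `ToyGroup` is a made-up stand-in for illustration; it keeps only a leader id and a member set and is not how GroupMetadata is implemented.

```scala
import scala.collection.mutable

object LeaderElectionSketch {
  final class ToyGroup {
    private var leaderId: Option[String] = None
    private val members = mutable.LinkedHashSet.empty[String]

    // Mirrors the rule in the text: if no leader is set, the member being added becomes leader.
    def add(memberId: String): Unit = {
      if (leaderId.isEmpty) leaderId = Some(memberId)
      members += memberId
    }

    def leader: Option[String] = leaderId
    def size: Int = members.size
  }
}
```

Adding members a and then b leaves a as the leader; later additions do not change it.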

kafka.coordinator.group.GroupCoordinator#updateMemberAndRebalance

As you can see, updateMemberAndRebalance is even simpler: it updates the member's information in the group and then calls maybePrepareRebalance.

    private def updateMemberAndRebalance(group: GroupMetadata,
                                         member: MemberMetadata,
                                         protocols: List[(String, Array[Byte])],
                                         callback: JoinCallback): Unit = {
      group.updateMember(member, protocols, callback)
      maybePrepareRebalance(group, s"Updating metadata for member ${member.memberId}")
    }

kafka.coordinator.group.GroupCoordinator#maybePrepareRebalance

The group state matters here as well: the rebalance method can only be called when the group is in one of the three states Stable, CompletingRebalance, or Empty.

    private def maybePrepareRebalance(group: GroupMetadata, reason: String): Unit = {
      group.inLock {
        // Check the group state: a rebalance is not allowed in PreparingRebalance or Dead
        if (group.canRebalance)
          prepareRebalance(group, reason)
      }
    }
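The guard can be modelled as a set-membership test over the states from which a rebalance may start. This sketch assumes canRebalance amounts to exactly the rule stated above; the object and field names are invented for the example.

```scala
object RebalanceGuard {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object CompletingRebalance extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  // States from which the group may move into PreparingRebalance, per the text above.
  private val rebalanceAllowedFrom: Set[GroupState] = Set(Stable, CompletingRebalance, Empty)

  def canRebalance(current: GroupState): Boolean = rebalanceAllowedFrom.contains(current)
}
```

A group that is already in PreparingRebalance therefore does not start a second rebalance when another member joins; the new member simply becomes part of the one in progress.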

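When memberId is empty and groupInstanceId is not empty in the JoinGroupRequest

The corresponding branch of the flowchart is shown in Figure 6.

In this case a new memberId is still generated, and requireKnownMemberId evaluates to false.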

  • On the first request there is no record of the groupInstanceId in memory, so the addMemberAndRebalance path is taken. Compared with a consumer that has no groupInstanceId, this saves one JoinGroupRequest round trip.
  • If the groupInstanceId is already in memory, the updateStaticMemberAndRebalance path is taken (code below). It essentially removes the previous member entry and maps the new memberId to the member. One thing looks odd: oldProtocols is assigned after group.updateMember, yet its only use is in the error path of groupManager.storeGroup. Judging by the code comment, the intent is to roll back the member and protocol information, but because oldProtocols is read after group.updateMember it always holds the already-updated protocols. This may be a Kafka bug; an issue has been filed for it: https://issues.apache.org/jira/browse/KAFKA-13581

    private def updateStaticMemberAndRebalance(group: GroupMetadata,
                                               newMemberId: String,
                                               groupInstanceId: Option[String],
                                               protocols: List[(String, Array[Byte])],
                                               responseCallback: JoinCallback): Unit = {
      // Look up the previous memberId
      val oldMemberId = group.getStaticMemberId(groupInstanceId)
      val currentLeader = group.leaderOrNull
      // Store the new memberId in memory
      val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
      completeAndScheduleNextHeartbeatExpiration(group, member)
      val knownStaticMember = group.get(newMemberId)
      group.updateMember(knownStaticMember, protocols, responseCallback)
      val oldProtocols = knownStaticMember.supportedProtocols
      group.currentState match {
        case Stable =>
          // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
          // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
          val selectedProtocolOfNextGeneration = group.selectProtocol
          if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
            info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
            val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
            groupManager.storeGroup(group, groupAssignment, error => {
              if (error != Errors.NONE) {
                warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
                // Failed to persist member.id of the given static member, revert the update of the static member in the group.
                group.updateMember(knownStaticMember, oldProtocols, null)
                val oldMember = group.replaceGroupInstance(newMemberId, oldMemberId, groupInstanceId)
                completeAndScheduleNextHeartbeatExpiration(group, oldMember)
                responseCallback(JoinGroupResult(
                  List.empty,
                  memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
                  generationId = group.generationId,
                  protocolType = group.protocolType,
                  protocolName = group.protocolName,
                  leaderId = currentLeader,
                  error = error))
              } else {
                group.maybeInvokeJoinCallback(member, JoinGroupResult(
                  members = List.empty,
                  memberId = newMemberId,
                  generationId = group.generationId,
                  protocolType = group.protocolType,
                  protocolName = group.protocolName,
                  // We want to avoid current leader performing trivial assignment while the group
                  // is in stable stage, because the new assignment in leader's next sync call
                  // won't be broadcast by a stable group. This could be guaranteed by
                  // always returning the old leader id so that the current leader won't assume itself
                  // as a leader based on the returned message, since the new member.id won't match
                  // returned leader id, therefore no assignment will be performed.
                  leaderId = currentLeader,
                  error = Errors.NONE))
              }
            })
          } else {
            maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol")
          }
        case CompletingRebalance =>
          // if the group is in after-sync stage, upon getting a new join-group of a known static member
          // we should still trigger a new rebalance, since the old member may already be sent to the leader
          // for assignment, and hence when the assignment gets back there would be a mismatch of the old member id
          // with the new replaced member id. As a result the new member id would not get any assignment.
          prepareRebalance(group, s"Updating metadata for static member ${member.memberId} with instance id $groupInstanceId")
        case Empty | Dead =>
          throw new IllegalStateException(s"Group ${group.groupId} was not supposed to be " +
            s"in the state ${group.currentState} when the unknown static member $groupInstanceId rejoins.")
        case PreparingRebalance =>
      }
    }
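The ordering concern around oldProtocols can be reproduced in isolation. The sketch below uses a made-up `ToyMember` with a mutable protocol list; it only illustrates why a snapshot taken after an update cannot serve as rollback data, and says nothing about how Kafka resolved the issue.

```scala
object SnapshotOrder {
  // Hypothetical stand-in for a member whose protocols are updated in place.
  final class ToyMember(var supportedProtocols: List[String])

  // Snapshot taken after the update: it already holds the new protocols.
  def captureAfterUpdate(member: ToyMember, newProtocols: List[String]): List[String] = {
    member.supportedProtocols = newProtocols
    val oldProtocols = member.supportedProtocols
    oldProtocols
  }

  // Snapshot taken before the update: it holds the value a rollback would need.
  def captureBeforeUpdate(member: ToyMember, newProtocols: List[String]): List[String] = {
    val oldProtocols = member.supportedProtocols
    member.supportedProtocols = newProtocols
    oldProtocols
  }
}
```

Starting from a member that supports range and updating it to sticky, the first variant returns the sticky list while the second returns the range list, which is what a revert would have to restore.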

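When memberId is not empty and groupInstanceId is not empty in the JoinGroupRequest

This case means that a consumer with a groupInstanceId failed its heartbeat and is rejoining the group. As in the second case (memberId set, groupInstanceId empty), the group state decides whether updateMemberAndRebalance needs to be called.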
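Putting the four combinations together, the routing of a join request can be summarised as one function. This is a condensed sketch of the branching described in this section; the `Branch` values and the `staticMemberKnown` flag (standing in for group.hasStaticMember) are names chosen for the example.

```scala
object JoinDispatch {
  sealed trait Branch
  case object ReturnMemberIdRequired extends Branch         // reply with a generated memberId
  case object AddMemberAndRebalance extends Branch
  case object UpdateStaticMemberAndRebalance extends Branch
  case object DoJoinGroup extends Branch                    // state-dependent handling of a known memberId

  def route(memberId: String,
            groupInstanceId: Option[String],
            version: Short,
            staticMemberKnown: Boolean): Branch =
    if (memberId.nonEmpty) DoJoinGroup
    else if (groupInstanceId.isDefined && staticMemberKnown) UpdateStaticMemberAndRebalance
    else if (version >= 4 && groupInstanceId.isEmpty) ReturnMemberIdRequired
    else AddMemberAndRebalance
}
```

A dynamic consumer on request version 7 with an empty memberId is told to come back with the generated id, a static consumer seen for the first time is added directly, and a static consumer whose groupInstanceId is already recorded goes through updateStaticMemberAndRebalance.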

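Summary

This article only gives a rough picture of joinGroup. The main points are:

  • If the JoinGroupRequest carries no memberId, a new memberId is generated. If groupInstanceId is also empty, the coordinator returns immediately so that the client sends the request again with the memberId.
  • The group itself goes through state transitions, and how a join is handled depends on the current state.
  • Server-side election of the consumer leader is very simple: if leaderId is empty, the current memberId becomes the leader.
  • Kafka's handling of groupInstanceId amounts to an extra in-memory mapping from groupInstanceId to memberId; when a member joins without a memberId, this saves one JoinGroupRequest.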
