本文基于A Guide To The Kafka Protocol文档,以及Spark Streaming中实现的org.apache.spark.streaming.kafka.KafkaCluster类。整理出Kafka中有关

  • Metadata API
  • Produce API
  • Fetch API
  • Offset API(Aka ListOffset)
  • Offset Commit/Fetch API
  • Group Membership API
  • Administrative API
      

零、准备工作

  需要运行以下部分的示例代码时,需要提前建好需要的topic,写入一些message,再用consumer消费一下。

1、新建topic

[hadoop@kafka001 kafka]$ bin/kafka-topics.sh --zookeeper kafka001:2181 --create --topic kafka_protocol_test --replication-factor 3 --partitions 4
Created topic "kafka_protocol_test".
[hadoop@kafka001 kafka]$ bin/kafka-topics.sh --zookeeper kafka001:2181 --describe --topic kafka_protocol_test
Topic:kafka_protocol_test   PartitionCount:4    ReplicationFactor:3 Configs:Topic: kafka_protocol_test  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3Topic: kafka_protocol_test  Partition: 1    Leader: 2   Replicas: 2,3,4 Isr: 2,3,4Topic: kafka_protocol_test  Partition: 2    Leader: 3   Replicas: 3,4,1 Isr: 3,4,1Topic: kafka_protocol_test  Partition: 3    Leader: 4   Replicas: 4,1,2 Isr: 4,1,2

2、produce message

  使用Kafka系列之-自定义Producer中提到的KafkaProducerTool往指定kafka_protocol_test中发送消息,

public class ProducerTest2 {public static void main(String[] args) throws InterruptedException {KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl("D:\\Files\\test\\kafkaconfig.properties");int i = 1;while(true) {kafkaProducerTool.publishMessage("message" + (i++));}}
}

  运行一段时间后停止写入。运行一个console-consumerkafka_protocol_test消费message

3、consume message

  运行一个console-consumerkafka_protocol_test消费。注意观察该topic每个partition中的messages数。

[hadoop@kafka001 kafka]$ bin/kafka-console-consumer.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --from-beginning

  

一、Metadata API

  这个API是通过向Kafka集群发送一个TopicMetadaaRequest请求,得到MetadataResponse响应后从MetadataResponse中解析出Metadata相关信息。
  TopicMetadataRequest的结构和示例如下
  

case class TopicMetadataRequest(val versionId: Short,val correlationId: Int,val clientId: String,val topics: Seq[String])TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)

  得到的MetadataResponse包含的信息如下,可以从PartitionMetadata中获取到Partition相关信息,从TopicMetadata中获取到Topic相关信息,Broker中记录了Brokerip和端口号等。

MetadataResponse => [Broker][TopicMetadata]Broker => NodeId Host Port  (any number of brokers may be returned)NodeId => int32Host => stringPort => int32TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]TopicErrorCode => int16PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas IsrPartitionErrorCode => int16PartitionId => int32Leader => int32Replicas => [int32]Isr => [int32]

1、所包含的信息

可以查询指定Topic是否存在,
指定topic有多少个partition,
每个partition当前哪个broker处于leader状态,
每个broker的host和port是什么

  如果设置了auto.create.topics.enable参数,遇到不存在的topic时,就会按默认replicationpartition新建该不存在的topic
  

2、示例

  生成一个TopicMetadataRequest对象

// 封装一个TopicMetadataRequest类型的请求对象
val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)
// 发送该请求
val resp: TopicMetadataResponse = consumer.send(req)
// 其中consumer对象是SimpleConsumer类型的
new SimpleConsumer(host, port, config.socketTimeoutMs,config.socketReceiveBufferBytes, config.clientId)

(1)查询topic是否存在
  由于在TopicMetadataRequest中可以发送一组Seq[String]类型的topics,所以获取到的TopicMetadataResponse.topicsMetadataSet[TopicMetadata]类型的。
  对每个TopicMetadata对象,如果其errorCode不为ErrorMapping.NoError即表示该Topic不正常。
  

topicMetadatas.foreach { topic =>if (topic.errorCode == ErrorMapping.NoError)println(s"topic: ${topic.topic}存在")elseprintln(s"topic: ${topic.topic}不存在")
}

(2)获取Topic的Partition个数
  首先将所有TopicMetadata中正常的Topic过滤出来,然后遍历每一个TopicMetadata对象,获取其partitionsMetadata信息,其长度即Partition的个数

val existsTopicMetadatas = topicMetadatas.filter(tm => tm.errorCode == ErrorMapping.NoError)
existsTopicMetadatas.foreach { topic =>val numPartitions = topic.partitionsMetadata.lengthprintln(s"topic: ${topic.topic} 有${numPartitions}个partition")
}

(3)获取Partition具体情况
  以下代码可以获取到Topic的每个Partition中的Leader Partition以及replication节点的信息。

existsTopicMetadatas.foreach { topic =>println(s"topic:${topic.topic}的Partition信息:")topic.partitionsMetadata.foreach { pm =>val leaderPartition = pm.leaderprintln(s"\tpartition: ${pm.partitionId}")println(s"\tleader节点:$leaderPartition")val replicas = pm.replicasprintln(s"\treplicas节点:$replicas")}
}

3、运行结果

  传入上面新建的kafka_protocol_test以及一个不存在的topic kafka_protocol_test1,以上代码的运行结果如下:

=============Topic相关信息===========
topic: kafka_protocol_test存在
topic: kafka_protocol_test1不存在
topic: kafka_protocol_test 有4个partition
=============Partition相关信息===========
topic:kafka_protocol_test的Partition信息:partition: 0leader节点:Some(id:1,host:kafka001,port:9092)replicas节点:Vector(id:1,host:kafka001,port:9092, id:2,host:kafka002,port:9092, id:3,host:kafka003,port:9092)partition: 1leader节点:Some(id:2,host:kafka002,port:9092)replicas节点:Vector(id:2,host:kafka002,port:9092, id:3,host:kafka003,port:9092, id:4,host:kafka004,port:9092)partition: 2leader节点:Some(id:3,host:kafka003,port:9092)replicas节点:Vector(id:3,host:kafka003,port:9092, id:4,host:kafka004,port:9092, id:1,host:kafka001,port:9092)partition: 3leader节点:Some(id:4,host:kafka004,port:9092)replicas节点:Vector(id:4,host:kafka004,port:9092, id:1,host:kafka001,port:9092, id:2,host:kafka002,port:9092)

二、Produce API

三、Fetch API

四、Offset API(Aka ListOffset)

  这个API通过向Kafka集群发送一个OffsetRequest对象,从返回的OffsetResponse对象中获取Offset相关信息。
  OffsetRequest对象描述如下

OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]ReplicaId => int32TopicName => stringPartition => int32Time => int64MaxNumberOfOffsets => int32

  上面Time的作用是,获取特定时间(单位为ms)之前的所有messages。如果设置为-1则获取最新的offset,即下一条messagesoffset位置;如果设置为-2则获取第一条messageoffset位置,即当前partition中的offset起始位置。

  OffsetResponse对象描述如下

OffsetResponse => [TopicName [PartitionOffsets]]PartitionOffsets => Partition ErrorCode [Offset]Partition => int32ErrorCode => int16Offset => int64

1、所包含的信息

  通过该API可以获取指定topic-partition集合的合法offset的范围,需要直接连接到PartitionLeader节点。

2、示例

  获取指定topic下所有partitionoffset范围
  封装一个getLeaderOffsets方法,在此方法的基础上分别封装一个getEarliestLeaderOffsets方法用于获取最小offsetgetLatestLeaderOffsets用于获取最大offset
  分别传入的关键参数是前面提到的Time

def getLatestLeaderOffsets(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, LeaderOffset]] =getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime) // -1L
def getEarliestLeaderOffsets(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, LeaderOffset]] =getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime) // -2L在getLeaderOffsets中,查询到当前partition的leader节点,def findLeaders(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, (String, Int)]] = {// 获取当前topicAndPartitions中的所有topicval topics = topicAndPartitions.map(_.topic)// 获取topic对应的MetadataResp对象,之前已过滤不存在的topic,所以这里无需进一步过滤val topicMetadatas = getMetadataResp(topics.toSeq).left.getval leaderMap = topicMetadatas.flatMap { topic =>topic.partitionsMetadata.flatMap { pm =>val tp = TopicAndPartition(topic.topic, pm.partitionId)// 获取对应PartitionMedatada的leader节点信息pm.leader.map { l =>tp -> (l.host -> l.port)}}}.toMapRight(leaderMap)}

  然后在这些节点中,封装一个OffsetRequest对象,向Kafka集群获得OffsetResponse对象。

val resp = consumer.getOffsetsBefore(req)
val respMap = resp.partitionErrorAndOffsets

  最后从OffsetResponse对象中获取offset范围,

val resp = getMetadataResp(topics.toSeq)// 如果获取的resp是left,则处理返回的Set[TopicMetadata]
val topicAndPartitions = processRespInfo(resp) { resp =>
val topicMetadatas = resp.left.get.asInstanceOf[Set[TopicMetadata]]
val existsTopicMetadatas = topicMetadatas.filter(tm => tm.errorCode == ErrorMapping.NoError)getPartitions(existsTopicMetadatas)
}.asInstanceOf[Set[TopicAndPartition]]// 获取指定topic-partition最早的offset
val offsetBegin = getEarliestLeaderOffsets(topicAndPartitions).right.get
// 获取指定topic-partition最晚的offset
val offsetEnd = getLatestLeaderOffsets(topicAndPartitions).right.getprint("=============Offset范围信息===========")
topicAndPartitions.foreach { tp =>println(s"topic: ${tp.topic}, Partition: ${tp.partition} 的Offset范围:")println(s"\t${offsetBegin(tp).offset} ~ ${offsetEnd(tp).offset}")
}

3、运行结果

  连接到kafka_protocol_test,运行结果如下

topic: kafka_protocol_test, Partition: 0 的Offset范围:0 ~ 9000
topic: kafka_protocol_test, Partition: 1 的Offset范围:0 ~ 598134
topic: kafka_protocol_test, Partition: 2 的Offset范围:0 ~ 0
topic: kafka_protocol_test, Partition: 3 的Offset范围:0 ~ 91000

  和第零节中图片显示结果一致。

五、Offset Commit/Fetch API

  首先参考Offset Management文档中的描述,分析一下Kafka中有关Offset管理的文档。
  在这篇文档中主要提供了OffsetFetchOffsetCommit两个API,其中
  

1、OffsetFetch API

  这个API可以获取一个Consumer读取messageoffset信息。发送的请求是OffsetFetchRequest类型的对象,接收到的是OffsetFetchResponse类型的响应。具体offset信息可以从OffsetFetchResponse对象中解析。
  发送的Request请求为,需要指定consumer所属的group,以及需要获取offset的所有TopicAndPartitions

val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, 0)或得到的响应为OffsetFetchResponse类型的对象。
val resp = consumer.fetchOffsets(req)其中consumer对象是SimpleConsumer类型的
new SimpleConsumer(host, port, config.socketTimeoutMs,config.socketReceiveBufferBytes, config.clientId)具体获取offset的逻辑如下,
withBrokers(Random.shuffle(config.seedBrokers)) { consumer =>// 连接consumer,发送该OffsetFetchRequest请求val resp = consumer.fetchOffsets(req)val respMap = resp.requestInfo// 从传入的topicAndPartitions中取出不包含在result中的topicAndPartitionval needed = topicAndPartitions.diff(result.keySet)// 遍历每一个需要获取offset的topic-partitionneeded.foreach { tp: TopicAndPartition =>respMap.get(tp).foreach { ome: OffsetMetadataAndError =>// 如果没有错误if (ome.error == ErrorMapping.NoError) {result += tp -> ome} else {errs.append(ErrorMapping.exceptionFor(ome.error))}}}if (result.keys.size == topicAndPartitions.size) {return Right(result)}
}

2、OffsetCommit API

  当最终调用commit()方法,或者如果启用了autocommit参数时,这个API可以使consumer保存其消费的offset信息。
  发送的Request请求为OffsetCommitRequest类型。

  OffsetCommitRequest需要传入的参数如下,

val offsetEnd = getLatestLeaderOffsets(topicAndPartitions).right.get
val resetOffsets = offsetsFetch.right.get.map { offsetInfo =>
val plus10Offset = offsetInfo._2.offset + 10offsetInfo._1 -> OffsetAndMetadata(if (offsetEnd(offsetInfo._1).offset >= plus10Offset) plus10Offset else offsetEnd(offsetInfo._1).offset)}
// resetOffsets类型为Map[TopicAndPartition, OffsetAndMetadata]
val req = OffsetCommitRequest(groupId, resetOffsets, 0)
// 发送该请求的方式如下
val resp = consumer.commitOffsets(req)

3、GroupCoordinator API

  需要注意的是这个API在Kafka-0.9以后的版本中才提供。指定Consumer Groupoffsets数据保存在某个特定的Broker中。
  向Kafka集群发送一个GroupCoordinatorRequest类型的请求参数,该request对象中只需要指定一个groupId即可。如下所示,

val req = new GroupCoordinatorRequest(groupId)
val resp = consumer.send(req)

  获取到的Response对象是GroupCoordinatorResponse类型的,在resp.coordinatorOpt中返回一个BrokerEndpoint对象,可以获取该Broker对应的Id, Ip, Port等信息。

4、示例

(1) 运行OffsetFetch API
(a) 获取kafka_protocol_test的consumer group消费状态
  启动一个console-consumerkafka_protocol_test topic消费messages。需要指定一个特定的group.id参数,如下所示,使用默认的consumer.properties配置文件即可。

bin/kafka-console-consumer.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --from-beginning --consumer.config ./config/consumer.properties

  运行后,将其停止,查看当前console-consumer的消费状态

[hadoop@kafka001 kafka]$  bin/kafka-consumer-offset-checker.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --group test-consumer-group
Group           Topic                          Pid Offset          logSize         Lag             Owner
test-consumer-group kafka_protocol_test            0   9000            9000            0               none
test-consumer-group kafka_protocol_test            1   26886           598134          571248          none
test-consumer-group kafka_protocol_test            2   0               0               0               none
test-consumer-group kafka_protocol_test            3   18296           91000           72704           none

(b) 运行OffsetFetch代码,查看运行结果

  运行时仍然传入test-consumer-group,运行结果如下

Topic: kafka_protocol_test, Partition: 0Offset: 9000
Topic: kafka_protocol_test, Partition: 1Offset: 26886
Topic: kafka_protocol_test, Partition: 2Offset: 0
Topic: kafka_protocol_test, Partition: 3Offset: 18296

  对比后发现,两个offset信息保持一致。

(2)运行OffsetCommit API
  在这里,将OffsetFetch获取到的每个TopicAndPartition对应的Offset10,如果加10后超过其最大Offset,则取最大Offset
  在Commit前后,两次调用OffsetFetch API的代码,前后运行结果如下,
更新前的offset

Topic: kafka_protocol_test, Partition: 0Offset: 9000
Topic: kafka_protocol_test, Partition: 1Offset: 26886
Topic: kafka_protocol_test, Partition: 2Offset: 0
Topic: kafka_protocol_test, Partition: 3Offset: 18296
更新后的offset:(partition 0和partition 2没有变化是由于加10后超过了该partition的offset范围最大值)
Topic: kafka_protocol_test, Partition: 0Offset: 9000
Topic: kafka_protocol_test, Partition: 1Offset: 26896
Topic: kafka_protocol_test, Partition: 2Offset: 0
Topic: kafka_protocol_test, Partition: 3Offset: 18306

(3)运行Group Coordinator API
  传入一个consumer group后,查看其运行结果

Comsuner Group : test-consumer-group, coordinator broker is:id: 1, host: kafka001, port: 9092

六、Group Membership API

  这个API从Kafka-0.9.0.0版本开始出现。
  在0.9以前的client api中,consumer是要依赖Zookeeper的。因为同一个consumer group中的所有consumer需要进行协同,进行下面所讲的rebalance。但是因为zookeeper的“herd”与“split brain”,导致一个group里面,不同的consumer拥有了同一个partition,进而会引起消息的消费错乱。为此,在0.9中,不再用zookeeper,而是Kafka集群本身来进行consumer之间的同步。下面引自kafka设计的原文:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Failuredetectionprotocol

  相关知识点可以参考Kafka源码深度解析-序列7 -Consumer -coordinator协议与heartbeat实现原理。

七、Administrative API

  注意,这个API也是从Kafka-0.9之后的client版本中才提供。通过这个API可以对Kafka集群进行一些管理方面的操作,比如获取所有的Consumer Groups信息。想要获取集群中所有Consumer Groups信息,需要发送一个ListGroupRequest请求到所有的Brokers节点。
  还可以通过发送一个DescribeGroupsRequest类型的请求对象,获取对特定Consumer Group的描述。

  在Kafka-0.9之后的client中,提供了一个kafka.admin.AdminClient类,调用createSimplePlaintext方法,传入一个broker list字val client = AdminClient.createSimplePlaintext(“kafka001:9092,kafka002:9092,kafka003:9092,kafka004:9092”)AdminClient`提供了很多方法,比如

def findCoordinator(groupId: String): Node
def findAllBrokers(): List[Node]
def listAllGroups(): Map[Node, List[GroupOverview]]
def listAllConsumerGroups(): Map[Node, List[GroupOverview]]

  等等。

转载于:https://www.cnblogs.com/wuyida/p/6300206.html

Kafka系列之-Kafka Protocol实例分析相关推荐

  1. Kafka系列之-Kafka监控工具KafkaOffsetMonitor配置及使用

    KafkaOffsetMonitor是一个可以用于监控Kafka的Topic及Consumer消费状况的工具,其配置和使用特别的方便.源项目Github地址为:https://github.com/q ...

  2. 《Kafka系列》Kafka详细教程入门

    Kafka 1 消息队列--消息中间件 1.1 消息队列的作用 1.2 消息队列的概念--MQ Message 在互联网中,多台设备产生通信的数据的总称:可以是视频.文本.音频等等. Quene 一种 ...

  3. kafka系列之kafka生产者与分区(3)

    概要 当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量? 举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消 ...

  4. 【kafka系列】kafka之生产者发送消息实践

    目录 一.准备工作 二.终端命令 生产者命令 消费者命令 三.Java实践 搭建项目 异步发送-无回调 异步发送-有回调 同步发送 一.准备工作 进入实战之前先熟悉一下topic的相关命令,使用终端命 ...

  5. Kafka系列 - 10 Kafka副本|分区副本分配|手动调整分区副本|Leader Partition 负载平衡|增加副本因子

    文章目录 1. 分区副本分配 2. 手动调整分区副本 3. Leader Partition 负载平衡 4. 增加副本因子 1. 分区副本分配 如果 kafka 服务器只有 4 个节点,那么设置 ka ...

  6. Kafka系列 - 14 Kafka消费者|分区的分配策略及再平衡|Range|RoundRobin|Sticky|CooperativeSticky

    文章目录 1. 分区的分配以及再平衡 2. Range 分区分配以及再平衡 3. RoundRobin 分区分配以及再平衡 4. Sticky 分区分配以及再平衡 1. 分区的分配以及再平衡 一个co ...

  7. Kafka系列(五)、开启SASL安全认证以及配置ACL权限控制

    目录 开启SASL 控制台配置用户 ACL授权 Python客户端访问 ACL常用命令 Kafka系列: kafka 2.4.1单机版部署及使用 kafka监控系统kafka eagle安装使用 滴滴 ...

  8. Kafka系列(七)、Kafka套件 Confluent Platform 单机/集群部署

    目录 简介 单机部署 集群部署 尾巴 Kafka系列: kafka 2.4.1单机版部署及使用 kafka监控系统kafka eagle安装使用 滴滴开源的kafka-manager编译及部署使用 k ...

  9. Kafka系列(六)、Kafka开发套件kafka lenses 安装及使用(带WebUI)

    目录 介绍 安装 使用 Kafka系列: kafka 2.4.1单机版部署及使用 kafka监控系统kafka eagle安装使用 滴滴开源的kafka-manager编译及部署使用 kafka管理监 ...

最新文章

  1. PostgreSQL10.5安装后(Win10)环境变量配置与运行
  2. HBase眼高手低从Shell到IDEA编程、心路笔记、踩坑过程
  3. Web前端开发代码规范(基础)
  4. python里面map函数_python中的map()函数
  5. Ibatis动态(dynamic)查询
  6. 求离散数据的突变点_Nat Gen | 染色质三维构象决定突变分布
  7. keytool命令总结
  8. js中的escape的用法汇总
  9. linux重定向命令语法,linux重定向命令应用及语法
  10. ACS——网管的九阳神功
  11. Java 反射:Classes
  12. java工程师职业价值观_什么是职业价值观?舒伯职业价值观测试
  13. 新媒体运营工具大盘点,收藏方便不备之需!
  14. Allegro中的NET到NET之间的间距设置-网络之间的间距
  15. 目标检测评估指标 mAP P R
  16. Adobe Photoshop 2021 22.4.2 绿色精简版
  17. linux这么重命名文件,如何在Linux中重命名文件
  18. 论文中的定理(Theorem)、引理(Lemma)、推论(Corollary)
  19. 电子元器件B2B商城系统开发:赋能企业构建进销存标准化流程实例
  20. C# 读取txt文件生成Word文档

热门文章

  1. 使用.NET Core 3.0 预览版,Web API和Visual Studio 2019进行ASP.NET Core Blazor游戏开发
  2. 使用EntityFramework Core和Enums作为字符串的ASP.NET Core Razor页面——第四部分
  3. float在python中的书写形式错误的是_python – 不支持的操作数类型:’float’和’str’错误...
  4. Java写js的Ajax代码_用JS写的一个Ajax库(实例代码)
  5. 安装python进度条不动了_TensorFlow(一):使用Anconda安装TensorFlow
  6. 并注册烧写钩子 获取启动介质类型_Spark Application的注册 源码剖析
  7. wincc上位机与1200组态步骤_组态上位机WINCC与PLC通讯连接
  8. Tesseract-OCR图片识别为文字
  9. java做flv直播服务器,EasyDSS流媒体服务器软件(支持RTMP/HLS/HTTP-FLV/视频点播/视频直播)-正式环境安装部署攻略...
  10. input和button放在同一行_黑龙江作家协会冯殿波散文集《足音》84情系东保卫,魅力采风行...