目录:

《Kafka Producer设计分析》

《KafkaProducer类代码分析》

《RecordAccumulator类代码分析》

《Sender类代码分析》

《NetworkClient类代码分析》

-------------------------------------------------------------------

上一节《NetworkClient类代码分析》

前文我们分析了Kafka生产者端的源代码,了解了生产者产生消息的过程。消息由生产者发布到某个主题的某个分区上,其实最终是被存储在服务端的某个broker上。而消费者由订阅行为来决定它所要消费的主题和分区。消费者通过poll操作,不断的从服务端拉取该主题分区上产生的消息。

相信有兴趣看kafka源代码的同学,肯定对kafka的基本概念和原理有所了解。关于消费者,我们知道在服务端会有GroupCoordinator,负责每个consumer group的leader的选举,以及分发分区分配结果,而coumer的leader则会根据分区分配策略进行分区分配。这里需要注意,分区分配结果并不是由leader分发给同组的consumer,而是leader返回给GroupCoordinator,再有GroupCoordinator进行分发。

每当Broker有变化,或者Consumer Group有出入组的变化时,会触发ConsumerGroup的rebalance。也就是上述的分区分配工作。

另外消费者本地保存了它所负责主题分区的消费状态,通过手动和自动的方式提交到服务端的内部主题中。rebalance过后,消费者重新从内部主题获取对应主题分区的消费位置。

关于消费者更多的内容介绍请参考我另外一篇文章《Apache Kafka核心组件和流程-协调器(消费者和组协调器)》

上面我们回顾了Consumer的设计和流程,为我们进入源代码分析做好铺垫。接下来我们将从KafkaConsumer入手,进行代码分析。

KafkaConsumer

我们先看下使用KafkaConsumer进行消费的部分代码:

private final KafkaConsumer<Integer, String> consumer;.........consumer.subscribe(Collections.singletonList(this.topic));ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<Integer, String> record : records) {System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());}

以上代码来自于源代码包中的例子,我们可以看到KafkaConsumer先订阅topic,然后通过poll方法进行消息拉取。

可以看到KafkaConsumer通过poll方法进行消费,这也是KafkaConsumer最主要的方法。

我们先看看KafkaConsumer内部的其他组件有哪些,见下图:

上图介绍了KafkaConsumer内部的几个重要组件:

1、前文说过消费者要自己记录消费的位置(但也需要提交到服务端保存,为了rebalance后的消费能衔接上),所以我们需要SubScriptionState来保存消费的状态。

2、ConsumerCoordinator负责和GroupCoordinator通讯,例如在leader选举,入组,分区分配等过程。

3、ConsumerNetworkClient是对NetworkClient的封装,如果你是从producer看过来的,想必对NetworkClient十分了解,他对nio的组件进行封装,实现网络IO。

4、PartitionAssignor,这是分区分配策略,在进行分区分配的时候会用到。

5、Fetcher负责组织拉取消息的请求,以及处理返回。不过需要注意它并不做网络IO,网络IO还是由ConsumerNetworkClient完成。它其实对应生产者中的Sender。

我们抛开订阅、rebalance这些流程,先以kafka消费流程为主,进行分析。有些组件在消费流程中是涉及不到的。消费流程主要涉及到Fetcher、SubScriptionState和ConsumerNetworkClient。特别是Fetcher,承担了重要的工作。不过我们还需要一步步来,先进入poll方法的分析。

poll()方法

这是消息拉取的入口方法,他会从上次消费的位置拉取消息,也可以手动指定消费位置。入参是阻塞的时长,如果有消息将会立即返回,否则会阻塞到超时,如果没有数据则返回空的数据集合。

代码如下:

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {acquireAndEnsureOpen();try {if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");}// poll for new data until the timeout expiresdo {client.maybeTriggerWakeup();if (includeMetadataInTimeout) {if (!updateAssignmentMetadataIfNeeded(timer)) {return ConsumerRecords.empty();}} else {while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {log.warn("Still waiting for metadata");}}final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);if (!records.isEmpty()) {// before returning the fetched records, we can send off the next round of fetches// and avoid block waiting for their responses to enable pipelining while the user// is handling the fetched records.//// NOTE: since the consumed position has already been updated, we must not allow// wakeups or any other errors to be triggered prior to returning the fetched records.if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {client.pollNoWakeup();}return this.interceptors.onConsume(new ConsumerRecords<>(records));}} while (timer.notExpired());return ConsumerRecords.empty();} finally {release();}
}

逻辑说明:

1、通过acquireAndEnsureOpen()确保本对象是单线程进入,这是因为KafkaConsumer非线程安全。

2、检查是否订阅了topic

3、进入主循环,条件是没有超时

4、在主循环中通过pollForFetches()拉取一次消息。这个方法中先检查是否经存在拉取过的未加工消息,这是因为上一轮次拉取做了提前拉取处理。有可能已经拉取回消息等待处理。如果没有已拉取未加工数据,则准备新的拉取请求,网络IO拉取消息,加工拉取回来的数据。

5、如果上一步拉取到消息,并不会立即返回,而是再一次触发消息拉取,并且使用的是非阻塞方式,调用client.pollNoWakeup()。这样做的目的是,提前网络IO,把消息拉取请求发出去。在网络IO的同时,消息数据返回给consumer的调用者进行业务处理。这样做到了并行处理,提高了效率。等下次调用KafkaConsumer进行poll,当进行到第4步时,有可能直接返回了上轮次提前拉取到的消息,从而省去了网络IO时间。

我们通过下图帮助理解上面4、5步的设计:

图中带颜色的方框代表在整个拉取消息的流程中,不同的处理过程,分布于不同的对象中。图中下半部分展示的是Kafka处理逻辑。可以看到在第一轮次调用了两次ConusmerNetworkClient进行IO处理,第二次IO的同时,调用者已经开始拿到返回的消息进行业务处理,这里实现了并行处理。进入第二轮次,我们发现kafkaConsumer可以直接取到上轮第二次IO回来的消息进行加工,加工后返回调用者,进行业务处理,同时下一轮次的消息拉取异步进行中。可以看到第二轮次的总时长已经没有了网络IO的时长,因为这部分工作在上一轮次已经异步进行完成。

如果不这样做,会怎么样呢?我们看图中上半部分,我们发现每个轮次都是一样的,网络IO都需要同步等待,从第二轮开始,整个消息拉取处理的时长明显增加了IO部分,会更长。

以上情况比较极端,每次提前IO都会返回数据,并且消息的业务处理时长大于网络IO。这种情况下,能最大发挥出异步IO的优势。

以上这种设计的小细节真的值得我们来学习。读源代码在了解原理的同时,我们也要多总结优秀的设计思想,对我们的工作很有帮助。

从上面的分析看到,真正消息拉取的代码是:

final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);

下面我们继续分析pollForFetches方法。

pollForFetches()方法

这个方法完成了从服务端拉取消息的动作,这个过程主要使用了Fetcher和ConsumerNetworkClient两个组件。Fetcher负责准备好拉取消息的request、处理response,并且把消息转化为对调用者友好的格式。ConsumerNetworkClient负责把请求发送出去,接收返回,也就是网络IO工作。

他的主要流程是如下四步:

1、查看是否已经存在拉取回来未加工的消息原始数据,有的话立即调用fetcher.fetchedRecords()加工,然后返回。

2、如果没有未加工的原始数据,那么调用fetcher.sendFetches()准备拉取请求。

3、通过ConsumerNetworkClient发送拉取请求。

4、加工拉取回的原始数据,返回。

其实正常来说2,3,4步流程就足够了。为什么会有第1步呢?那些已经存在的未加工的数据哪里来的?如果你理解了前面所讲的异步拉取设计,那么你应该知道答案。这些已经存在的未加工数据来自于上一轮次的异步IO。正是因为有了异步的IO拉取,才会有第一步的处理可能。

完整代码如下:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {long pollTimeout = Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());// if data is available already, return it immediatelyfinal Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();if (!records.isEmpty()) {return records;}// send any new fetches (won't resend pending fetches)fetcher.sendFetches();// We do not want to be stuck blocking in poll if we are missing some positions// since the offset lookup may be backing off after a failure// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call// updateAssignmentMetadataIfNeeded before this method.if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {pollTimeout = retryBackoffMs;}Timer pollTimer = time.timer(pollTimeout);client.poll(pollTimer, () -> {// since a fetch might be completed by the background thread, we need this poll condition// to ensure that we do not block unnecessarily in poll()return !fetcher.hasCompletedFetches();});timer.update(pollTimer.currentTimeMs());// after the long poll, we should check whether the group needs to rebalance// prior to returning data so that the group can stabilize fasterif (coordinator.rejoinNeededOrPending()) {return Collections.emptyMap();}return fetcher.fetchedRecords();
}

可以看到以上过程除了IO操作外,都是通过Fetcher完成的,足以体现他的重要。接下来的章节将会重点分析Fetcher。

小结

本篇先是回顾了Kafka消费者的设计,然后从KafkaConsumer的Poll方法入手对拉取的逻辑进行分析。Kafka很巧妙的采用异步IO方式,缩短整个流程的时长。接下来我们将会进入Fetcher的分析,看其如何准备拉取消息的请求,并完成消息的转化处理。

你绝对能看懂的Kafka源代码分析-KafkaConsumer类代码分析相关推荐

  1. 你绝对能看懂的Kafka源代码分析-Kafka Producer设计分析

    之前我写了<Kafka入门教程轻松学>系列文章,半年来有1万多的阅读量,虽然不算很多,但看到很多朋友的支持,也给了我继续写下去的动力.接下来我会再写一个系列文章----<你绝对能看懂 ...

  2. 你绝对能看懂的Kafka源代码分析-RecordAccumulator类代码分析

    目录: <Kafka Producer设计分析> <KafkaProducer类代码分析> <RecordAccumulator类代码分析> <Sender类 ...

  3. 看懂这5幅图,研发效能分析和改进就容易了

    简介:作为 CTO 或企业管理者,我们如何去了解和衡量研发团队的研发效能呢?作为 PMO 和效能负责人,我们该从哪几个维度来回答关于研发效能的问题呢?如何通过效能数据分析,帮助企业管理者透明化研发效能 ...

  4. 一文看懂C语言链表(原创) --- 包含完整代码

    链表是一种可以动态的进行内存分配的数据结构 ### 相当于长度不固定的结构体数组 链表中的元素在内存中的地址可以是不连续的 链表这种数据结构必须使用指针才能实现 用结构体建立链表是最合适的 例如: s ...

  5. 希望今年能看懂和写出这样的Swift代码

    macOS 终端程序下载 func anyCommonElements<T: Sequence, U: Sequence>(_ lhs: T, _ rhs: U) -> Boolwh ...

  6. 如何看懂k线图:K线详细分析图解

    K线图(Candlestick chart)源处于日本德川幕府时代,它是被当时日本的米市商人用来记录米市的行情与价格波动,因为其独到的标画方式而被引入到股市及期货市场.所有的投资者都可以根据K线的实体 ...

  7. 搞懂静态代码分析,看这文就够了!

    作者 | 赖建新 来源 | 鉴释 封图 | 东方IC 什么是静态代码分析? 静态代码分析是指在不实际执行程序的情况下,对代码语义和行为进行分析,由此找出程序中由于错误的编码导致异常的程序语义或未定义的 ...

  8. 一文看懂:数据指标体系的4大类型

    很多同学问:"有没有普遍的.一般的指标体系梳理方法?"网上常见的指标体系分享,大多是互联网的AARRR一类,现实中情况却很复杂.普遍的方法当然有,就是基于业务逻辑,梳理指标体系.从 ...

  9. matlab中ode45如何设置,matlab 中ode45的源代码如何看懂

    该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 求常微分方程的数值解 ode45方法的源代码怎么看懂呢?四百多行 如何理解这些代码的核心思想 以方便未来自己使用呢?求大神指点迷津 感激不尽 functi ...

最新文章

  1. 开放linux下mysql数据库3306端口
  2. python实现计算器代码 博客园_python实现计算器
  3. golang 使用 redis 的教程
  4. Django笔记---数据库设计
  5. TypeScript Decorators 的使用说明
  6. ObjectArx创建指定块
  7. 传智书城首页设计代码_(自适应手机版)响应式创意餐饮酒店装饰设计类网站织梦模板 html5蓝色餐饮酒店设计网站源码下载...
  8. Hash Collision DoS 攻击
  9. js日期函数表达天,时,分,秒
  10. Android Studio 第一个JNI程序
  11. 关于 html 中 table 表格 tr,td 的高度和宽度
  12. 2021年上半年软考真题网络工程师真题及答案解析
  13. android逆向学习路线(适合新手)
  14. java计算机毕业设计HTML5旅游网站源码+mysql数据库+系统+lw文档+部署
  15. 智慧校园: 00 开发流程
  16. 桌面智能分析产品+“智同211”计划,永洪科技打造数据价值生态圈!
  17. 论文-Estimation–Action–Reflection: Towards Deep Interaction Between Conversational and Recommender Sys
  18. 微信小程序开发一定要服务器么,该怎么选择小程序服务器?
  19. android 西班牙语,Android新增语言的方法(墨西哥的西班牙语)
  20. Java进阶:java程序设计慕课版课后答案浪潮优派

热门文章

  1. 如何用C语言实现渣男通讯录
  2. 常用的相似度计算方法原理及实现
  3. 苹果微信分身怎么弄 苹果微信多开
  4. LeetCode-重新安排行程(C++)
  5. 服装产业数字化升级,低代码赋能企业柔性生产
  6. 【农业发展】趋势与历史机遇
  7. 可用软硬件技术来检测与消除计算机病毒,重庆计算机一模级拟试题.doc
  8. CISP管理部分-4、业务连续性
  9. 如何实现一个简单好用的roadmap制作工具
  10. nvidia控制面板可以卸载吗?