多线程开发消费者实例

  • Kafka Java Consumer设计原理
  • 多线程方案
    • 1. 消费者程序启动多个线程
    • 2. 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。
  • 方案一的优势
  • 方案一的不足
  • 方案二的优势
  • 方案二的不足
  • 方案一主体实现
  • 方案二主体实现

Kafka Java Consumer设计原理

Kafka0.10.1.0版本开锁后,KafkaConsumer就变成了双线程设计,即用户主线程心跳线程。但是在消费这个层面上看都还是单线程的设计。

而在老版本kafka中Scala Consumer 的API是多线程的,并且是阻塞机制,为了更好的打造上下游生态,Kafka将更好实现的单线程API推出,并且是否使用多线程来交给用户的选择。

多线程方案

1. 消费者程序启动多个线程

每个线程维护专属的 KafkaConsumer 实例负责完整的消息获取、消息处理流程。如下图所示:

2. 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。

获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:

方案一的优势

  1. 实现起来简单,因为它比较符合目前我们使用 Consumer API 的习惯。我们在写代码的
    时候,使用多个线程并在每个线程中创建专属的 KafkaConsumer 实例就可以了。
  2. 多个线程之间彼此没有任何交互,省去了很多保障线程安全方面的开销。
  3. 由于每个线程使用专属的 KafkaConsumer 实例来执行消息获取和消息处理逻辑,因
    此,Kafka 主题中的每个分区都能保证只被一个线程处理,这样就很容易实现分区内的
    消息消费顺序。这对在乎事件先后顺序的应用场景来说,是非常重要的优势。

方案一的不足

  1. 每个线程都维护自己的 KafkaConsumer 实例,必然会占用更多的系统资源,比如内
    存、TCP 连接等。在资源紧张的系统环境中,方案 1 的这个劣势会表现得更加明显。
  2. 这个方案能使用的线程数受限于 Consumer 订阅主题的总分区数。我们知道,在一个消
    费者组中,每个订阅分区都只能被组内的一个消费者实例所消费。假设一个消费者组订
    阅了 100 个分区,那么方案 1 最多只能扩展到 100 个线程,多余的线程无法分配到任
    何分区,只会白白消耗系统资源。当然了,这种扩展性方面的局限可以被多机架构所缓
    解。除了在一台机器上启用 100 个线程消费数据,我们也可以选择在 100 台机器上分别
    创建 1 个线程,效果是一样的。因此,如果你的机器资源很丰富,这个劣势就不足为虑
    了。
  3. 每个线程完整地执行消息获取和消息处理逻辑。一旦消息处理逻辑很重,造成消息处理
    速度慢,就很容易出现不必要的 Rebalance,从而引发整个消费者组的消费停滞。这个
    劣势你一定要注意。我们之前讨论过如何避免 Rebalance。

方案二的优势

方案 2 将任务切分成了消息获取和消息处理两个部分,分别由不
同的线程处理它们。比起方案 1,方案 2 的最大优势就在于它的高伸缩性,就是说我们可
以独立地调节消息获取的线程数,以及消息处理的线程数,而不必考虑两者之间是否相互影
响。如果你的消费获取速度慢,那么增加消费获取的线程数即可;如果是消息的处理速度
慢,那么增加 Worker 线程池线程数即可。

方案二的不足

  1. 它的实现难度要比方案 1 大得多,毕竟它有两组线程,你需要分别管理它们。
  2. 因为该方案将消息获取和消息处理分开了,也就是说获取某条消息的线程不是处理该消
    息的线程,因此无法保证分区内的消费顺序。举个例子,比如在某个分区中,消息 1 在
    消息 2 之前被保存,那么 Consumer 获取消息的顺序必然是消息 1 在前,消息 2 在
    后,但是,后面的 Worker 线程却有可能先处理消息 2,再处理消息 1,这就破坏了消
    息在分区中的顺序。还是那句话,如果你在意 Kafka 中消息的先后顺序,方案 2 的这个
    劣势是致命的。
  3. 方案 2 引入了多组线程,使得整个消息消费链路被拉长,最终导致正确位移提交会变得
    异常困难,结果就是可能会出现消息的重复消费。如果你在意这一点,那么我不推荐你
    使用方案 2。

方案一主体实现

public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);private final KafkaConsumer consumer;public void run() {try {consumer.subscribe(Arrays.asList("topic"));while (!closed.get()) {ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));// 执行消息处理逻辑}} catch (WakeupException e) {// Ignore exception if closingif (!closed.get()) throw e;} finally {consumer.close();}}// Shutdown hook which can be called from a separate threadpublic void shutdown() {closed.set(true);consumer.wakeup();}...
}

这段代码创建了一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个KafkaConsumerRunner 类都会创建一个专属的 KafkaConsumer 实例。在实际应用中,你可以创建多个 KafkaConsumerRunner 实例,并依次执行启动它们,以实现方案 1 的多线程架构。

方案二主体实现

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
private int workerNum = ...;
executors = new ThreadPoolExecutor(
workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
...
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (final ConsumerRecord record : records) {executors.submit(new Worker(record));}
}

当 Consumer 的 poll 方法返回消息后,由专门的线程池来负责处理具体的消息。调用 poll 方法的主线程不负责消息处理逻辑,这样就实现了方案 2 的多线程架构。

如有错误欢迎指正

多线程开发Kafka消费者的方案和优劣相关推荐

  1. kafka消费者(Consumer)端多线程消费的实现方案

    kafka消费者(Consumer)端多线程消费的实现方案 kafka Java consumer设计原理 设计原理 为什么用单线程设计 多线程方案: 方案一: 方案二: 两个方案的优缺点: kafk ...

  2. 第十章 进程间的通信 之 Java/Android多线程开发(二)

    文章目录 (一)Java 多线程开发 1.1)线程状态 1.2)线程控制方法 (1.2.1)Synchronized (1.2.2)Volatile (1.2.3)ReentrantLock 1.3) ...

  3. 我是如何将一个老系统的kafka消费者服务的性能提升近百倍的

    大家好,又见面了~ kafka作为一种高吞吐量的分布式发布订阅消息系统,在业务系统中被广泛的使用. 如果问你,如何提高kafka队列中的消息消费速度呢? 答案很简单,topic多分几个分片,然后使用消 ...

  4. Linux下实现多线程的生产者消费者问题

    Linux下实现多线程的生产者消费者问题 一.原理的理解 生产者-消费者问题是一个经典的线程同步问题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制.在同一个线程地址空间内执行的两个线程 ...

  5. Python 多进程开发与多线程开发

    我们先来了解什么是进程? 程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程.程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本:进程 ...

  6. iOS 多线程的四种技术方案

    iOS 多线程的四种技术方案 image pthread 实现多线程操作 代码实现: void * run(void *param) {for (NSInteger i = 0; i < 100 ...

  7. Kafka消费者详解

    一.基本概念 1.消费者和消费组 Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息.假设有一个T1主题,该主题有4个分区:同时我们有一个消费组G ...

  8. Kafka消费者APi

    Kafka客户端从集群中消费消息,并透明地处理kafka集群中出现故障服务器,透明地调节适应集群中变化的数据分区.也和服务器交互,平衡均衡消费者. public class KafkaConsumer ...

  9. kafka消费者开发方式小结

    [README] 1, 本文总结了 kafka消费者开发方式: 2, 本文使用的是最新的kafka版本 3.0.0: [1] kafka消费则 [1.1]消费者与消费者组 1)消费者: 应用程序需要创 ...

最新文章

  1. 直接内存访问(DMA)
  2. 正则表达式基础知识及应用(用于个人学习以及回顾)
  3. Linux mount: Structure needs cleaning 错误解决方法
  4. 给element的select添加复选框
  5. java guava map_使用googled的guava常见ImmutableMap、Multimap、Sets、Optional、Objects及Preconditions检查等用法...
  6. 2014大学计算机考试,2014大学计算机基础考试围参考答案.doc
  7. 我的世界之 paper和spigot 的区别 Java服务端开服
  8. 如何修复macOS上的Microsoft Edge高CPU和内存使用情况?
  9. HTML5生日祝福网页制作 (粉色系列为你定制) HTML+CSS+JavaScript
  10. linux下oracle登陆建表,Oracle建表过程初学
  11. 最新fl studio20.8中文电音编曲宿主制作软件
  12. 平面设计师经常去哪些网站?
  13. java使用itextpdf生成 pdf (支持table)分页
  14. c语言写流水灯程序,用汇编和C语言 写流水灯程序
  15. Cirium称,2020年航空公司准点率表现良好,但客运航班数只有2019年的一半
  16. 使用python做协整模型分析并进行残差检验
  17. 物联网控制APP入门专题(五)---使用android studio直接编写物联网控制APP
  18. 什么是服务熔断,什么是服务降级?
  19. QT5.11-spdlog使用教程
  20. 【全网力荐】堪称最易学的Python基础入门教程

热门文章

  1. NUIST OJ 1352 回顾 【差分】
  2. windows7与linux共享文件夹oracle,ORACLE expdp备份到windows网络共享文件目录(NFS)
  3. shell条件测试操作
  4. 深圳大学计算机图形学实验一——OpenGL绘制布布头像
  5. 计算机旁边禁止放水英语怎么说,用方法学:“打假球”“放水”英语怎么说?...
  6. 【daisy-framework】MySQL 开发规范
  7. 下一代网络:大道至“简”
  8. 动态修改 xm-select 颜色
  9. 【散文】一场白露,怎知秋心
  10. 笔记本电脑直接开热点(在连接了wifi的情况下)