Kafka多线程消费理解

Kafka Java Consumer设计

Kafka Java Consumer采用的是单线程的设计。其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程。
用户主线程,指的是启动Consumer应用程序main方法的线程,心跳线程(Heartbeat Thread)只负责定期给对应的Broker机器发送心跳请求,以表示消费者应用的存活性。

官网文档对于consumer多线程的处理方式 :

  1. 每个线程一个消费者

    每个线程自己的消费者实例。这里是这种方法的优点和缺点:

    • PRO: 这是最容易实现的
    • PRO: 因为它不需要在线程之间协调,所以通常它是最快的。
    • CON: 更多的消费者意味着更多的TCP连接到集群(每个线程一个)。一般kafka处理连接非常的快,所以这是一个小成本。
    • CON: 更多的消费者意味着更多的请求被发送到服务器,但稍微较少的数据批次可能导致I/O吞吐量的一些下降。
    • CON: 所有进程中的线程总数受到分区总数的限制。
  2. 解耦消费和处理

    另一个替代方式是一个或多个消费者线程,它来消费所有数据,其消费所有数据并将ConsumerRecords实例切换到由实际处理记录处理的处理器线程池来消费的阻塞队列。这个选项同样有利弊:

    • 可扩展消费者和处理进程的数量。这样单个消费者的数据可分给多个处理器线程来执行,避免对分区的任何限制。
    • CON: 跨多个处理器的顺序保证需要特别注意,因为线程是独立的执行,后来的消息可能比遭到的消息先处理,这仅仅是因为线程执行的运气。如果对排序没有问题,这就不是个问题。
    • CON: 手动提交变得更困难,因为它需要协调所有的线程以确保处理对该分区的处理完成。

这是两种不同的处理方式。

Kafka多线程消费实例

1. 消费者程序拥有多个消费者并拥有自己的主题

解释: 消费者程序启动多个线程,每个线程维护专属的KafkaConsumer,负责完整的消息获取、消息处理流程。 (其实就是一个消费者客户端开启多个线程,每个线程都有各自的Consumer对同一个topic或者多个topic进行消费,这些消费者(线程)组成了一个消费者组)

借用网上的图:

topic数据实例:

代码:

public class KafkaConsumerThread  implements Runnable{private KafkaConsumer<String,String> consumer;private AtomicBoolean closed = new AtomicBoolean(false);public KafkaConsumerThread(){}// 构造方法 生成自己的consumerpublic KafkaConsumerThread(Properties props) {this.consumer = new KafkaConsumer<>(props);}@Overridepublic void run() {try {// 消费同一主题consumer.subscribe(Collections.singletonList("six-topic"));// 线程名称String threadName = Thread.currentThread().getName();while (!closed.get()){ConsumerRecords<String, String> records = consumer.poll(3000);for (ConsumerRecord<String, String> record : records) {System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",threadName,record.topic(),record.partition(),record.offset(),record.key(),record.value());}}}catch (WakeupException e){e.printStackTrace();}finally {consumer.close();}}/*** 关闭消费*/public void shutdown(){closed.set(true);// wakeup 可以安全地从外部线程来中断活动操作consumer.wakeup();}public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "XXXXXXX:9093");props.put("group.id", "thread-1");//消费者组,只要group.id相同,就属于同一个消费者组props.put("enable.auto.commit", "true");//自动提交offsetprops.put("auto.offset.reset", "earliest");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("session.timeout.ms", "30000");props.put("max.poll.records",6);// 运行三个线程,消费同一个topic 这个topic的分区必须大于等于3 否则会有消费者消费不到数据for (int i = 0; i < 3 ; i++) {new Thread(new KafkaConsumerThread(props),"Thread"+i).start();}}
}

日志:

Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 0, key= ImKey-0-one,value= ImValue-0-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 0, key= ImKey-1-one,value= ImValue-1-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 1, key= ImKey-5-one,value= ImValue-5-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 2, key= ImKey-8-one,value= ImValue-8-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 3, key= ImKey-10-one,value= ImValue-10-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 4, key= ImKey-13-one,value= ImValue-13-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 5, key= ImKey-14-one,value= ImValue-14-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 0, key= ImKey-4-one,value= ImValue-4-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 1, key= ImKey-6-one,value= ImValue-6-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 2, key= ImKey-7-one,value= ImValue-7-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 3, key= ImKey-11-one,value= ImValue-11-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 4, key= ImKey-15-one,value= ImValue-15-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 5, key= ImKey-21-one,value= ImValue-21-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 6, key= ImKey-25-one,value= ImValue-25-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 7, key= ImKey-27-one,value= ImValue-27-one
Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 8, key= ImKey-29-one,value= ImValue-29-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 1, key= ImKey-2-one,value= ImValue-2-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 2, key= ImKey-3-one,value= ImValue-3-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 3, key= ImKey-9-one,value= ImValue-9-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 4, key= ImKey-12-one,value= ImValue-12-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 5, key= ImKey-16-one,value= ImValue-16-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 6, key= ImKey-17-one,value= ImValue-17-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 7, key= ImKey-24-one,value= ImValue-24-one
Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 8, key= ImKey-32-one,value= ImValue-32-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 6, key= ImKey-18-one,value= ImValue-18-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 7, key= ImKey-19-one,value= ImValue-19-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 8, key= ImKey-20-one,value= ImValue-20-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 9, key= ImKey-22-one,value= ImValue-22-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 10, key= ImKey-23-one,value= ImValue-23-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 11, key= ImKey-26-one,value= ImValue-26-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 12, key= ImKey-28-one,value= ImValue-28-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 13, key= ImKey-30-one,value= ImValue-30-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 14, key= ImKey-31-one,value= ImValue-31-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 15, key= ImKey-33-one,value= ImValue-33-one
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 16, key= ImKey-34-one,value= ImValue-34-one

可以看到三个线程,一个消费者组,每个线程消费者得到一个topic的分区去消费消息。

2. 单个消费者,多个线程处理消息

解释: 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个或多个,每个维护专属KafkaConsumer实例,处理消息交由特定线程池来做,从而实现消息获取与消息处理的真正解耦。

这里的多线程处理消息逻辑可以有多种方法,这里就列出来几种:

使用队列存储消息,多线程处理队列:

  1. 使用独立锁(FIFO)队列LinkedBlockingQueue

    该队列是线程安全的先进先出队列

    public class KafkaConsumerThread2 implements Runnable {// 存储消息 先进先出队列private LinkedBlockingQueue<ConsumerRecords<String,String>> list;private AtomicBoolean closed = new AtomicBoolean(false);public KafkaConsumerThread2() {}public KafkaConsumerThread2(LinkedBlockingQueue<ConsumerRecords<String, String>> list) {this.list = list;}@Overridepublic void run() {// 线程名称String threadName = Thread.currentThread().getName();// 处理消息while (!closed.get()){try {ConsumerRecords<String, String> records = list.take();System.out.println("消息数量"+records.count());if (records.isEmpty()){System.out.printf("队列为空,不消费数据,Thread-name= %s\n",threadName);}else {for (ConsumerRecord<String, String> record : records) {Thread.sleep(3000);System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",threadName,record.topic(),record.partition(),record.offset(),record.key(),record.value());}}}catch (InterruptedException e){e.printStackTrace();}}}public static void main(String[] args) {LinkedBlockingQueue<ConsumerRecords<String, String>> list = new LinkedBlockingQueue<>();Properties props = new Properties();props.put("bootstrap.servers", "10.33.68.68:9093");props.put("group.id", "thread-5");//消费者组,只要group.id相同,就属于同一个消费者组props.put("enable.auto.commit", "true");//自动提交offsetprops.put("auto.offset.reset", "earliest");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("session.timeout.ms", "30000");props.put("max.poll.records",5);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 消费同一主题consumer.subscribe(Collections.singletonList("six-topic"));// 开启三个线程处理队列中的消息for (int i = 0; i <3 ; i++) {new Thread(new KafkaConsumerThread2(list),"thread-"+i).start();}while (true){ConsumerRecords<String, String> records = consumer.poll(1000);try {list.put(records);//Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}
    }
    
  2. 创建线程池,使用线程池处理消息逻辑

    逻辑处理类ConsumerDealThread:

    public class ConsumerDealThread implements Runnable{private ConsumerRecord record;public ConsumerDealThread(ConsumerRecord record) {this.record = record;}public void run() {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",Thread.currentThread().getName(),record.topic(),record.partition(),record.offset(),record.key(),record.value());}
    }
    

    运行类KafkaConsumerThread3:

    public class KafkaConsumerThread3 {public static void main(String[] args) {LinkedBlockingQueue<ConsumerRecords<String, String>> list = new LinkedBlockingQueue<>();Properties props = new Properties();props.put("bootstrap.servers", "10.33.68.68:9093");props.put("group.id", "thread-18");//消费者组,只要group.id相同,就属于同一个消费者组props.put("enable.auto.commit", "true");//自动提交offsetprops.put("auto.offset.reset", "earliest");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("session.timeout.ms", "30000");props.put("max.poll.records",5);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 消费同一主题consumer.subscribe(Collections.singletonList("six-topic"));ExecutorService executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());while (true){ConsumerRecords<String, String> records = consumer.poll(1000);try {for (ConsumerRecord<String, String> record : records) {executor.submit(new ConsumerDealThread(record));}} catch (Exception e) {e.printStackTrace();consumer.wakeup();executor.shutdown();try {if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {System.out.println("超时,未关闭线程池");}} catch (InterruptedException e2) {e.printStackTrace();}}BlockingQueue<Runnable> queue = ((ThreadPoolExecutor) executor).getQueue();System.out.println("队列数量:"+queue.size());}}
    }
    

【Kafka笔记】5.Kafka 多线程消费消息相关推荐

  1. kafka Java客户端之 consumer API 多线程消费消息

    kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...

  2. Kafka笔记:kafka原理简介以及架构

    文章目录 1.1概述 1.2消息系统介绍 1.2.1点对点消息传递 1.2.2发布-订阅消息传递 1.3 Kafka的优点 1.4 Kafka架构以及术语解释 1.4.1 Broker 1.4.2 T ...

  3. kafka设置起止时间消费消息

    消费kafka消息时,有时可能需要消费某个时间段的消息,写个demo记录下: public class KafkaConsumerByTime {public static void main(Str ...

  4. 利用Kafka发送/消费消息-Java示例

    利用Kafka发送/消费消息-Java示例 当使用命令行工具把基本的组件运行起来后,再使用Java client就很简单,这里是入门的第一个Java客户端程序,有很多需要深入理解的地方. 依赖配置 & ...

  5. 【Kafka笔记】4.Kafka API详细解析 Java版本(Producer API,Consumer API,拦截器等)

    简介 Kafka的API有Producer API,Consumer API还有自定义Interceptor (自定义拦截器),以及处理的流使用的Streams API和构建连接器的Kafka Con ...

  6. Kafka 入门 and kafka+logstash 实战应用

    原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 .作者信息和本声明.否则将追究法律责任.http://tchuairen.blog.51cto.com/3848118/1855090 ...

  7. Kafka Consumer多线程消费

    概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...

  8. 正确处理kafka多线程消费的姿势

    最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...

  9. 【kafka】浅谈Kafka的多线程消费的设计

    1.概述 转载:浅谈Kafka的多线程消费的设计 看原文去... 一.前言 跟RabbitMQ相比,Kafka的分区机制(Partition)使其支持对同一个"队列"分片并行读取, ...

最新文章

  1. nfs原理及安装配置
  2. undefind_undefined什么意思
  3. CURL HTTPS POST
  4. log4net 日志跟踪
  5. tl r402路由器设置_tp-link TL-WR700N 迷你型路由器设置记录
  6. python字符串筛选输出_「每日一练」巧用Python对字符串进行筛选
  7. 检测手机用户安装的应用程序是否有使用某权限
  8. 课设2--线性表的操作
  9. 卫星轨道的估计问题(Matlab)(二):扩展卡尔曼滤波(EKF)对新问题的尝试
  10. java中list,set,map集合的区别,及面试要点
  11. matlab对比r语言,R语言与matlab循环时间对比
  12. React Native填坑之旅--Button篇
  13. droidcam调用手机摄像头的方法(提供PC+Android软件,不需要积分)
  14. R语言将 手写数据集 图片数据转为表格数据并主成分分析
  15. 液晶显示屏的LED背光辉度公式计算?
  16. 18位身份证号码校验
  17. mysql workbench pk_MySQL Workbench筑表时PK, NN, UQ, BIN, UN, ZF, AL的意思
  18. gba口袋妖怪c语言源代码,查看“精灵宝可梦 火红·叶绿”的源代码
  19. 实验三 配置和实施VLAN
  20. 视频文件打不开怎么修复

热门文章

  1. struct timeval
  2. VVC学习之五:帧内预测——67个模式预测信号生成 predIntraAng()
  3. 数字图像处理 离散图象变换
  4. 原来无线路由器也可以做无线交换机
  5. 戴尔 R730 服务器系统安装
  6. 【随堂笔记】数据结构基础
  7. 阿里云跨境游戏及电商网络加速方案(全球加速和CDN)
  8. 满满的大片既视感,还原 Pwn2Own 黑客大赛首日战况
  9. uniapp Modal自定义弹窗
  10. Android模拟器下重力感应应用的开发-Simulator的使用