Consumer - 消费者
?消费者示例
// 配置请求参数
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-1");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "NewConsumer");final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// consumer subscribe topics , 订阅支持正则表达式 例如Topic: my-topic.*(只有发生rebalance,新增的Topic才会被消费.)
kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(300));for (ConsumerRecord<String, String> record : records) {log.info("topic = {}, partition = {}, offset = {}, " + "key = {}, value = {}",record.topic(), record.partition(), record.offset(), record.key(), record.value());}}
} finally {// 最终关闭消费者.kafkaConsumer.close();
}
复制代码
Poll方法概述
Consumer的Poll方法,必须保持一直调用(没有数据也调用). 因为Poll方法使Consumer保持活性.
新消费者第一次调用Poll方法,负责发现GroupCoordinator. 通过GroupCoordinator加入消费者组,收到分配给它的分区.
Close方法概述
Close方法会释放网络资源,并且立即触发rebalance操作. 而不是等Kafka Cluster被动发现.
Offset提交方式
消费者消费数据之后,需要告诉kafka,它消费到了什么地方(offset).
Offset自动提交
Kafka存在自动提交偏移量机制props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
,默认情况下,5秒钟会触发. 但是可以通过 auto.commit.interval.ms
进行配置,自动提交机制依赖于poll方法(保持心跳).因为每次调用poll方法会检查是否应该提交偏移量. 相同原理close方法也会提交偏移量.
Offset手动提交
手动提交需要把auto.commit.offset=false
,这样consumer则不会进行自动提交. commit the last offset in the batch.
commitSync
同步提交的所有方法,按照功能分为两类,一类是指定特定分区,另一类是全部分区.同步提交会进行失败重试.
?同步提交示例
⚠️ 同步手动提高会降低吞吐量.
commitAsync
异步提交没有失败重试机制. 假如先提交2000 offset(失败), 然后提交3000 offset(成功),如果有重试机制,则会发生最后2000提交成功.
?异步提交示例
结合commitAsync和commitSync
指定Offset消费
指定Offset消费的时候,也需要指定Partition,因为Offset具体到Partition才有意义.
seekToBeginning方法
是跳到指定Partition的Offset为0处,也就是从头开始消费.seekToEnd方法
是跳到指定Partition的Offset的末尾处,当时数据的末端.而最后一个seek(TopicPartition,long)
跳到指定Partition的指定Offset处消费.
⚠️ 在调用seek方法的时候,需要先获得分区的信息,而分区的信息要通过poll方法来获得. 如果调用seek方法时,没有分区信息,则会抛出IllegalStateException异常 No current assignment for partition xxxx.
? 下图是使用seek方法示例
红框里是关键步骤, kafkaConsumer.poll(Duration.ofMillis(50000))
是为了加入消费者组,获得Topic相关的信息.kafkaConsumer.assignment()
获得分配给的分区.根据分配的分区进行偏移量重置. 如果重置的分区不在列表里面,则会抛出上面说的异常.
Consumer Exit
消费者的poll方法一般都是被while(true)
语句包含的,那么退出就是一个问题.
- 在另外一个线程使用
consumer.wakeup();
会抛出WakeupException异常,该异常不需要去捕获,它会自己发送LeaveGroup request.但是在之后需要手动关闭consumer. - 总的来说也是方法一的变型. 利用
Runtime.getRuntime().addShutDownHook(Thread);
来调用consumer.wakeup();
, ShutDownHook运行在另外一个线程中. 当按下ctrl+c的时候,触发ShutDownHook中传入的线程,最后运行完结束主程序.
? 消费者安全退出示例
指定分区消费
指定一个主题的某些分区消费,不会触发rebalance.
假如下面例子
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
[Partition(topic = TwentyMillion, partition = 2, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = TwentyMillion, partition = 4, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])]
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: vfySGyF5TV6oAkf7kbh7hA
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Discovered group coordinator 192.168.0.251:9092 (id: 2147483647 rack: null)
复制代码
日志表明分配到两个分区(共有5个分区), 最后clientId=consumer-1,groupId=TwentyConsumerManual.
现在有个问题,假如再启动一个消费者,而该消费者是用subscribe订阅,会产生什么现象?
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: vfySGyF5TV6oAkf7kbh7hA
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Discovered group coordinator 192.168.0.251:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Successfully joined group with generation 3
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Setting newly assigned partitions [TwentyMillion-0, TwentyMillion-4, TwentyMillion-3, TwentyMillion-2, TwentyMillion-1]
复制代码
日志表明启动的消费者和手动指定分区启动的消费者是同一个,说明两者并不能协同工作.
转载于:https://juejin.im/post/5cda5b64f265da0379419a48
Consumer - 消费者相关推荐
- 关于Kafka 的 consumer 消费者手动提交详解
前言 在上一篇 Kafka使用Java实现数据的生产和消费demo 中介绍如何简单的使用kafka进行数据传输.本篇则重点介绍kafka中的 consumer 消费者的讲解. 应用场景 在上一篇kaf ...
- 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,
目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...
- kafka 主动消费_Kafka核心API——Consumer消费者
Consumer之自动提交 在上文中介绍了Producer API的使用,现在我们已经知道如何将消息通过API发送到Kafka中了,那么现在的生产者/消费者模型就还差一位扮演消费者的角色了.因此,本文 ...
- python-kafka多线程快速读取consumer消费者数据,同时使用批读取与无限流读取改进
python单线程循环读取consumer会很浪费时间,而且速率远远低于生产者可容纳的速率,因此我们使用多线程来处理IO密集型的读取操作 文章目录 极简的示例 1. 生产者(先运行) 2. 消费者部分 ...
- kafka consumer消费者 offset groupID详解
kafka consumer:消费者可以从多个broker中读取数据.消费者可以消费多个topic中的数据. 因为Kafka的broker是无状态的,所以consumer必须使用partition o ...
- Apache Kafka Consumer 消费者集
1.目标 在我们的上一篇文章中,我们讨论了Kafka Producer.今天,我们将讨论Kafka Consumer.首先,我们将看到什么是Kafka Consumer和Kafka Consumer的 ...
- kafka之Consumer消费者基本概念
概念 消费者 消费者(Consumer)即读取Kafka集群中某些topic消息的程序,kafka中消费者分为两种类型: 消费者组 (consumer group) 独立消费者 (standalong ...
- kafka的Consumer 消费者(六)
Kafka 的消费方式 消息队列主要有两个消费方式:pull(拉)模式 和 push(推)模式 pull(拉)模式 consumer采用从broker中主动拉取数据,push(推)模式 bro ...
- springclound consumer(消费者)
1.在pom文件添加相关依赖 <dependencies><!-- web --><dependency><groupId>org.springfram ...
最新文章
- php dedecms 记录访问者ip,dedecms实现显示访问者ip地址的方法
- Java学习笔记(十)--控制台输入输出
- Advanced Bash-Scripting Guide 学习笔记一
- Java EE状态会话Bean(EJB)示例
- input type=checkbox 前面有个可供打钩的小方框 HTML DOM Checkbox 对象
- 指针作为函数参数 进行内存释放 并置NULL
- ASP.NET MVC的JavaScriptResult
- 计算机领域国际会议分类及排名
- 想要升级Big Sur?你的Mac与Big Sur兼容吗?
- 【JavaWeb学习】14综合案例
- Tuxera NTFS如何解决硬盘无法写入文件教程分享
- 总结:6个不得不看的APP获客模式
- 数据标注是什么,如何高效完成数据标注?
- 4 年 Java 经验面试总结、心得体会(太全面了!!都会就无敌了)
- 10个不大为人知的网站
- 美颜sdk对直播平台有多重要?为什么需要接入直播美颜sdk?
- Xcode Server 教程1:入门
- java打字游戏课程设计_java课程设计打字游戏
- Ubuntu虚拟机ns_error_failure 0x80004005错误
- 数据产品经理日常工作
热门文章
- 读《数学辞海》编辑委员会之《数学辞海 第三卷》
- tomcat服务器访问webapps资源404
- 游戏背包简易版(2)
- 频率信号转电压或电流信号隔离变送器0-1KHz /0-5KHz /0-10KHz转0-2.5V/0-5V/0-20mA
- 局域网常见的十个故障分析和排除方法
- 问题解决:Invalid bound statement (not found): com.jxgm.csz.mapper.UserMapper.add
- 2018,还剩下一个月了。
- 莱卡公司宣布25种莱卡纤维产品获得材料健康金级认证
- 拉马努金的公式是怎么想出来的
- 每个软件开发者必须绝对至少需要了解的Unicode和Character Sets的知识(没有借口!)