?消费者示例

// 配置请求参数
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-1");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "NewConsumer");final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// consumer subscribe topics , 订阅支持正则表达式 例如Topic: my-topic.*(只有发生rebalance,新增的Topic才会被消费.)
kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(300));for (ConsumerRecord<String, String> record : records) {log.info("topic = {}, partition = {}, offset = {}, " + "key = {}, value = {}",record.topic(), record.partition(), record.offset(), record.key(), record.value());}}
} finally {// 最终关闭消费者.kafkaConsumer.close();
}
复制代码

Poll方法概述

Consumer的Poll方法,必须保持一直调用(没有数据也调用). 因为Poll方法使Consumer保持活性.

新消费者第一次调用Poll方法,负责发现GroupCoordinator. 通过GroupCoordinator加入消费者组,收到分配给它的分区.

Close方法概述

Close方法会释放网络资源,并且立即触发rebalance操作. 而不是等Kafka Cluster被动发现.

Offset提交方式

消费者消费数据之后,需要告诉kafka,它消费到了什么地方(offset).

Offset自动提交

Kafka存在自动提交偏移量机制props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);,默认情况下,5秒钟会触发. 但是可以通过 auto.commit.interval.ms进行配置,自动提交机制依赖于poll方法(保持心跳).因为每次调用poll方法会检查是否应该提交偏移量. 相同原理close方法也会提交偏移量.

Offset手动提交

手动提交需要把auto.commit.offset=false,这样consumer则不会进行自动提交. commit the last offset in the batch.

commitSync

同步提交的所有方法,按照功能分为两类,一类是指定特定分区,另一类是全部分区.同步提交会进行失败重试.

?同步提交示例

⚠️ 同步手动提高会降低吞吐量.

commitAsync

异步提交没有失败重试机制. 假如先提交2000 offset(失败), 然后提交3000 offset(成功),如果有重试机制,则会发生最后2000提交成功.

?异步提交示例

结合commitAsync和commitSync

指定Offset消费

指定Offset消费的时候,也需要指定Partition,因为Offset具体到Partition才有意义.

seekToBeginning方法是跳到指定Partition的Offset为0处,也就是从头开始消费.seekToEnd方法是跳到指定Partition的Offset的末尾处,当时数据的末端.而最后一个seek(TopicPartition,long)跳到指定Partition的指定Offset处消费.

⚠️ 在调用seek方法的时候,需要先获得分区的信息,而分区的信息要通过poll方法来获得. 如果调用seek方法时,没有分区信息,则会抛出IllegalStateException异常 No current assignment for partition xxxx.

? 下图是使用seek方法示例

红框里是关键步骤, kafkaConsumer.poll(Duration.ofMillis(50000))是为了加入消费者组,获得Topic相关的信息.kafkaConsumer.assignment()获得分配给的分区.根据分配的分区进行偏移量重置. 如果重置的分区不在列表里面,则会抛出上面说的异常.

Consumer Exit

消费者的poll方法一般都是被while(true)语句包含的,那么退出就是一个问题.

  1. 在另外一个线程使用consumer.wakeup(); 会抛出WakeupException异常,该异常不需要去捕获,它会自己发送LeaveGroup request.但是在之后需要手动关闭consumer.
  2. 总的来说也是方法一的变型. 利用Runtime.getRuntime().addShutDownHook(Thread);来调用consumer.wakeup();, ShutDownHook运行在另外一个线程中. 当按下ctrl+c的时候,触发ShutDownHook中传入的线程,最后运行完结束主程序.

? 消费者安全退出示例

指定分区消费

指定一个主题的某些分区消费,不会触发rebalance.

假如下面例子

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
[Partition(topic = TwentyMillion, partition = 2, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = TwentyMillion, partition = 4, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])]
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: vfySGyF5TV6oAkf7kbh7hA
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Discovered group coordinator 192.168.0.251:9092 (id: 2147483647 rack: null)
复制代码

日志表明分配到两个分区(共有5个分区), 最后clientId=consumer-1,groupId=TwentyConsumerManual.

现在有个问题,假如再启动一个消费者,而该消费者是用subscribe订阅,会产生什么现象?

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : eec43959745f444f
[main] INFO org.apache.kafka.clients.Metadata - Cluster ID: vfySGyF5TV6oAkf7kbh7hA
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Discovered group coordinator 192.168.0.251:9092 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Revoking previously assigned partitions []
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Successfully joined group with generation 3
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=TwentyConsumerManual] Setting newly assigned partitions [TwentyMillion-0, TwentyMillion-4, TwentyMillion-3, TwentyMillion-2, TwentyMillion-1]
复制代码

日志表明启动的消费者和手动指定分区启动的消费者是同一个,说明两者并不能协同工作.

转载于:https://juejin.im/post/5cda5b64f265da0379419a48

Consumer - 消费者相关推荐

  1. 关于Kafka 的 consumer 消费者手动提交详解

    前言 在上一篇 Kafka使用Java实现数据的生产和消费demo 中介绍如何简单的使用kafka进行数据传输.本篇则重点介绍kafka中的 consumer 消费者的讲解. 应用场景 在上一篇kaf ...

  2. 4.2.10 Kafka源码剖析, 阅读环境搭建, broker启动流程, topic创建流程, Producer生产者流程, Consumer消费者流程,

    目录 4.1 Kafka源码剖析之源码阅读环境搭建 4.1.1 安装配置Gradle 4.1.2 Scala的安装和配置 4.1.3 Idea配置 4.1.4 源码操作 4.2 Kafka源码剖析之B ...

  3. kafka 主动消费_Kafka核心API——Consumer消费者

    Consumer之自动提交 在上文中介绍了Producer API的使用,现在我们已经知道如何将消息通过API发送到Kafka中了,那么现在的生产者/消费者模型就还差一位扮演消费者的角色了.因此,本文 ...

  4. python-kafka多线程快速读取consumer消费者数据,同时使用批读取与无限流读取改进

    python单线程循环读取consumer会很浪费时间,而且速率远远低于生产者可容纳的速率,因此我们使用多线程来处理IO密集型的读取操作 文章目录 极简的示例 1. 生产者(先运行) 2. 消费者部分 ...

  5. kafka consumer消费者 offset groupID详解

    kafka consumer:消费者可以从多个broker中读取数据.消费者可以消费多个topic中的数据. 因为Kafka的broker是无状态的,所以consumer必须使用partition o ...

  6. Apache Kafka Consumer 消费者集

    1.目标 在我们的上一篇文章中,我们讨论了Kafka Producer.今天,我们将讨论Kafka Consumer.首先,我们将看到什么是Kafka Consumer和Kafka Consumer的 ...

  7. kafka之Consumer消费者基本概念

    概念 消费者 消费者(Consumer)即读取Kafka集群中某些topic消息的程序,kafka中消费者分为两种类型: 消费者组 (consumer group) 独立消费者 (standalong ...

  8. kafka的Consumer 消费者(六)

    Kafka 的消费方式   消息队列主要有两个消费方式:pull(拉)模式 和 push(推)模式   pull(拉)模式 consumer采用从broker中主动拉取数据,push(推)模式 bro ...

  9. springclound consumer(消费者)

    1.在pom文件添加相关依赖 <dependencies><!-- web --><dependency><groupId>org.springfram ...

最新文章

  1. php dedecms 记录访问者ip,dedecms实现显示访问者ip地址的方法
  2. Java学习笔记(十)--控制台输入输出
  3. Advanced Bash-Scripting Guide 学习笔记一
  4. Java EE状态会话Bean(EJB)示例
  5. input type=checkbox 前面有个可供打钩的小方框 HTML DOM Checkbox 对象
  6. 指针作为函数参数 进行内存释放 并置NULL
  7. ASP.NET MVC的JavaScriptResult
  8. 计算机领域国际会议分类及排名
  9. 想要升级Big Sur?你的Mac与Big Sur兼容吗?
  10. 【JavaWeb学习】14综合案例
  11. Tuxera NTFS如何解决硬盘无法写入文件教程分享
  12. 总结:6个不得不看的APP获客模式
  13. 数据标注是什么,如何高效完成数据标注?
  14. 4 年 Java 经验面试总结、心得体会(太全面了!!都会就无敌了)
  15. 10个不大为人知的网站
  16. 美颜sdk对直播平台有多重要?为什么需要接入直播美颜sdk?
  17. Xcode Server 教程1:入门
  18. java打字游戏课程设计_java课程设计打字游戏
  19. Ubuntu虚拟机ns_error_failure 0x80004005错误
  20. 数据产品经理日常工作

热门文章

  1. 读《数学辞海》编辑委员会之《数学辞海 第三卷》
  2. tomcat服务器访问webapps资源404
  3. 游戏背包简易版(2)
  4. 频率信号转电压或电流信号隔离变送器0-1KHz /0-5KHz /0-10KHz转0-2.5V/0-5V/0-20mA
  5. 局域网常见的十个故障分析和排除方法
  6. 问题解决:Invalid bound statement (not found): com.jxgm.csz.mapper.UserMapper.add
  7. 2018,还剩下一个月了。
  8. 莱卡公司宣布25种莱卡纤维产品获得材料健康金级认证
  9. 拉马努金的公式是怎么想出来的
  10. 每个软件开发者必须绝对至少需要了解的Unicode和Character Sets的知识(没有借口!)