参考:Kafka参数

一、@KafkaListener注解

@KafkaListener(id = "11111", groupId = "demo-group",topics = Constants.TOPIC)public void listen(String msgData) {LOGGER.info("收到消息" + msgData);
}  @KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix",topics = Constants.TOPIC)public void listen2(String msgData) {LOGGER.info("收到消息" + msgData);
}@KafkaListener(id = "3333", groupId = "demo-group2", topics = Constants.TOPIC)public void listen3(String msgData) {LOGGER.info("收到消息" + msgData);
}@KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC)public void listen4(String msgData) {LOGGER.info("收到消息" + msgData);
}

(1) id: 默认是每个Listener实例的重要标识。

对于整个日志的排查起着至关重要的作用。如果不指定groupId,那么id将直接作为groupId。可以使用另外一个属性 idIsGroup=false关闭,默认是true。

(2) goupId: 每个消费者所属的组。

每个消费者都有自己所属的组。一个组中可以有多个消费者。

一个Topic的分区只能被同一个组下的某个消费者消费。从日志上来看,侧面也反映的消费模式是 Subscribed 订阅模式,不是手动的assign模式。

[Consumer clientId=consumer-1, groupId=demo-group2] Subscribed to topic(s): COLA
[Consumer clientId=consumer-2, groupId=demo-group] Subscribed to topic(s): COLA
[Consumer clientId=consumer-3, groupId=demo-group2] Subscribed to topic(s): COLA
[Consumer clientId=prefix-0, groupId=demo-group] Subscribed to topic(s): COLA

(3) clientIdPrefix: 消费者clientId前缀

@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix", topics = Constants.TOPIC)
public void listen2(String msgData) {LOGGER.info("收到消息" + msgData);
}

如下图,共有4个消费者。有个消费者配置了clientIdPrefix属性为"prefix",所以该消费者的clientId以指定的"prefix"开头。如果没有配置,该实例的clientId默认为"consumer"。同时,每个实例的clientId后面挂了一个数字,来标示它在整个kafka集群中的消费者编号,编号从0开始。这里配置了4个消费者,所以消费者实例编号有0、 1、 2、 3。

(4) autoStartup

public @interface KafkaListener .../*** Set to true or false, to override the default setting in the container factory. May* be a property placeholder or SpEL expression that evaluates to a {@link Boolean} or* a {@link String}, in which case the {@link Boolean#parseBoolean(String)} is used to* obtain the value.* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.* @return true to auto start, false to not auto start.* @since 2.2*/String autoStartup() default "";

是否自动启动,如果是 false,默认不生效,需要手动唤醒。

看源代码上作者给的的注释:该注解指定的值优先级比工厂里指定的高。

另外可以使用 ${} 占位符的形式,支持配置。

application.yaml:
listener:auto:startup: true  java :@KafkaListener(... containerFactory = "batchContainerFactory",autoStartup = "${listener.auto.startup}")public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment)...

注:每个消费者实例对象内部持有两个属性。
boolean running
boolean paused
有几个改变状态的方法:
调用start()方法后,running转为true
调用stop()方法后,running转为false
调用pause()方法后,paused转为true
调用resume()方法后,paused转为false

只有running=true 、 paused=false 的消费者实例才能正常消费数据。
注解上的autoStartup改变的是running属性。

    @KafkaListener(id = "11111", groupId = "demo-group", topics = Constants.TOPIC, autoStartup = "false")public void listen(String msgData) throws InterruptedException {LOGGER.info("收到消息" + msgData);Thread.sleep(1000);}

二、Kafka Listener任务暂停及恢复

2.1 唤醒消费者实例, 示例代码:

    @Autowiredprivate KafkaListenerEndpointRegistry registry;// 获取到id="11111" 的消费实例对象MessageListenerContainer listenerContainer = this.registry.getListenerContainer("11111");listenerContainer.pause();  //paused ==> true// listenerContainer.stop(); //running==> false

2.2 暂停消费者实例, 示例代码:

    @Autowiredprivate KafkaListenerEndpointRegistry registry;// 获取到id="11111" 的消费实例对象MessageListenerContainer listenerContainer = this.registry.getListenerContainer("11111");listenerContainer.pause();  //paused ==> true// listenerContainer.stop(); //running==> false

2.3 定时任务自动启动

    @Autowiredprivate KafkaListenerEndpointRegistry registry;// 定时器,每天凌晨0点开启监听@Scheduled(cron = "0 0 0 * * ?")public void startListener() {log.info("开启监听");// 判断监听容器是否启动,未启动则将其启动if (!registry.getListenerContainer("11111").isRunning()) {registry.getListenerContainer("11111").start();}registry.getListenerContainer("11111").resume();}// 定时器,每天早上10点关闭监听@Scheduled(cron = "0 0 10 * * ?")public void shutDownListener() {log.info("关闭监听");registry.getListenerContainer("11111").pause();}

三、@KafkaListener注解方法参数汇总

@KafkaListener注解能够使用到如下8种方法上面。至于监听单条数据的前4种方法,与批量监听多条数据的后4种方法,主要依据kafka的具体配置。

    @KafkaListener(....)public void listen1(String data) @KafkaListener(....)public void listen2(ConsumerRecord<K,V> data) @KafkaListener(....)public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment) @KafkaListener(....)public void listen4(ConsumerRecord<K,V> data,Acknowledgment acknowledgment, Consumer<K,V> consumer)@KafkaListener(....)public void listen5(List<String> data) @KafkaListener(....)public void listen6(List<ConsumerRecord<K,V>> data) @KafkaListener(....)public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment) @KafkaListener(....)public void listen8(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment, Consumer<K,V> consumer)

四、KafkaListenerContainerFactory配置

在application.yaml中配置的kafka参数,以spring.kafka开头的参数族,全部用于kafka默认对象的创建。

4.1 kafka参数默认封装对象

所有kafka参数默认封装到对象:KafkaProperties对象中,可使用@Autowired自动注入。

    @Autowiredprivate KafkaProperties properties;

4.2 @KakfkaListener注解标记监听实例对象

如不特殊指定,默认使用在yaml中的所有spring.kafka.consumer与spring.kafka.listener下的参数。

监听器实例对象自动绑定到上述配置文件,是由于它默认使用的"containerFactory" 是名为"kafkaListenerContainerFactory"的bean。

源码注释如下,如果不特殊指定,则默认的容器工厂将会被使用。

package org.springframework.kafka.annotation;public @interface KafkaListener .../*** The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}* to use to create the message listener container responsible to serve this endpoint.* <p>If not specified, the default container factory is used, if any.* @return the container factory bean name.*/String containerFactory() default "";

默认的容器工厂代码如下,均为Springboot与Kafka框架提供的类。
这两个bean将spring.kafka.listener与spring.kafka.consumer下的参数全部组装到名为"kafkaListenerContainerFactory"这个bean中。该bean供@KafkaListener标记的监听实例使用。
因此可以得出结论:
如果不想使用默认的"kafkaListenerContainerFactory"容器工厂,则必须手动创建一个"ConcurrentKafkaListenerContainerFactory"类的实例,并且其bean name 不能叫"kafkaListenerContainerFactory"(不然与默认的工厂实例重名了),然后把该对象加入spring容器中。当在使用@KafkaListener标注的监听实例对象时,手动指定该注解"containerFactory"属性为刚才自定义的容器工厂实例bean name。

package org.springframework.boot.autoconfigure.kafka;class KafkaAnnotationDrivenConfiguration {@Bean@ConditionalOnMissingBeanConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();configurer.setKafkaProperties(this.properties);MessageConverter messageConverterToUse = (this.properties.getListener().getType().equals(Type.BATCH))? this.batchMessageConverter : this.messageConverter;configurer.setMessageConverter(messageConverterToUse);configurer.setReplyTemplate(this.kafkaTemplate);configurer.setTransactionManager(this.transactionManager);configurer.setRebalanceListener(this.rebalanceListener);configurer.setErrorHandler(this.errorHandler);configurer.setBatchErrorHandler(this.batchErrorHandler);configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);configurer.setRecordInterceptor(this.recordInterceptor);return configurer;}@Bean@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));return factory;}

4.3 自定义容器工厂实例代码示例:

    @Autowiredprivate KafkaProperties properties;@Bean("batchContainerFactory")public ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainer() {ConcurrentKafkaListenerContainerFactory<?, ?> container =new ConcurrentKafkaListenerContainerFactory<>();Map<String, Object> stringObjectMap = this.properties.buildConsumerProperties();stringObjectMap.put("enable.auto.commit", false);container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(stringObjectMap));// 没有topic是否禁止系统启动container.setMissingTopicsFatal(true);// 并发container.setConcurrency(1);// 批量接收container.setBatchListener(true);// 如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。container.getContainerProperties().setPollTimeout(5000);// 设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);// 设置kafka 异常重试次数 第一个参数等待重试时间,第二个参数数提交次数,这里设置不重试,默认重试10次 抛出异常后调用// factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 0L)));return container;}@KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC, containerFactory = "batchContainerFactory")public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment) {LOGGER.info("4444收到消息" + list.size());acknowledgment.acknowledge();}

五、吞吐量

如下,这里我只列出了影响本例的几条参数。

spring:kafka:consumer:enable-auto-commit: true# max-poll-records: 20listener:ack-mode: batchtype: batchconcurrency: 5

如果设置spring.kafka.listener.concurrency为5,共两个消费者,Topic名为"COLA",共8个分区。代码如下。

    @KafkaListener(id = "4444", groupId = "demo-group2", topics = "COLA")public void listen4(List<String> msgData) {LOGGER.info("收到消息" + msgData);}@KafkaListener(id = "5555", groupId = "demo-group2", topics = "COLA")public void listen5(List<String> msgData) {LOGGER.info("收到消息" + msgData);}@Beanpublic NewTopic newTopic() {return new NewTopic(Constants.TOPIC, 8, (short) 1);}

系统每个消费者都创建了5个线程,共10个线程。换句话说,每个消费者实例(@KafkaListener标记的方法)同时都会有5个线程在跑。每个线程接收的分区都不一样。
另外,这两个消费者属于同一个组,Topic只有8个分区,2个消费者共10个线程,一个线程消费一个分区,所以必然有两个线程最后属于空闲状态。
从实际结果上来看(下面的日志),没想到系统为id="4444"的消费者实际只分配到了3个分区,有两个线程处于空闲状态。id="5555"的消费者达到了预期,共消费了5个分区,分配到了5个线程!

[4444-2-C-1]: demo-group2: partitions assigned: []
[4444-3-C-1]: demo-group2: partitions assigned: []
[4444-4-C-1]: demo-group2: partitions assigned: [COLA-1]
[4444-1-C-1]: demo-group2: partitions assigned: [COLA-7]
[5555-2-C-1]: demo-group2: partitions assigned: [COLA-3]
[5555-4-C-1]: demo-group2: partitions assigned: [COLA-5]
[5555-3-C-1]: demo-group2: partitions assigned: [COLA-4]
[4444-0-C-1]: demo-group2: partitions assigned: [COLA-6]
[5555-0-C-1]: demo-group2: partitions assigned: [COLA-0]
[5555-1-C-1]: demo-group2: partitions assigned: [COLA-2]

六、 结论:

  1. concurrency值对应@KafkaListener的消费者实例线程数目,如果concurrency数量大于partition数量,多出的部分分配不到partition,会被闲置。

  1. 设置的并发量不能大于partition的数量,如果需要提高吞吐量,可以通过增加partition的数量达到快速提升吞吐量的效果。

@KafkaListener 详解及消息消费启停控制相关推荐

  1. java 消息队列详解_Java消息队列-Spring整合ActiveMq的详解

    本篇文章主要介绍了详解Java消息队列-Spring整合ActiveMq ,小编觉得挺不错的,现在分享给大家,也给大家做个参考.一起跟随小编过来看看吧 1.概述 首先和大家一起回顾一下Java 消息服 ...

  2. java canvas 缩放图片_详解如何用HTML5 Canvas API控制图片的缩放变换

    摘要:这篇HTML5栏目下的"详解如何用HTML5 Canvas API控制图片的缩放变换",介绍的技术点是"html5_canvas.canvas.Html5.控制图片 ...

  3. 增压撬启停控制优化及纳入GE UCP控制系统可行性研究

    1.整合增压撬进GE系统的必要性 增压撬是压缩机干气密封系统重要组成部分,目前增压撬控制算法存在一定不足,其通过压缩机进出口汇管差压值决定增压泵启停,而没将干气密封密封性能核心指标PDIT153差压值 ...

  4. 台达DVP ES系列plc与3台台达MS300变频器通讯程序 实现频率设定,启停控制,实际频率读取等

    台达DVP ES系列plc与3台台达MS300变频器通讯程序 器件:台达DVP ES系列的PLC,3台台达MS300系列变频器,昆仑通态 功能:实现频率设定,启停控制,实际频率读取等. 资料:带注释P ...

  5. 信捷PLC与7台三菱变频器485通讯案例 对7台变频器进行单独频率设定,启停控制,频率读取

    信捷PLC与7台三菱变频器485通讯案例功能:用信捷PLC与7台三菱变频器modbus通讯,自由通讯协议 配件:信捷XC系列PLC,三菱E740变频器,昆仑通态触摸屏 功能:对7台变频器进行单独频率设 ...

  6. 三菱FX1N PLC 485与三菱变频器modbus通讯 对变频器进行频率设定,加减速时间设置,正反转,启停控制

    三菱FX1N PLC 485与三菱变频器modbus通讯可直接拿来实用了,三菱FX PLC与三菱变频器通讯 采用器件:三菱FX1N PLC,FX1N485BD板,1台三菱E740变频器,三菱FX2N ...

  7. 两台异步电动机(星角降压启动+自耦降压启动)的顺序启/停控制——电气控制

    两台异步电动机的顺序启/停控制--电气控制 题目要求: 实训器材: 综合解析: 星三角降压启动讲解: 自耦变压器降压启动讲解: 电路设计: 过程分析: 实操接线: 注意事项: 一周的实训两天做完!总共 ...

  8. 三菱fx3u+485ADP-MB与3台台达变频器modbus通讯程序 通过三菱fx3u 485ADP-MB板对3台台达变频器进行modbus通讯,实现频率设定,启停控制,输出频率读取,输出电压读取

    三菱fx3u+485ADP-MB与3台台达变频器modbus通讯程序 功能:通过三菱fx3u 485ADP-MB板对3台台达变频器进行modbus通讯,实现频率设定,启停控制,输出频率读取,输出电压读 ...

  9. 1.12.Flink Kafka-Connector详解、Consumer消费策略设置、动态加载Topic、Consumers Offset 自动提交、Producer、容错等

    1.12.Flink Kafka-Connector详解 1.12.1.Kafka Consumer消费策略设置 1.12.2.Kafka Consumer的容错 1.12.3.动态加载Topic 1 ...

最新文章

  1. 这12本经典技术书,是时候C位出道了!
  2. 跟着“路线图”,我们一起遨游机器学习的世界!
  3. 抽象:程序员必备的能力
  4. 使用OpenJDK 11运行JAXB xjc编译器
  5. 人生感悟:人生像吃自助餐
  6. c#养老院老人信息管理系统源码 论文_包河区【失智老人养老院】
  7. 大龄程序员想转产品经理?3本书给你最靠谱的进阶攻略
  8. 音速索尼克 怪人_如何使用AI玩刺猬索尼克。 真干净!
  9. OpenCV图像、矩阵、数组介绍
  10. xss攻击突破转义_每个人都应该了解的7种xss漏洞
  11. 求解简单的四则运算表达式
  12. [Wireshark]交换机设置镜像端口并使用Wireshark抓包异常流量分析病毒种类
  13. 鸿蒙升级之前APP没有,升级鸿蒙2.0之后,没有原生的日历app了
  14. python中减号怎么打_python减号
  15. Base64的编码与解码的实现方法(超详细,每一行代码都含注释)
  16. 基于element-ui 搭建管理后台
  17. 教你如何建立STM32F103通用工程模板(固件库)
  18. python中的reshape是什么意思_python中reshape的用法(深度学习入门程序)
  19. 张小平 (为奥运冠军名字作诗)
  20. CnOpenData中国高校专利授权数量统计

热门文章

  1. 2020转录组RNA-SEQ上游分析
  2. 前端入门(二)npm包管理+模块化+bable转码器+webpack打包+vue-element-admin
  3. Revit问题:创建牛腿柱和快速生成圈梁
  4. thingsboard2.5 CE版本数据库设计说明
  5. win11系统新版edge不兼容网银如何解决【解决办法】
  6. 在word文档中漂亮地贴代码
  7. ORACLE 序列重置
  8. Typora:改变字体的背景颜色
  9. 联想服务器启动很久才能进系统,Win10开机一直请稍后很久才进系统的两种解决方法...
  10. CGA建模教程——形状语法(建筑群)