【README】

1,本文主要关注 KafkaTemplate的重点方法,并非全部方法;

2,KafkaTemplate  底层依赖于 DefaultKafkaProducerFactory , 关于 DefaultKafkaProducerFactory 的介绍,refer2

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍_PacosonSWJTU的博客-CSDN博客【1】 类描述类描述:单例共享 Producer 实例的 ProducerFactory 实现。此实现将为每次 createProducer() 调用时提供的 Map 配置和可选的 Serializer 实现返回相同的 Producer 实例(如果未启用事务)。如果您使用的序列化器没有参数构造函数并且不需要设置,那么最简单的方法是在传递给 DefaultKafkaProducerFactory 构造函数的配置中针对 ProducerConfig.KEY_SERIALIZER_CLASS_Chttps://blog.csdn.net/PacosonSWJTU/article/details/121306370


【1】KafkaTemplate 类说明

用于执行高级操作的模板。 当与 DefaultKafkaProducerFactory 一起使用时,模板是线程安全的。 生产者工厂和 org.apache.kafka.clients.producer.KafkaProducer 确保这一点;

public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,ApplicationListener<ContextStoppedEvent>, DisposableBean {

【1.1】构造方法

使用提供的生产者工厂和 autoFlush 设置创建一个实例。

如果您已将生产者的 linger.ms 配置为非默认值并希望立即在此模板上发送操作,无论该设置如何, 又或者您希望阻塞直到服务器根据acs属性确认已收到消息, 需要把autoFlush设置为true。

如果 configOverrides 不为 null 或不为空,则将使用合并的生产者属性创建一个新的 DefaultKafkaProducerFactory,这些属性在提供的工厂属性之后进行覆盖。

public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,@Nullable Map<String, Object> configOverrides) {Assert.notNull(producerFactory, "'producerFactory' cannot be null");this.autoFlush = autoFlush;this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;// 是否自定义生产者工厂 this.customProducerFactory = configOverrides != null && configOverrides.size() > 0;if (this.customProducerFactory) {Map<String, Object> configs = new HashMap<>(producerFactory.getConfigurationProperties());
// 覆盖工厂属性 configs.putAll(configOverrides); // 创建新的 DefaultKafkaProducerFactoryDefaultKafkaProducerFactory<K, V> newFactory = new DefaultKafkaProducerFactory<>(configs, producerFactory.getKeySerializerSupplier(), producerFactory.getValueSerializerSupplier());
// 设置物理关闭生产者的超时时间  newFactory.setPhysicalCloseTimeout((int) producerFactory.getPhysicalCloseTimeout().getSeconds());
// 设置是否分区 newFactory.setProducerPerConsumerPartition(producerFactory.isProducerPerConsumerPartition());
// 设置是否 每个线程创建一个 生产者; newFactory.setProducerPerThread(producerFactory.isProducerPerThread());
// 新工厂赋值this.producerFactory = newFactory;} else {this.producerFactory = producerFactory;}
// 是否开启kafka事务 this.transactional = this.producerFactory.transactionCapable();
}

【1.2】发送消息方法(非常重要)

发送消息有很多方法,大致分为两类;

  • send();
  • doSend();

【1.2.1】send() 发送消息

有4个外观方法,使用的都是默认topic;

@Override
public ListenableFuture<SendResult<K, V>> sendDefault(@Nullable V data) {return send(this.defaultTopic, data);
}@Override
public ListenableFuture<SendResult<K, V>> sendDefault(K key, @Nullable V data) {return send(this.defaultTopic, key, data);
}@Override
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data) {return send(this.defaultTopic, partition, key, data);
}@Override
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data) {return send(this.defaultTopic, partition, timestamp, key, data);
}
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);return doSend(producerRecord);
}

可以看到,最后还是调用了 底层的 doSend() 方法;


【1.2.2】doSend() 方法

5个 doSend() 方法的外观方法 ,这5个方法对 topic ,分区, 消息key,时间戳,消息value  进行了重载

@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);return doSend(producerRecord);
}@Override
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);return doSend(producerRecord);
}@Override
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);return doSend(producerRecord);
}@Override
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,@Nullable V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);return doSend(producerRecord);
}@Override
public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {Assert.notNull(record, "'record' cannot be null");return doSend(record);
}

底层 doSend() 定义如下:

protected ListenableFuture<SendResult<K, V>>

doSend(final ProducerRecord<K, V> producerRecord)

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {// 获取生产者 final Producer<K, V> producer = getTheProducer(producerRecord.topic());this.logger.trace(() -> "Sending: " + producerRecord);final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();Object sample = null;if (this.micrometerEnabled && this.micrometerHolder == null) {this.micrometerHolder = obtainMicrometerHolder();}if (this.micrometerHolder != null) {sample = this.micrometerHolder.start();}// 发送消息 Future<RecordMetadata> sendFuture =producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));// May be an immediate failure (注意,这里可能马上失败,或有运行时异常抛出)if (sendFuture.isDone()) { try {sendFuture.get(); // 这里调用get会阻塞,如果发送没有完成的话 }catch (InterruptedException e) {Thread.currentThread().interrupt();throw new KafkaException("Interrupted", e);}catch (ExecutionException e) {throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace}}if (this.autoFlush) { // 自动刷新 flush();}this.logger.trace(() -> "Sent: " + producerRecord);return future;
}

【代码解说】

step1, 调用了 getTheProducer() 获取生产者 ;

关于 DefaultKafkaProducerFactory.createProducer() 可以参见 以下博文,因篇幅,本文不再赘述;

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍_PacosonSWJTU的博客-CSDN博客

step2,producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample)) ; 调用了 buildCallback(...) 构建回调对象;

非事务模式,则关闭生产者;

由 DefaultKafkaProducerFactory 可知, 生产者是  CloseSafeProducer, 其包裹了 原生 kafka生产者; 所以 调用了 CloseSafeProducer.close() 方法;

step3,自动刷新缓存 flush();

如果 ProducerFactory 提供单例生产者(例如 DefaultKafkaProducerFactory),则调用此方法才有意义。

public void flush() {Producer<K, V> producer = getTheProducer();try {producer.flush();}finally {closeProducer(producer, inTransaction());}
}protected void closeProducer(Producer<K, V> producer, boolean inTx) {if (!inTx) { // 非事务才关闭 producer.close(this.closeTimeout);}
}

【2】KafkaTemplate 发送消息与生产者复用

我们再次 follow了 DefaultKafkaProducerFactory的 doCreateProducer() 方法;

第1次因为发送消息 新建了 producer;

第2次再发送消息时,因为producer 不为null;所以直接取走;

同时 synchronized同步块可以避免并发问题;

发送消息后,是否关闭生产者,可以参考 【小结】


【小结】

通过分析 KafkaTemplate.doSend() 消息发送分发, 我们可以看到,

每发送一条消息,如果抛出异常的话,则会关闭kafka生产者,否则不会关闭生产者;原因参见

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍_PacosonSWJTU的博客-CSDN博客https://blog.csdn.net/PacosonSWJTU/article/details/121306370中的章节 【4.5.1】;

spring-kafka整合:KafkaTemplate-kafka模板类介绍相关推荐

  1. spring boot整合Quartz 在Job类中注入其他对象报空指针异常java.lang.NullPointerException at com.sxt.quartz.QuartzDemo.e

    情况在Job 类中注入RedisTemplate 报空指针异常 原因:我们使用JobDetailFactoryBean 帮我们创建对象 实际上调用的是AdaptableJobFactory 下的这个方 ...

  2. druid监控页面_Spring boot学习(四)Spring boot整合Druid

    前言 在上一篇博客中我们介绍了Spring boot配置Mybatis,但是并没有配置连接池,这在实际开发过程中肯定是不切实际的,多次的数据库连接会给程序和数据库都带来没必要的负担,这一篇博客我将介绍 ...

  3. spring boot 2.x 系列 —— spring boot 整合 kafka

    文章目录 一.kafka的相关概念: 1.主题和分区 2.分区复制 3. 生产者 4. 消费者 5.broker和集群 二.项目说明 1.1 项目结构说明 1.2 主要依赖 二. 整合 kafka 2 ...

  4. Spring Boot 整合——Spring Boot整合kafka整合

    Spring Boot 整合之前的内容 项目名称 描述 地址 base-data-mybatis 整合mybatis-plus(实际上官方教程已经很多,只做了自定义插件) 未完成 base-jpa J ...

  5. kafka专题:kafka单机和集群安装详情,Spring Boot如何整合Kafka

    文章目录 1. kafka单机安装 1.1 server.properties核心配置 2. kafka集群安装 2.1 kafka集群可视化界面 3. springboot如何整合kafka 4. ...

  6. 最新Kafka教程(包含kafka部署与基本操作、java连接kafka、spring连接kafka以及使用springboot)

    最新Kafka教程(包含kafka部署与基本操作.java连接kafka.spring连接kafka以及使用springboot) 欢迎转载,转载请注明网址:https://blog.csdn.net ...

  7. freemarker ftl模板_Spring Boot2 系列教程(十)Spring Boot 整合 Freemarker

    今天来聊聊 Spring Boot 整合 Freemarker. Freemarker 简介 这是一个相当老牌的开源的免费的模版引擎.通过 Freemarker 模版,我们可以将数据渲染成 HTML ...

  8. java 模板引擎_极简 Spring Boot 整合 Thymeleaf 页面模板

    点击"牧码小子"关注,和众多大牛一起成长! 关注后,后台回复 java ,领取松哥为你精心准备的技术干货! 虽然现在慢慢在流行前后端分离开发,但是据松哥所了解到的,还是有一些公司在 ...

  9. Spring Boot整合Thymeleaf模板引擎

    转载自 Spring Boot整合Thymeleaf模板引擎 什么是Thymeleaf Thymeleaf是一款用于渲染XML.XHTML.HTML5内容的模板引擎.类似Velocity,FreeMa ...

最新文章

  1. 谷歌BERT预训练源码解析(二):模型构建
  2. SAP WM Movement Type 里的‘Ref.Stor.Type Search’字段用法初探
  3. 旷视三维视觉Workshop | 3D组组长与你畅聊三维技术新动态
  4. Linux根文件系统结构再认识
  5. 任意点 曲线距离_中级数学11-曲线函数
  6. Web前端优化,提高加载速度
  7. 常用sql001_partition by 以及 row_number()和 dense_rank()和rank()区别
  8. 五方面入手精选数据库审计产品
  9. 编译原理初学者入门指南
  10. 手机停机后你们知道怎么打电话?教你鲜为人知的手机锦囊
  11. MPC系列:Beaver三元组和BMR协议
  12. 基于JavaEE的智能化网吧服务系统的设计与实现毕业设计论文
  13. MySQL第三方客户端工具
  14. php五只猴子分椰子_(笔试题)分椰子
  15. 用js写随机抽奖代码
  16. Linux ZRAM的简单介绍
  17. c#上位机 源码 控制固高,研华,雷赛这类的运动板卡。偏向 程序运动控制。winform+板卡+sqlite
  18. 常见的请求头的用户代理User-Agent汇总
  19. 尚硅谷数据结构和算法01-数据结构介绍和稀疏数组
  20. 有机光电二极管可应用在哪些领域

热门文章

  1. Educational Codeforces Round 96 E. String Reversa 线段树模拟序列交换
  2. Codeforces Round #694 (Div. 2) D. Strange Definition 质因子分解 + 平方数
  3. 【NOI2012】骑行川藏【拉格朗日乘数法】【二分套二分】
  4. Picture POJ - 1177(矩形周长并))
  5. Acwing 1081. 度的数量(以及本人对数位dp的浅薄理解)
  6. P5287-[HNOI2019]JOJO【KMP】
  7. P3700-[CQOI2017]小Q的表格【分块,欧拉函数】
  8. POJ1275-Cashier Employment【差分约束系统】
  9. 【Floyd】灾后重建(luogu 1119)
  10. 2019.01.24【NOIP普及组】模拟赛C组