springboot整合kafka3.1,实现基本配置和操作
springboot整合kafka3.1,实现基本配置和操作
这篇是单机的zookeeper形式,适用于入门。等有时间会更新kraft搭建。
博主自己装了双系统,虚拟机刚刚删了,所以没去做集群,不过当大家的kafka版本来到3.1,建议大家去学习KRaft集群搭建而不是去整合zookeeper。下面介绍一下怎么整合,正所谓举一反三,这个会了,到时候换个环境就ok,讲的不好的地方还请见谅。
文章目录
- springboot整合kafka3.1,实现基本配置和操作
- 前提准备
- 1、springboot
- 2、kafka
- 1、kafka下载
- 2、项目引入依赖
- 新版本的kafka命令和老版本的不一样。
- 3、zookeeper
- 1、自带的
- 2、自己下载zookeeper
- springboot整合操作kafka
- 1、config
- 2、回调(conponent)
- 1、较为泛用的
- 2、一个方法对应一个特定的回调
- 1、第一种写法
- 2、第二种写法
- 3、consumer(消费者)
- 4、controller生产者(produces)
- 1、同步发送
- 2、异步发送
- 3. 使用ack机制实现可靠
- 5、exception
- 6、filter
- 7、序列化编码解码
- 编码
- 解码
- 8、分区策略
- 常规
- 自定义
前提准备
1、springboot
博主自己用的是2.7版本的
一般关于springboot我会引入一下依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
2、kafka
1、kafka下载
https://kafka.apache.org/downloads 这里下载,这里再提一句,建议直接放到d盘e盘一级子目录里面,否则有可能出现启动的时候名字太长的问题
下载之后修改配置文件server.properties
log.dirs=E:\kafka_2.13-3.2.3\kafka-logs
2、项目引入依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>根据自己的版本选择</version> </dependency>
这里提一句:不一样的springboot对应的kafka依赖版本也不一样,所以先去官网查看依赖版本。
https://spring.io/projects/spring-kafka 点这个链接去找对应的,别自己乱下最新的。
新版本的kafka命令和老版本的不一样。
试着启动,命令看你在哪个文件夹,如果是kafka,用下面的,如果是linux,把bat换成sh
bin\windows\kafka-server-start.bat config\server.properties
简单使用
bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test(自己取)
查看
bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
3、zookeeper
新版本的kafka其实自己集成了zookeeper,博主这里同样介绍两种方式,一种自己重新去下个zookeeper,一种用kafka自带的。
1、自带的
修改配置zookeeper.properties
dataDir=/opt/kafka/zookeeper/data/dataDir
dataLogDir=/opt/kafka/zookeeper/data/dataLogDir
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=100
tickTime=2000
initLimit=10
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
2、自己下载zookeeper
由于是自己下载所以也不用去配置kafka文件中的zookeeper.properties
https://zookeeper.apache.org/releases.html#download 这是下载地址
进去之后很简单,一样改配置,进入conf目录下,将zoo_example.cfg
重命名为zoo.cfg
同样修改dataDir和log, 我的是:
example sakes.dataDir=E:\\java-configuration\\apache-zookeeper-3.7.1-bin\\data存放事务日志目录dataLogDir=E:\\java-configuration\\apache-zookeeper-3.7.1-bin\\logs
然后运行zkserver.cmd和zkcli.cmd, 一个是启动我们的zookeeper服务器,一个是客户端。
springboot整合操作kafka
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KCJ0jkN3-1664006842070)(C:\Users\CSEN\AppData\Roaming\Typora\typora-user-images\image-20220924153503966.png)]
这里用一张图作为流程讲解如何整合。
1、config
其实可以把下面的conponent exception filter什么的都丢进来,可以配一些生产者消费者的信息,想配什么都可以,这个就不展示了。
2、回调(conponent)
回调有多种写法,这里介绍三种
1、较为泛用的
@Component
public class KafkaSendResultHandler implements ProducerListener {@Overridepublic void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {System.out.println("Message send success : " + producerRecord.toString());}@Overridepublic void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {System.out.println("Message send error : " + producerRecord.toString());}
}
然后在生产者里面使用
@Autowired
private KafkaSendResultHandler kafkaSendResultHandler;。。。。。省略kafkaTemplate.setProducerListener(kafkaSendResultHandler);
2、一个方法对应一个特定的回调
1、第一种写法
kafkaTemplate.send("topic1", callbackMessage).addCallback(success ->{String topic = success.getRecordMetadata().topic();int partition = success.getRecordMetadata().partition();long offset = success.getRecordMetadata().offset();System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);}, failure -> {System.out.println("发送消息失败:" + failure.getMessage());
});
2、第二种写法
kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送消息失败:"+ex.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());}
});
3、consumer(消费者)
使用KafkaListener,指定消息类型(必选)和groupid,partitions offset(可选)
@KafkaListener(topics = {"topic1"}, groupId = "felix-group0" ,errorHandler = "consumerAwareErrorHandler")
public void onMessage1(ConsumerRecord<?,?> record){System.out.println("简单消费:" + record.topic() + "--" + record.partition() + "--" + record.value());}@KafkaListener(id = "comsumer1", groupId = "felix-group1", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"}),@TopicPartition(topic = "topic2", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "8"))
} ,errorHandler = "consumerAwareErrorHandler")
public void onMessage2(ConsumerRecord<?, ?> record){System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
}
@KafkaListener(id = "consumer2",groupId = "felix-group2", topics = "topic1" ,errorHandler = "consumerAwareErrorHandler")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {System.out.println(">>>批量消费一次,records.size()="+records.size());for (ConsumerRecord<?, ?> record : records) {System.out.println(record.value());}
}
4、controller生产者(produces)
1、同步发送
如果需要使用同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。
kafkaTemplate.send("test", message).get();
2、异步发送
可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息
kafkaTemplate.send("test", message);
3. 使用ack机制实现可靠
producers可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks",这个参数决定了producer要求leader partition收到确认的副本个数:
- 如果acks设置为0,表示producer不会等待broker的相应,所以,producer无法知道消息是否发生成功,这样有可能导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。
- 若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待知道broker确认收到消息。
- 若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。
5、exception
bean注入
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {return (message, exception, consumer) -> {System.out.println("消费异常:"+message.getPayload());return null;};
}
消费异常配置注解实现
@KafkaListener(topics = {"topic1"}, groupId = "felix-group0" ,errorHandler = "consumerAwareErrorHandler")
public void onMessage1(ConsumerRecord<?,?> record){System.out.println("简单消费:" + record.topic() + "--" + record.partition() + "--" + record.value());}
6、filter
bean注入factory的时候配置好消息过滤策略
//配置消息过滤策略@Bean(value = "filterContainerFactory")public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory);// 被过滤的消息将被丢弃factory.setAckDiscarded(true);// 消息过滤策略factory.setRecordFilterStrategy(consumerRecord -> {if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {return false;}else {return true;}});return factory;}
消费异常配置注解实现
// 消息过滤监听@KafkaListener(topics = {"topic1"}, containerFactory = "filterContainerFactory")public void onMessage6(ConsumerRecord<?, ?> record) {System.out.println(record.value());}
e {
return true;
}
});
return factory;}
消费异常配置注解实现
// 消息过滤监听
@KafkaListener(topics = {“topic1”}, containerFactory = “filterContainerFactory”)
public void onMessage6(ConsumerRecord<?, ?> record) {
System.out.println(record.value());
}
7、序列化编码解码
序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer
)
public interface Serializer<T> extends Closeable {default void configure(Map<String, ?> configs, Boolean isKey) {}byte[] serialize(String var1, T var2);default byte[] serialize(String topic, Headers headers, T data) {return this.serialize(topic, data);}default void close() {}
}
所以我们只用去实现serialize即可
编码
public class MySerializer implements Serializer {@Overridepublic byte[] serialize(String s, Object o) {String json = JSON.toJSONString(o);return json.getBytes();
}
}
解码
public class MyDeserializer implements Deserializer {private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);@Overridepublic Object deserialize(String s, byte[] bytes) {try {String json = new String(bytes,"utf-8");return JSON.parse(json);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}}
最后在application中配置key-serializer 和 value-serializer
8、分区策略
分区策略一般分为四种情况:
- 有分区号,直接将数据发送到指定的分区里面去
- 没有分区号,但是给了数据的key值,根据key取hashCode进行分区
- 分区号和key值都没有,直接使用默认的轮循分区
- 自定义分区
常规
测试kafkaTemplate.send("test", 0, key, "key=" + key + ",msg=指定0号分区");kafkaTemplate.send("test", key, "key=" + key + ",msg=不指定分区");
自定义
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 定义自己的分区策略
// 如果key以0开头,发到0号分区
// 其他都扔到1号分区String keyStr = key+"";if (keyStr.startsWith("0")){return 0;}else {return 1;}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
@Configuration
public class MyPartitionTemplate {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;KafkaTemplate kafkaTemplate;@PostConstructpublic void setKafkaTemplate() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//注意分区器在这里!!!props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));}public KafkaTemplate getKafkaTemplate(){return kafkaTemplate;}
}
//测试自定义分区发送
@RestController
public class MyPartitionProducer {@AutowiredMyPartitionTemplate template;// 使用0开头和其他任意字母开头的key发送消息
// 看控制台的输出,在哪个分区里?@GetMapping("/kafka/myPartitionSend/{key}")public void setPartition(@PathVariable("key") String key) {template.getKafkaTemplate().send("test", key,"key="+key+",msg=自定义分区策略");}
}
springboot整合kafka3.1,实现基本配置和操作相关推荐
- SpringBoot整合Shiro静态资源拦截配置
springboot整合thymeleaf会默认访问resources文件下的static目录 方法一:将 map.put("/static/**","anon" ...
- springboot整合layui实现数据表格的分页操作
第一步,引入依赖文件 <link rel="stylesheet" href="./layui/css/layui.css"> <script ...
- 【微服务】Spring-Boot整合Consul (自定义服务配置及健康检查)
为什么80%的码农都做不了架构师?>>> 目的 上文提到仅使用discovery包自带的注册功能进行服务注册,但是由于监控的是 /health,使用actuator实现自由度不 ...
- Springboot整合quartz实现定时任务(基本配置)
//定义相关配置 @Configuration @EnableScheduling public class QuartzConfig {@Autowiredprivate SpringJobFact ...
- 【过程记录】springboot整合redis/分别用redisRepository和redistemplate操作redis
导入依赖 基本配置 使用RedisTemplate访问redis 使用Redisrepository访问redis 实例: 导入依赖 菜单大部分情况下不会出现变化,我们可以将其放入Redis 加快加载 ...
- SpringBoot整合百度人脸识别SDK离线版操作步骤,Windows发布打包SpringBoot百度人脸识别SDK项目,以及解决百度人脸识别SDK离线版遇到的问题
前言 1.下载百度人脸识别SDK离线版. 2.开发工具:IntelliJ IDEA 百度人脸识别官网:https://cloud.baidu.com/doc/FACE/s/Ol0rre5u5 步骤 一 ...
- 用SpringBoot整合ES数据库基础
一.SpringBoot整合ES数据库 1.配置原生的依赖. <properties><java.version>1.8</java.version><!-- ...
- redis命令,SpringBoot整合Redis6,主从复制,哨兵模式,集群,springCache初高级应用。
目录 1. Docker安装Redis 2. Redis的基础 2.1 redis的key命令 2.2 reids的数据结构(6.0新增的数据结构) 1. String(字符串)类型 2. List( ...
- SpringBoot整合第三方技术学习笔记(自用)
SpringBoot整合第三方技术学习笔记 搬运黑马视频配套笔记 KF-4.数据层解决方案 KF-4-1.SQL 回忆一下之前做SSMP整合的时候数据层解决方案涉及到了哪些技术?MySQL数据库与My ...
最新文章
- python函数拟合不规则曲线_python中的多变量(多项式)最佳拟合曲线?
- 利用隐藏神经元解决异或问题的小型示例程序
- python语言程序设计教程-Python语言程序设计(视频教程)
- 善于 调用Windows API
- 在记录实体log信息的时候,2个公司的区别
- vscode html如何插入模板?(!)
- git bash命令_更优雅地使用命令行
- 豆瓣网络爬虫-java网络爬虫[验证码模拟登陆]详细介绍
- sync/atomic 库使用小结
- leetcode python3 简单题83. Remove Duplicates from Sorted List
- 爬虫:查找自己浏览器headers
- python数据分析收获与心得体会_初次数据分析--我的心得体会
- Tabular Editor学习笔记_3:警告标志及解决办法
- 软考高级系统架构设计师总结
- 服务器改无线路由器怎么设置,怎么把旧路由器改装成中继器
- 电力英语计算机C级多少分,中级工程师英语加权分多少
- TiDB EcoSystem Tools 原理解读(一):TiDB-Binlog 架构演进与实现原理
- [从头读历史] 第263节 左传 [BC537至BC479]
- php 跨站脚本攻击漏洞,PHP跨站脚本攻击(XSS)漏洞修复思路(二)
- 阿里云、百度云、腾讯云、AWS、微软Azure、华为云、金山云等云服务商产品的差异是啥?