springboot整合kafka3.1,实现基本配置和操作

这篇是单机的zookeeper形式,适用于入门。等有时间会更新kraft搭建。

博主自己装了双系统,虚拟机刚刚删了,所以没去做集群,不过当大家的kafka版本来到3.1,建议大家去学习KRaft集群搭建而不是去整合zookeeper。下面介绍一下怎么整合,正所谓举一反三,这个会了,到时候换个环境就ok,讲的不好的地方还请见谅。

文章目录

  • springboot整合kafka3.1,实现基本配置和操作
    • 前提准备
      • 1、springboot
      • 2、kafka
        • 1、kafka下载
        • 2、项目引入依赖
          • 新版本的kafka命令和老版本的不一样。
      • 3、zookeeper
        • 1、自带的
        • 2、自己下载zookeeper
    • springboot整合操作kafka
      • 1、config
      • 2、回调(conponent)
        • 1、较为泛用的
        • 2、一个方法对应一个特定的回调
          • 1、第一种写法
          • 2、第二种写法
      • 3、consumer(消费者)
      • 4、controller生产者(produces)
        • 1、同步发送
        • 2、异步发送
        • 3. 使用ack机制实现可靠
      • 5、exception
      • 6、filter
      • 7、序列化编码解码
        • 编码
        • 解码
      • 8、分区策略
        • 常规
        • 自定义

前提准备

1、springboot

博主自己用的是2.7版本的

一般关于springboot我会引入一下依赖

  <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>

2、kafka

1、kafka下载

https://kafka.apache.org/downloads 这里下载,这里再提一句,建议直接放到d盘e盘一级子目录里面,否则有可能出现启动的时候名字太长的问题

下载之后修改配置文件server.properties

log.dirs=E:\kafka_2.13-3.2.3\kafka-logs

2、项目引入依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>根据自己的版本选择</version>  </dependency>

这里提一句:不一样的springboot对应的kafka依赖版本也不一样,所以先去官网查看依赖版本。

https://spring.io/projects/spring-kafka 点这个链接去找对应的,别自己乱下最新的。

新版本的kafka命令和老版本的不一样。

试着启动,命令看你在哪个文件夹,如果是kafka,用下面的,如果是linux,把bat换成sh

bin\windows\kafka-server-start.bat config\server.properties

简单使用

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test(自己取)

查看

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

3、zookeeper

新版本的kafka其实自己集成了zookeeper,博主这里同样介绍两种方式,一种自己重新去下个zookeeper,一种用kafka自带的。

1、自带的

修改配置zookeeper.properties

dataDir=/opt/kafka/zookeeper/data/dataDir
dataLogDir=/opt/kafka/zookeeper/data/dataLogDir
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=100
tickTime=2000
initLimit=10
bin\windows\zookeeper-server-start.bat config\zookeeper.properties

2、自己下载zookeeper

由于是自己下载所以也不用去配置kafka文件中的zookeeper.properties

https://zookeeper.apache.org/releases.html#download 这是下载地址

进去之后很简单,一样改配置,进入conf目录下,将zoo_example.cfg重命名为zoo.cfg

同样修改dataDir和log, 我的是:

example sakes.dataDir=E:\\java-configuration\\apache-zookeeper-3.7.1-bin\\data存放事务日志目录dataLogDir=E:\\java-configuration\\apache-zookeeper-3.7.1-bin\\logs

然后运行zkserver.cmd和zkcli.cmd, 一个是启动我们的zookeeper服务器,一个是客户端。

springboot整合操作kafka

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KCJ0jkN3-1664006842070)(C:\Users\CSEN\AppData\Roaming\Typora\typora-user-images\image-20220924153503966.png)]

这里用一张图作为流程讲解如何整合。

1、config

其实可以把下面的conponent exception filter什么的都丢进来,可以配一些生产者消费者的信息,想配什么都可以,这个就不展示了。

2、回调(conponent)

回调有多种写法,这里介绍三种

1、较为泛用的

@Component
public class KafkaSendResultHandler implements ProducerListener {@Overridepublic void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {System.out.println("Message send success : " + producerRecord.toString());}@Overridepublic void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {System.out.println("Message send error : " + producerRecord.toString());}
}

然后在生产者里面使用

@Autowired
private KafkaSendResultHandler kafkaSendResultHandler;。。。。。省略kafkaTemplate.setProducerListener(kafkaSendResultHandler);

2、一个方法对应一个特定的回调

1、第一种写法
kafkaTemplate.send("topic1", callbackMessage).addCallback(success ->{String topic = success.getRecordMetadata().topic();int partition = success.getRecordMetadata().partition();long offset = success.getRecordMetadata().offset();System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);}, failure -> {System.out.println("发送消息失败:" + failure.getMessage());
});
2、第二种写法
kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送消息失败:"+ex.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());}
});

3、consumer(消费者)

使用KafkaListener,指定消息类型(必选)和groupid,partitions offset(可选)


@KafkaListener(topics = {"topic1"}, groupId = "felix-group0" ,errorHandler = "consumerAwareErrorHandler")
public void onMessage1(ConsumerRecord<?,?> record){System.out.println("简单消费:" + record.topic() + "--" + record.partition() + "--" + record.value());}@KafkaListener(id = "comsumer1", groupId = "felix-group1", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"}),@TopicPartition(topic = "topic2", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "8"))
} ,errorHandler = "consumerAwareErrorHandler")
public void onMessage2(ConsumerRecord<?, ?> record){System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
}
@KafkaListener(id = "consumer2",groupId = "felix-group2", topics = "topic1" ,errorHandler = "consumerAwareErrorHandler")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {System.out.println(">>>批量消费一次,records.size()="+records.size());for (ConsumerRecord<?, ?> record : records) {System.out.println(record.value());}
}

4、controller生产者(produces)

1、同步发送

如果需要使用同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。

kafkaTemplate.send("test", message).get();

2、异步发送

可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息

kafkaTemplate.send("test", message);

3. 使用ack机制实现可靠

producers可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks",这个参数决定了producer要求leader partition收到确认的副本个数:

  • 如果acks设置为0,表示producer不会等待broker的相应,所以,producer无法知道消息是否发生成功,这样有可能导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。
  • 若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待知道broker确认收到消息。
  • 若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。

5、exception

bean注入

@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {return (message, exception, consumer) -> {System.out.println("消费异常:"+message.getPayload());return null;};
}

消费异常配置注解实现

@KafkaListener(topics = {"topic1"}, groupId = "felix-group0" ,errorHandler = "consumerAwareErrorHandler")
public void onMessage1(ConsumerRecord<?,?> record){System.out.println("简单消费:" + record.topic() + "--" + record.partition() + "--" + record.value());}

6、filter

bean注入factory的时候配置好消息过滤策略

//配置消息过滤策略@Bean(value = "filterContainerFactory")public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory);// 被过滤的消息将被丢弃factory.setAckDiscarded(true);// 消息过滤策略factory.setRecordFilterStrategy(consumerRecord -> {if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {return false;}else {return true;}});return factory;}

消费异常配置注解实现

// 消息过滤监听@KafkaListener(topics = {"topic1"}, containerFactory = "filterContainerFactory")public void onMessage6(ConsumerRecord<?, ?> record) {System.out.println(record.value());}

e {
return true;
}
});

    return factory;}

消费异常配置注解实现

// 消息过滤监听
@KafkaListener(topics = {“topic1”}, containerFactory = “filterContainerFactory”)
public void onMessage6(ConsumerRecord<?, ?> record) {
System.out.println(record.value());
}

7、序列化编码解码

序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer

public interface Serializer<T> extends Closeable {default void configure(Map<String, ?> configs, Boolean isKey) {}byte[] serialize(String var1, T var2);default byte[] serialize(String topic, Headers headers, T data) {return this.serialize(topic, data);}default void close() {}
}

所以我们只用去实现serialize即可

编码

public class MySerializer implements Serializer {@Overridepublic byte[] serialize(String s, Object o) {String json = JSON.toJSONString(o);return json.getBytes();
}
}

解码

public class MyDeserializer implements Deserializer {private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);@Overridepublic Object deserialize(String s, byte[] bytes) {try {String json = new String(bytes,"utf-8");return JSON.parse(json);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}}

最后在application中配置key-serializer 和 value-serializer

8、分区策略

分区策略一般分为四种情况:

  • 有分区号,直接将数据发送到指定的分区里面去
  • 没有分区号,但是给了数据的key值,根据key取hashCode进行分区
  • 分区号和key值都没有,直接使用默认的轮循分区
  • 自定义分区

常规

测试kafkaTemplate.send("test", 0, key, "key=" + key + ",msg=指定0号分区");kafkaTemplate.send("test", key, "key=" + key + ",msg=不指定分区");

自定义

public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//        定义自己的分区策略
//                如果key以0开头,发到0号分区
//                其他都扔到1号分区String keyStr = key+"";if (keyStr.startsWith("0")){return 0;}else {return 1;}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
@Configuration
public class MyPartitionTemplate {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;KafkaTemplate kafkaTemplate;@PostConstructpublic void setKafkaTemplate() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//注意分区器在这里!!!props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));}public KafkaTemplate getKafkaTemplate(){return kafkaTemplate;}
}
//测试自定义分区发送
@RestController
public class MyPartitionProducer {@AutowiredMyPartitionTemplate template;//    使用0开头和其他任意字母开头的key发送消息
//    看控制台的输出,在哪个分区里?@GetMapping("/kafka/myPartitionSend/{key}")public void setPartition(@PathVariable("key") String key) {template.getKafkaTemplate().send("test", key,"key="+key+",msg=自定义分区策略");}
}

springboot整合kafka3.1,实现基本配置和操作相关推荐

  1. SpringBoot整合Shiro静态资源拦截配置

    springboot整合thymeleaf会默认访问resources文件下的static目录 方法一:将 map.put("/static/**","anon" ...

  2. springboot整合layui实现数据表格的分页操作

    第一步,引入依赖文件 <link rel="stylesheet" href="./layui/css/layui.css"> <script ...

  3. 【微服务】Spring-Boot整合Consul (自定义服务配置及健康检查)

    为什么80%的码农都做不了架构师?>>>    目的 上文提到仅使用discovery包自带的注册功能进行服务注册,但是由于监控的是 /health,使用actuator实现自由度不 ...

  4. Springboot整合quartz实现定时任务(基本配置)

    //定义相关配置 @Configuration @EnableScheduling public class QuartzConfig {@Autowiredprivate SpringJobFact ...

  5. 【过程记录】springboot整合redis/分别用redisRepository和redistemplate操作redis

    导入依赖 基本配置 使用RedisTemplate访问redis 使用Redisrepository访问redis 实例: 导入依赖 菜单大部分情况下不会出现变化,我们可以将其放入Redis 加快加载 ...

  6. SpringBoot整合百度人脸识别SDK离线版操作步骤,Windows发布打包SpringBoot百度人脸识别SDK项目,以及解决百度人脸识别SDK离线版遇到的问题

    前言 1.下载百度人脸识别SDK离线版. 2.开发工具:IntelliJ IDEA 百度人脸识别官网:https://cloud.baidu.com/doc/FACE/s/Ol0rre5u5 步骤 一 ...

  7. 用SpringBoot整合ES数据库基础

    一.SpringBoot整合ES数据库 1.配置原生的依赖. <properties><java.version>1.8</java.version><!-- ...

  8. redis命令,SpringBoot整合Redis6,主从复制,哨兵模式,集群,springCache初高级应用。

    目录 1. Docker安装Redis 2. Redis的基础 2.1 redis的key命令 2.2 reids的数据结构(6.0新增的数据结构) 1. String(字符串)类型 2. List( ...

  9. SpringBoot整合第三方技术学习笔记(自用)

    SpringBoot整合第三方技术学习笔记 搬运黑马视频配套笔记 KF-4.数据层解决方案 KF-4-1.SQL 回忆一下之前做SSMP整合的时候数据层解决方案涉及到了哪些技术?MySQL数据库与My ...

最新文章

  1. python函数拟合不规则曲线_python中的多变量(多项式)最佳拟合曲线?
  2. 利用隐藏神经元解决异或问题的小型示例程序
  3. python语言程序设计教程-Python语言程序设计(视频教程)
  4. 善于 调用Windows API
  5. 在记录实体log信息的时候,2个公司的区别
  6. vscode html如何插入模板?(!)
  7. git bash命令_更优雅地使用命令行
  8. 豆瓣网络爬虫-java网络爬虫[验证码模拟登陆]详细介绍
  9. sync/atomic 库使用小结
  10. leetcode python3 简单题83. Remove Duplicates from Sorted List
  11. 爬虫:查找自己浏览器headers
  12. python数据分析收获与心得体会_初次数据分析--我的心得体会
  13. Tabular Editor学习笔记_3:警告标志及解决办法
  14. 软考高级系统架构设计师总结
  15. 服务器改无线路由器怎么设置,怎么把旧路由器改装成中继器
  16. 电力英语计算机C级多少分,中级工程师英语加权分多少
  17. TiDB EcoSystem Tools 原理解读(一):TiDB-Binlog 架构演进与实现原理
  18. [从头读历史] 第263节 左传 [BC537至BC479]
  19. php 跨站脚本攻击漏洞,PHP跨站脚本攻击(XSS)漏洞修复思路(二)
  20. 阿里云、百度云、腾讯云、AWS、微软Azure、华为云、金山云等云服务商产品的差异是啥?

热门文章

  1. 一个程序员只要会撸代码就可以了?
  2. 两万字总结python之pandas库
  3. 网络安全攻防演练的组织架构是什么?有哪些防守手段?
  4. 人工智能的研究热点和应用,主要包含哪几个方面?
  5. 二维非稳态导热微分方程_一种二维非稳态导热问题的数值解法.pdf
  6. 政简网:公务员报名到上岸需要多长时间
  7. kotlin+android+基础,kotlin-android动画基础篇
  8. Excel怎么转换PDF文件?
  9. 【JavaScript】类数组详解
  10. 怎样判定电磁铁的方向