spring boot中kafka教程
参考了很多教程,最后精选了几篇,通俗易懂的
kafkaTemplate包装生产者工厂,生产者工厂包含具体的send发送senderProps参数,往topic里发,
ConcurrentKafkaListenerContainerFactory监听器包装消费者工厂,消费者工厂包含具体的consumer消费consumerProps参数,从topic里消费,该topic要和生产者的一致。
public ProducerRecord(String topic, K key, V value) {this(topic, (Integer)null, (Long)null, key, value, (Iterable)null); }
public class KafkaConsumer {
@KafkaListener(topics = "topic")
编写第一个Demo
实现顺序
1,连接kafka服务器的配置
2,kafka-customer:消费者配置
3,kafka-provider:提供者配置
4,KfkaUtils:根据topic发送消息
5,消费者根据topic处理消息
- 创建消费者和生产者的Map配置
- 根据Map配置创建对应的消费者工厂(consumerFactory)和生产者工厂(producerFactory)
- 根据consumerFactory创建监听器的监听器工厂
- 根据producerFactory创建KafkaTemplate(Kafka操作类)
- 创建监听容器
一、精文章
先给你们瞄一眼项目结构,记得把Kafka 启动...
项目结构
创建KafkaConfiguration配置类
都是一些配置参数,具体的作用也在代码中写明了,值得注意的是,KafkaTemplate的类型为<Integer,String>,我们可以找kafkaTemplate的send方法,有多个重载方法,其中有个方法如下,key和data参数都为泛型,这其实就是对应着KafkaTemplate<Integer,String>。那具体有什么用呢,还记得我们的Topic中可以包含多个Partition(分区)吗,那我们如果不想手动指定发送到哪个分区,我们则可以利用key去实现。这里我们的key是Integer类型,template会根据 key 路由到对应的partition中,如果key存在对应的partitionID则发送到该partition中,否则由算法选择发送到哪个partition。
public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);return this.doSend(producerRecord);}
@Configuration
@EnableKafka
public class KafkaConfiguration {//ConcurrentKafkaListenerContainerFactory为创建Kafka监听器的工程类,这里只配置了消费者@Beanpublic ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}//根据consumerProps填写的参数创建消费者工厂@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerProps());}//根据senderProps填写的参数创建生产者工厂@Beanpublic ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(senderProps());}//kafkaTemplate实现了Kafka发送接收等功能@Beanpublic KafkaTemplate<Integer, String> kafkaTemplate() {KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());return template;}//消费者配置参数private Map<String, Object> consumerProps() {Map<String, Object> props = new HashMap<>();//连接地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//GroupIDprops.put(ConsumerConfig.GROUP_ID_CONFIG, "bootKafka");//是否自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交的频率props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");//Session超时设置props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");//键的反序列化方式props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);//值的反序列化方式props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}//生产者配置private Map<String, Object> senderProps (){Map<String, Object> props = new HashMap<>();//连接地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//重试,0为不启用重试机制props.put(ProducerConfig.RETRIES_CONFIG, 1);//控制批处理大小,单位为字节props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//生产者可以使用的总内存字节来缓冲等待发送到服务器的记录props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);//键的序列化方式props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//值的序列化方式props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}}
创建DemoListener消费者
这里的消费者其实就是一个监听类,指定监听名为topic.quick.demo的Topic,consumerID为demo。
@Component
public class DemoListener {private static final Logger log= LoggerFactory.getLogger(DemoListener.class);//声明consumerID为demo,监听topicName为topic.quick.demo的Topic@KafkaListener(id = "demo", topics = "topic.quick.demo")public void listen(String msgData) {log.info("demo receive : "+msgData);}
}
创建测试类
这里的send方法第一参数为TopicName,第二个参数则是发送的数据
@SpringBootTest
@RunWith(SpringRunner.class)
public class DemoTest {@Autowiredprivate KafkaTemplate kafkaTemplate;@Testpublic void testDemo() throws InterruptedException {kafkaTemplate.send("topic.quick.demo", "this is my first demo");//休眠5秒,为了使监听器有足够的时间监听到topic的数据Thread.sleep(5000);}
}
接下来直接运行这个测试方法,我们可以看到日志中输出了我们发送的消息,这就代表我们成功的消费了测试方法中发送的消息。
2018-09-06 17:26:20.850 INFO 6232 --- [ demo-0-C-1] com.viu.kafka.listen.DemoListener : demo receive : this is my first demo
启动项目
看清楚了是启动项目,不是测试类,我们来观察一下控制台的输出日志
首先这个是KafkaConsumer的配置信息,每个消费者都会输出该配置信息,配置太多就不做讲解了
2018-09-06 17:40:15.258 INFO 9944 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: auto.commit.interval.ms = 100auto.offset.reset = latestbootstrap.servers = [localhost:9092]check.crcs = trueclient.id = connections.max.idle.ms = 540000enable.auto.commit = trueexclude.internal.topics = truefetch.max.bytes = 52428800fetch.max.wait.ms = 500fetch.min.bytes = 1group.id = demoheartbeat.interval.ms = 3000interceptor.classes = nullinternal.leave.group.on.close = trueisolation.level = read_uncommittedkey.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializermax.partition.fetch.bytes = 1048576max.poll.interval.ms = 300000max.poll.records = 500metadata.max.age.ms = 300000metric.reporters = []metrics.num.samples = 2metrics.recording.level = INFOmetrics.sample.window.ms = 30000partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]receive.buffer.bytes = 65536reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 305000retry.backoff.ms = 100sasl.jaas.config = nullsasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.mechanism = GSSAPIsecurity.protocol = PLAINTEXTsend.buffer.bytes = 131072session.timeout.ms = 15000ssl.cipher.suites = nullssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]ssl.endpoint.identification.algorithm = nullssl.key.password = nullssl.keymanager.algorithm = SunX509ssl.keystore.location = nullssl.keystore.password = nullssl.keystore.type = JKSssl.protocol = TLSssl.provider = nullssl.secure.random.implementation = nullssl.trustmanager.algorithm = PKIXssl.truststore.location = nullssl.truststore.password = nullssl.truststore.type = JKSvalue.deserializer = class org.apache.kafka.common.serialization.StringDeserializer2018-09-06 17:40:15.274 INFO 9944 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.2
2018-09-06 17:40:15.274 INFO 9944 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 2a121f7b1d402825
这些日志就代表我们成功的创建了Consumer,由于没有做并发配置,所以现在为单个消费者模式,系统会做一个分配Partition的操作,也就是将某个Partition指定给某个消费者消费。 这里有个地方需要注意一下,
看到日志中有输出[Consumer clientId=consumer-1, groupId=demo],我们之前在监听中@KafkaListener注解中配置的id=demo,怎么就变成了groupId=demo,这是因为@KafkaListener注解如果没有指定groupId这个属性的值,则会默认把id作为groupId。
2018-09-06 17:40:15.287 INFO 9944 --- [ demo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=demo] Discovered group coordinator admin-PC:9092 (id: 2147483647 rack: null)
2018-09-06 17:40:15.290 INFO 9944 --- [ demo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=demo] Revoking previously assigned partitions []
2018-09-06 17:40:15.290 INFO 9944 --- [ demo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: []
2018-09-06 17:40:15.290 INFO 9944 --- [ demo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=demo] (Re-)joining group
2018-09-06 17:40:15.301 INFO 9944 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2018-09-06 17:40:15.302 INFO 9944 --- [ demo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=demo] Successfully joined group with generation 33
2018-09-06 17:40:15.303 INFO 9944 --- [ demo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=demo] Setting newly assigned partitions [topic.quick.demo-0]
结束
SpringBoot2.0已经提供了Kafka的自动配置,可以在application.properties文件中配置,我觉得更方便
二、精文章
提前启动zk,kafka,并且创建一个Topic
[root@Basic kafka_2.11-1.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic
1
确保你的kafka能够访问,如果访问不了,需要打开外网访问。
config/server.properties
advertised.listeners=PLAINTEXT://192.168.239.128:9092
1
Maven 依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
二、项目结构
为了更加体现实际开发需求,一般生产者都是在调用某些接口的服务处理完逻辑之后然后往kafka里面扔数据,然后有一个消费者不停的监控这个Topic,然后处理数据,所以这里把生产者作为一个接口,消费者放到kafka这个目录下,注意@Component注解,不然扫描不到@KafkaListener
三、具体实现代码
SpringBoot配置文件
application.yml
spring:
kafka:
bootstrap-servers: 192.168.239.128:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test
enable-auto-commit: true
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
生产者
package cn.saytime.web;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 测试kafka生产者
*/
@RestController
@RequestMapping("kafka")
public class TestKafkaProducerController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("send")
public String send(String msg){
kafkaTemplate.send("test_topic", msg);
return "success";
}
}
消费者
这里的消费者会监听这个主题,有消息就会执行,不需要进行while(true)
package cn.saytime.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* kafka消费者测试
*/
@Component
public class TestConsumer {
@KafkaListener(topics = "test_topic")
public void listen (ConsumerRecord<?, ?> record) throws Exception {
System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}
项目启动类
package cn.saytime;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TestApplication{
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
}
四、测试
运行项目,执行:http://localhost:8080/kafka/send?msg=hello
控制台输出:
topic = test_topic, offset = 19, value = hello
1
为了体现消费者不止执行一次就结束,再调用一次接口:
http://localhost:8080/kafka/send?msg=kafkatopic = test_topic, offset = 20, value = kafka
1
所以可以看到这里消费者实际上是不停的poll Topic数据的。
---------------------
三、精文章
1. Apache Kafka是一个分布式流平台
1.1 流平台有三个关键功能:
- 发布和订阅流记录,类似于一个消息队列或企业消息系统
- 以一种容错的持久方式存储记录流
- 在流记录生成的时候就处理它们
1.2 Kafka通常用于两大类应用:
- 构建实时流数据管道,在系统或应用程序之间可靠地获取数据
- 构建对数据流进行转换或输出的实时流媒体应用程序
1.3 有几个特别重要的概念:
Kafka is run as a cluster on one or more servers that can span multiple datacenters.
The Kafka cluster stores streams of records in categories called topics.
Each record consists of a key, a value, and a timestamp.
Kafka作为集群运行在一个或多个可以跨多个数据中心的服务器上
从这句话表达了三个意思:
- Kafka是以集群方式运行的
- 集群中可以只有一台服务器,也有可能有多台服务器。也就是说,一台服务器也是一个集群,多台服务器也可以组成一个集群
- 这些服务器可以跨多个数据中心
Kafka集群按分类存储流记录,这个分类叫做主题
这句话表达了以下几个信息:
- 流记录是分类存储的,也就说记录是归类的
- 我们称这种分类为主题
- 简单地来讲,记录是按主题划分归类存储的
每个记录由一个键、一个值和一个时间戳组成
1.4 Kafka有四个核心API:
- Producer API :允许应用发布一条流记录到一个或多个主题
- Consumer API :允许应用订阅一个或多个主题,并处理流记录
- Streams API :允许应用作为一个流处理器,从一个或多个主题那里消费输入流,并将输出流输出到一个或多个输出主题,从而有效地讲输入流转换为输出流
- Connector API :允许将主题连接到已经存在的应用或者数据系统,以构建并允许可重用的生产者或消费者。例如,一个关系型数据库的连接器可能捕获到一张表的每一次变更
(画外音:我理解这四个核心API其实就是:发布、订阅、转换处理、从第三方采集数据。)
在Kafka中,客户端和服务器之间的通信是使用简单的、高性能的、与语言无关的TCP协议完成的。
2. Topics and Logs(主题和日志)
一个topic是一个分类,或者说是记录被发布的时候的一个名字(画外音:可以理解为记录要被发到哪儿去)。
在Kafka中,topic总是有多个订阅者,因此,一个topic可能有0个,1个或多个订阅该数据的消费者。
对于每个主题,Kafka集群维护一个分区日志,如下图所示:
每个分区都是一个有序的、不可变的记录序列,而且记录会不断的被追加,一条记录就是一个结构化的提交日志(a structured commit log)。
分区中的每条记录都被分配了一个连续的id号,这个id号被叫做offset(偏移量),这个偏移量唯一的标识出分区中的每条记录。(PS:如果把分区比作数据库表的话,那么偏移量就是主键)
Kafka集群持久化所有已发布的记录,无论它们有没有被消费,记录被保留的时间是可以配置的。例如,如果保留策略被设置为两天,那么在记录发布后的两天内,可以使用它,之后将其丢弃以释放空间。在对数据大小方面,Kafka的性能是高效的,恒定常量级的,因此长时间存储数据不是问题。
事实上,唯一维护在每个消费者上的元数据是消费者在日志中的位置或者叫偏移量。偏移量是由消费者控制的:通常消费者在读取记录的时候会线性的增加它的偏移量,但是,事实上,由于位置(偏移量)是由消费者控制的,所有它可以按任意它喜欢的顺序消费记录。例如:一个消费者可以重置到一个较旧的偏移量来重新处理之前已经处理过的数据,或者跳转到最近的记录并从“现在”开始消费。
这种特性意味着消费者非常廉价————他们可以来来去去的消息而不会对集群或者其它消费者造成太大影响。
日志中的分区有几个用途。首先,它们允许日志的规模超出单个服务器的大小。每个独立分区都必须与宿主的服务器相匹配,但一个主题可能有多个分区,所以它可以处理任意数量的数据。第二,它们作为并行的单位——稍后再进一步。
(
画外音:简单地来说,日志分区的作用有两个:一、日志的规模不再受限于单个服务器;二、分区意味着可以并行。
什么意思呢?主题建立在集群之上,每个主题维护了一个分区日志,顾名思义,日志是分区的;每个分区所在的服务器的资源(比如:CPU、内存、带宽、磁盘等)是有限的,如果不分区(可以理解为等同于只有一个)的话,必然受限于这个分区所在的服务器,那么多个分区的话就不一样了,就突破了这种限制,服务器可以随便加,分区也可以随便加。
)
3. Distribution(分布)
日志的分区分布在集群中的服务器上,每个服务器处理数据,并且分区请求是共享的。每个分区被复制到多个服务器上以实现容错,到底复制到多少个服务器上是可以配置的。
Each partition is replicated across a configurable number of servers for fault tolerance.
每个分区都有一个服务器充当“leader”角色,并且有0个或者多个服务器作为“followers”。leader处理对这个分区的所有读和写请求,而followers被动的从leader那里复制数据。如果leader失败,followers中的其中一个会自动变成新的leader。每个服务器充当一些分区的“leader”的同时也是其它分区的“follower”,因此在整个集群中负载是均衡的。
也就是说,每个服务器既是“leader”也是“follower”。我们知道一个主题可能有多个分区,一个分区可能在一个服务器上也可能跨多个服务器,然而这并不以为着一台服务器上只有一个分区,是可能有多个分区的。每个分区中有一个服务器充当“leader”,其余是“follower”。leader负责处理这个它作为leader所负责的分区的所有读写请求,而该分区中的follow只是被动复制leader的数据。这个有点儿像HDFS中的副本机制。例如:分区-1有服务器A和B组成,A是leader,B是follower,有请求要往分区-1中写数据的时候就由A处理,然后A把刚才写的数据同步给B,这样的话正常请求相当于A和B的数据是一样的,都有分区-1的全部数据,如果A宕机了,B成为leader,接替A继续处理对分区-1的读写请求。
需要注意的是,分区是一个虚拟的概念,是一个逻辑单元。
4. Producers(生产者)
生产者发布数据到它们选择的主题中。生产者负责选择将记录投递到哪个主题的哪个分区中。要做这件事情,可以简单地用循环方式以到达负载均衡,或者根据一些语义分区函数(比如:基于记录中的某些key)
5. Consumers(消费者)
消费者用一个消费者组名来标识它们自己(PS:相当于给自己贴一个标签,标签的名字是组名,以表明自己属于哪个组),并且每一条发布到主题中的记录只会投递给每个订阅的消费者组中的其中一个消费者实例。消费者实例可能是单独的进程或者在单独的机器上。
如果所有的消费者实例都使用相同的消费者组,那么记录将会在这些消费者之间有效的负载均衡。
如果所有的消费者实例都使用不同的消费者组,那么每条记录将会广播给所有的消费者进程。
上图中其实那个Kafka Cluster换成Topic会更准确一些
一个Kafka集群有2个服务器,4个分区(P0-P3),有两个消费者组。组A中有2个消费者实例,组B中有4个消费者实例。
通常我们会发现,主题不会有太多的消费者组,每个消费者组是一个“逻辑订阅者”(以消费者组的名义订阅主题,而非以消费者实例的名义去订阅)。每个组由许多消费者实例组成,以实现可扩展性和容错。这仍然是发布/订阅,只不过订阅者是一个消费者群体,而非单个进程。
在Kafka中,这种消费方式是通过用日志中的分区除以使用者实例来实现的,这样可以保证在任意时刻每个消费者都是排它的消费,即“公平共享”。Kafka协议动态的处理维护组中的成员。如果有心的实例加入到组中,它们将从组中的其它成员那里接管一些分区;如果组中有一个实例死了,那么它的分区将会被分给其它实例。
(画外音:什么意思呢?举个例子,在上面的图中,4个分区,组A有2个消费者,组B有4个消费者,那么对A来讲组中的每个消费者负责4/2=2个分区,对组B来说组中的每个消费者负责4/4=1个分区,而且同一时间消息只能被组中的一个实例消费。如果组中的成员数量有变化,则重新分配。)
Kafka只提供分区下的记录的总的顺序,而不提供主题下不同分区的总的顺序。每个分区结合按key划分数据的能力排序对大多数应用来说是足够的。然而,如果你需要主题下总的记录顺序,你可以只使用一个分区,这样做的做的话就意味着每个消费者组中只能有一个消费者实例。
6. 保证
在一个高级别的Kafka给出下列保证:
- 被一个生产者发送到指定主题分区的消息将会按照它们被发送的顺序追加到分区中。也就是说,如果记录M1和M2是被同一个生产者发送到同一个分区的,而且M1是先发送的,M2是后发送的,那么在分区中M1的偏移量一定比M2小,并且M1出现在日志中的位置更靠前。
- 一个消费者看到记录的顺序和它们在日志中存储的顺序是一样的。
- 对于一个副本因子是N的主题,我们可以容忍最多N-1个服务器失败,而不会丢失已经提交给日志的任何记录。
7. Spring Kafka
Spring提供了一个“模板”作为发送消息的高级抽象。它也通过使用@KafkaListener注释和“监听器容器”提供对消息驱动POJOs的支持。这些库促进了依赖注入和声明式的使用。
7.1 纯Java方式
1 package com.cjs.example.quickstart;2 3 import org.apache.kafka.clients.consumer.ConsumerConfig;4 import org.apache.kafka.clients.consumer.ConsumerRecord;5 import org.apache.kafka.clients.producer.ProducerConfig;6 import org.apache.kafka.common.serialization.IntegerDeserializer;7 import org.apache.kafka.common.serialization.IntegerSerializer;8 import org.apache.kafka.common.serialization.StringDeserializer;9 import org.apache.kafka.common.serialization.StringSerializer; 10 import org.springframework.kafka.core.*; 11 import org.springframework.kafka.listener.KafkaMessageListenerContainer; 12 import org.springframework.kafka.listener.MessageListener; 13 import org.springframework.kafka.listener.config.ContainerProperties; 14 15 import java.util.HashMap; 16 import java.util.Map; 17 18 public class PureJavaDemo { 19 20 /** 21 * 生产者配置 22 */ 23 private static Map<String, Object> senderProps() { 24 Map<String, Object> props = new HashMap<>(); 25 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093"); 26 props.put(ProducerConfig.RETRIES_CONFIG, 0); 27 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); 28 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); 29 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 30 return props; 31 } 32 33 /** 34 * 消费者配置 35 */ 36 private static Map<String, Object> consumerProps() { 37 Map<String, Object> props = new HashMap<>(); 38 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9093"); 39 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello"); 40 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 41 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 42 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 43 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); 44 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 45 return props; 46 } 47 48 /** 49 * 发送模板配置 50 */ 51 private static KafkaTemplate<Integer, String> createTemplate() { 52 Map<String, Object> senderProps = senderProps(); 53 ProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps); 54 KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory); 55 return kafkaTemplate; 56 } 57 58 /** 59 * 消息监听器容器配置 60 */ 61 private static KafkaMessageListenerContainer<Integer, String> createContainer() { 62 Map<String, Object> consumerProps = consumerProps(); 63 ConsumerFactory<Integer, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps); 64 ContainerProperties containerProperties = new ContainerProperties("test"); 65 KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); 66 return container; 67 } 68 69 70 public static void main(String[] args) throws InterruptedException { 71 String topic1 = "test"; // 主题 72 73 KafkaMessageListenerContainer container = createContainer(); 74 ContainerProperties containerProperties = container.getContainerProperties(); 75 containerProperties.setMessageListener(new MessageListener<Integer, String>() { 76 @Override 77 public void onMessage(ConsumerRecord<Integer, String> record) { 78 System.out.println("Received: " + record); 79 } 80 }); 81 container.setBeanName("testAuto"); 82 83 container.start(); 84 85 KafkaTemplate<Integer, String> kafkaTemplate = createTemplate(); 86 kafkaTemplate.setDefaultTopic(topic1); 87 88 kafkaTemplate.sendDefault(0, "foo"); 89 kafkaTemplate.sendDefault(2, "bar"); 90 kafkaTemplate.sendDefault(0, "baz"); 91 kafkaTemplate.sendDefault(2, "qux"); 92 93 kafkaTemplate.flush(); 94 container.stop(); 95 96 System.out.println("结束"); 97 } 98 99 }
运行结果:
Received: ConsumerRecord(topic = test, partition = 0, offset = 67, CreateTime = 1533300970788, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = foo) Received: ConsumerRecord(topic = test, partition = 0, offset = 68, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = bar) Received: ConsumerRecord(topic = test, partition = 0, offset = 69, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = baz) Received: ConsumerRecord(topic = test, partition = 0, offset = 70, CreateTime = 1533300970793, serialized key size = 4, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = qux)
7.2 更简单一点儿,用SpringBoot
1 package com.cjs.example.quickstart;2 3 import org.apache.kafka.clients.consumer.ConsumerRecord;4 import org.springframework.beans.factory.annotation.Autowired;5 import org.springframework.boot.CommandLineRunner;6 import org.springframework.context.annotation.Bean;7 import org.springframework.context.annotation.Configuration;8 import org.springframework.kafka.annotation.KafkaListener;9 import org.springframework.kafka.core.KafkaTemplate; 10 11 @Configuration 12 public class JavaConfigurationDemo { 13 14 @KafkaListener(topics = "test") 15 public void listen(ConsumerRecord<String, String> record) { 16 System.out.println("收到消息: " + record); 17 } 18 19 @Bean 20 public CommandLineRunner commandLineRunner() { 21 return new MyRunner(); 22 } 23 24 class MyRunner implements CommandLineRunner { 25 26 @Autowired 27 private KafkaTemplate<String, String> kafkaTemplate; 28 29 @Override 30 public void run(String... args) throws Exception { 31 kafkaTemplate.send("test", "foo1"); 32 kafkaTemplate.send("test", "foo2"); 33 kafkaTemplate.send("test", "foo3"); 34 kafkaTemplate.send("test", "foo4"); 35 } 36 } 37 }
application.properties配置
spring.kafka.bootstrap-servers=192.168.101.5:9092 spring.kafka.consumer.group-id=world
8. 生产者
1 package com.cjs.example.send;2 3 import org.apache.kafka.clients.producer.ProducerConfig;4 import org.apache.kafka.common.serialization.IntegerSerializer;5 import org.apache.kafka.common.serialization.StringSerializer;6 import org.springframework.context.annotation.Bean;7 import org.springframework.context.annotation.Configuration;8 import org.springframework.kafka.core.DefaultKafkaProducerFactory;9 import org.springframework.kafka.core.KafkaTemplate; 10 import org.springframework.kafka.core.ProducerFactory; 11 12 import java.util.HashMap; 13 import java.util.Map; 14 15 @Configuration 16 public class Config { 17 18 public Map<String, Object> producerConfigs() { 19 Map<String, Object> props = new HashMap<>(); 20 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092"); 21 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); 22 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 23 return props; 24 } 25 26 public ProducerFactory<Integer, String> producerFactory() { 27 return new DefaultKafkaProducerFactory<>(producerConfigs()); 28 } 29 30 @Bean 31 public KafkaTemplate<Integer, String> kafkaTemplate() { 32 return new KafkaTemplate<Integer, String>(producerFactory()); 33 } 34 35 }
1 package com.cjs.example.send;2 3 import org.springframework.beans.factory.annotation.Autowired;4 import org.springframework.boot.CommandLineRunner;5 import org.springframework.kafka.core.KafkaTemplate;6 import org.springframework.kafka.support.SendResult;7 import org.springframework.stereotype.Component;8 import org.springframework.util.concurrent.ListenableFuture;9 import org.springframework.util.concurrent.ListenableFutureCallback; 10 11 @Component 12 public class MyCommandLineRunner implements CommandLineRunner { 13 14 @Autowired 15 private KafkaTemplate<Integer, String> kafkaTemplate; 16 17 public void sendTo(Integer key, String value) { 18 ListenableFuture<SendResult<Integer, String>> listenableFuture = kafkaTemplate.send("test", key, value); 19 listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { 20 @Override 21 public void onFailure(Throwable throwable) { 22 System.out.println("发送失败啦"); 23 throwable.printStackTrace(); 24 } 25 26 @Override 27 public void onSuccess(SendResult<Integer, String> sendResult) { 28 System.out.println("发送成功," + sendResult); 29 } 30 }); 31 } 32 33 @Override 34 public void run(String... args) throws Exception { 35 sendTo(1, "aaa"); 36 sendTo(2, "bbb"); 37 sendTo(3, "ccc"); 38 } 39 40 41 }
运行结果:
发送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=aaa, timestamp=null), recordMetadata=test-0@37] 发送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=2, value=bbb, timestamp=null), recordMetadata=test-0@38] 发送成功,SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=3, value=ccc, timestamp=null), recordMetadata=test-0@39]
9. 消费者@KafkaListener
1 package com.cjs.example.receive;2 3 import org.apache.kafka.clients.consumer.ConsumerConfig;4 import org.apache.kafka.clients.consumer.ConsumerRecord;5 import org.apache.kafka.common.serialization.IntegerDeserializer;6 import org.apache.kafka.common.serialization.StringDeserializer;7 import org.springframework.context.annotation.Bean;8 import org.springframework.context.annotation.Configuration;9 import org.springframework.kafka.annotation.KafkaListener; 10 import org.springframework.kafka.annotation.TopicPartition; 11 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; 12 import org.springframework.kafka.config.KafkaListenerContainerFactory; 13 import org.springframework.kafka.core.ConsumerFactory; 14 import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 15 import org.springframework.kafka.listener.AbstractMessageListenerContainer; 16 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; 17 import org.springframework.kafka.listener.config.ContainerProperties; 18 import org.springframework.kafka.support.Acknowledgment; 19 import org.springframework.kafka.support.KafkaHeaders; 20 import org.springframework.messaging.handler.annotation.Header; 21 import org.springframework.messaging.handler.annotation.Payload; 22 23 import java.util.HashMap; 24 import java.util.List; 25 import java.util.Map; 26 27 @Configuration 28 public class Config2 { 29 30 @Bean 31 public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { 32 ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 33 factory.setConsumerFactory(consumerFactory()); 34 factory.setConcurrency(3); 35 ContainerProperties containerProperties = factory.getContainerProperties(); 36 containerProperties.setPollTimeout(2000); 37 // containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); 38 return factory; 39 } 40 41 private ConsumerFactory<Integer,String> consumerFactory() { 42 return new DefaultKafkaConsumerFactory<>(consumerProps()); 43 } 44 45 private Map<String, Object> consumerProps() { 46 Map<String, Object> props = new HashMap<>(); 47 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.5:9092"); 48 props.put(ConsumerConfig.GROUP_ID_CONFIG, "hahaha"); 49 // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 50 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); 51 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 52 return props; 53 } 54 55 56 @KafkaListener(topics = "test") 57 public void listen(String data) { 58 System.out.println("listen 收到: " + data); 59 } 60 61 62 @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory") 63 public void listen2(String data, Acknowledgment ack) { 64 System.out.println("listen2 收到: " + data); 65 ack.acknowledge(); 66 } 67 68 @KafkaListener(topicPartitions = {@TopicPartition(topic = "test", partitions = "0")}) 69 public void listen3(ConsumerRecord<?, ?> record) { 70 System.out.println("listen3 收到: " + record.value()); 71 } 72 73 74 @KafkaListener(id = "xyz", topics = "test") 75 public void listen4(@Payload String foo, 76 @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, 77 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 78 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, 79 @Header(KafkaHeaders.OFFSET) List<Long> offsets) { 80 System.out.println("listen4 收到: "); 81 System.out.println(foo); 82 System.out.println(key); 83 System.out.println(partition); 84 System.out.println(topic); 85 System.out.println(offsets); 86 } 87 88 }
9.1 Committing Offsets
如果enable.auto.commit设置为true,那么kafka将自动提交offset。如果设置为false,则支持下列AckMode(确认模式)。
消费者poll()方法将返回一个或多个ConsumerRecords
- RECORD :处理完记录以后,当监听器返回时,提交offset
- BATCH :当对poll()返回的所有记录进行处理完以后,提交偏offset
- TIME :当对poll()返回的所有记录进行处理完以后,只要距离上一次提交已经过了ackTime时间后就提交
- COUNT :当poll()返回的所有记录都被处理时,只要从上次提交以来收到了ackCount条记录,就可以提交
- COUNT_TIME :和TIME以及COUNT类似,只要这两个中有一个为true,则提交
- MANUAL :消息监听器负责调用Acknowledgment.acknowledge()方法,此后和BATCH是一样的
- MANUAL_IMMEDIATE :当监听器调用Acknowledgment.acknowledge()方法后立即提交
10. Spring Boot Kafka
10.1 application.properties
spring.kafka.bootstrap-servers=192.168.101.5:9092
10.2 发送消息
1 package com.cjs.example;2 3 import org.springframework.beans.factory.annotation.Autowired;4 import org.springframework.kafka.core.KafkaTemplate;5 import org.springframework.web.bind.annotation.RequestMapping;6 import org.springframework.web.bind.annotation.RestController;7 8 import javax.annotation.Resource;9 10 @RestController 11 @RequestMapping("/msg") 12 public class MessageController { 13 14 @Resource 15 private KafkaTemplate<String, String> kafkaTemplate; 16 17 @RequestMapping("/send") 18 public String send(String topic, String key, String value) { 19 kafkaTemplate.send(topic, key, value); 20 return "ok"; 21 } 22 23 }
10.3 接收消息
1 package com.cjs.example;2 3 import org.apache.kafka.clients.consumer.ConsumerRecord;4 import org.springframework.kafka.annotation.KafkaListener;5 import org.springframework.kafka.annotation.KafkaListeners;6 import org.springframework.stereotype.Component;7 8 @Component9 public class MessageListener { 10 11 /** 12 * 监听订单消息 13 */ 14 @KafkaListener(topics = "ORDER", groupId = "OrderGroup") 15 public void listenToOrder(String data) { 16 System.out.println("收到订单消息:" + data); 17 } 18 19 /** 20 * 监听会员消息 21 */ 22 @KafkaListener(topics = "MEMBER", groupId = "MemberGroup") 23 public void listenToMember(ConsumerRecord<String, String> record) { 24 System.out.println("收到会员消息:" + record); 25 } 26 27 /** 28 * 监听所有消息 29 * 30 * 任意时刻,一条消息只会发给组中的一个消费者 31 * 32 * 消费者组中的成员数量不能超过分区数,这里分区数是1,因此订阅该主题的消费者组成员不能超过1 33 */ 34 // @KafkaListeners({@KafkaListener(topics = "ORDER", groupId = "OrderGroup"), 35 // @KafkaListener(topics = "MEMBER", groupId = "MemberGroup")}) 36 // public void listenToAll(String data) { 37 // System.out.println("啊啊啊"); 38 // } 39 40 }
11. pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.cjs.example</groupId><artifactId>cjs-kafka-example</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>cjs-kafka-example</name><description></description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.4.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
参考:https://www.jianshu.com/p/c9581f695d64
:https://blog.csdn.net/saytime/article/details/79950635
https://www.cnblogs.com/cjsblog/p/9416380.html
spring boot中kafka教程相关推荐
- springboot转发http请求_Spring Boot2 系列教程(八)Spring Boot 中配置 Https
https 现在已经越来越普及了,特别是做一些小程序或者公众号开发的时候,https 基本上都是刚需了. 不过一个 https 证书还是挺费钱的,个人开发者可以在各个云服务提供商那里申请一个免费的证书 ...
- springboot怎么返回404_Spring Boot2 系列教程(十三)Spring Boot 中的全局异常处理
在 Spring Boot 项目中 ,异常统一处理,可以使用 Spring 中 @ControllerAdvice 来统一处理,也可以自己来定义异常处理方案.Spring Boot 中,对异常的处理有 ...
- Spring Boot中使用PostgreSQL数据库
在如今的关系型数据库中,有两个开源产品是你必须知道的.其中一个是MySQL,相信关注我的小伙伴们一定都不陌生,因为之前的Spring Boot关于关系型数据库的所有例子都是对MySQL来介绍的.而今天 ...
- Spring Boot中使用时序数据库InfluxDB
除了最常用的关系数据库和缓存之外,之前我们已经介绍了在Spring Boot中如何配置和使用MongoDB.LDAP这些存储的案例.接下来,我们继续介绍另一种特殊的数据库:时序数据库InfluxDB在 ...
- Spring Boot中如何扩展XML请求和响应的支持
在之前的所有Spring Boot教程中,我们都只提到和用到了针对HTML和JSON格式的请求与响应处理.那么对于XML格式的请求要如何快速的在Controller中包装成对象,以及如何以XML的格式 ...
- Spring Boot中使用RabbitMQ
很久没有写Spring Boot的内容了,正好最近在写Spring Cloud Bus的内容,因为内容会有一些相关性,所以先补一篇关于AMQP的整合. Message Broker与AMQP简介 Me ...
- Spring Boot中的一些常用配置介绍!
这篇教程将为你介绍Spring Boot 中的一些常用配置,比如:自定义 Banner.配置日志.关闭特定的自动配置等. 自定义Banner 在 Spring Boot 启动的时候会有一个默认的启动图 ...
- 在Spring Boot中使用内存数据库
文章目录 H2数据库 HSQLDB Apache Derby SQLite 在Spring Boot中使用内存数据库 所谓内存数据库就是可以在内存中运行的数据库,不需要将数据存储在文件系统中,但是相对 ...
- Spring Boot中Spring data注解的使用
文章目录 Spring Data Annotations @Transactional @NoRepositoryBean @Param @Id @Transient @CreatedBy, @Las ...
最新文章
- matlab svr 预测,SVR 多目标预测
- linux shell 里面执行python 程序_Linux下编写脚本Shell和Python的区别?
- java venus_来认识一下venus-init——一个让你仅需一个命令开始Java开发的命令行工具...
- 阿里巴巴 Arthas 3.1.5版本支持火焰图,快速定位应用热点
- Android 系统(123)---MTK android 常用修改点
- OC开发实例变量的访问控制详解
- 虚方法和重写方法的继承特性
- 图像处理中的直方图与均衡化
- 在谷歌chrome、Firefox等浏览器打开、编辑、保存微软Office、金山WPS文档
- 数学建模国赛-2015A太阳影子定位再研究(未完)
- 彻底清除 mplay.com与mplay.exe病毒
- 恢复被文件夹病毒恶意隐藏的文件夹
- Docker与微服务实战(入门)
- 淘宝直通车辅助工具系统使用教程
- 性能优化之MySQL优化(转)
- 【C语言】万字讲解 从零到精通 (文件操作与文件函数)
- LVS负载均衡群集——NAT模式实例
- 谷歌滤镜软件叫什么_谷歌app爆红的拍照功能:你最像名画中的谁?
- HTML 之 br 标签
- enc易能变频_enc易能变频器EDS1000-4T0015G/0022P说明书