这片文章中我们来集成下常用的消息队列(MQ)kafka,至于消息队列的作用,在此不再赘述,参考前面的文章。

在该篇文章中我没有采用配置文件的形式(application-dev.properties)配置,而是手动编写的kafkaProduce和kafkaConsumer的配置,这样更灵活。

注:基于demo-springboot

1.打开pom.xml加入以下内容:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<!--低版本貌似存在问题:1.1.7-->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.1.4.RELEASE</version>
</dependency>

以上为引入spring-kafka的最新依赖包

2.编写生产者的配置:kafkaProduceConfig

package com.example.demo.config;import com.google.common.collect.Maps;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.Map;/*** @author xiaofeng* @version V1.0* @title: KafkaProducerConfig.java* @package: com.example.demo.config* @description: kafka生产者配置* @date 2018/4/2 0002 下午 3:49*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {public Map<String, Object> producerConfigs() {Map<String, Object> props = Maps.newHashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.2.22:9092");props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());}
}

3.编写消费者的配置:kafkaConsumerConfig

package com.example.demo.config;import com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.Map;/*** @author xiaofeng* @version V1.0* @title: KafkaConfiguration.java* @package: com.example.demo.config* @description: kafka消费者配置* @date 2018/4/2 0002 下午 3:42*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = Maps.newHashMap();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.2.22:9092");propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");return propsMap;}@Beanpublic KafkaProperties.Listener listener() {return new KafkaProperties.Listener();}}

以上为kafka生产者和消费者的相关配置,至于各个配置选项我并没有做详细的解释,相关童鞋可以自行查询。

4.编写生产者

package com.example.demo.kafka.sender;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/*** @author xiaofeng* @version V1.0* @title: TestKafkaSender.java* @package: com.example.demo.kafka.sender* @description: kafka生产者* @date 2018/4/2 0002 下午 3:31*/
@Component
public class TestKafkaSender {@Autowiredprivate KafkaTemplate kafkaTemplate;@Value("${kafka.test.topic}")String testTopic;public void sendTest(String msg){kafkaTemplate.send(testTopic, msg);}
}

5.编写消费者

package com.example.demo.kafka.consumer;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @author xiaofeng* @version V1.0* @title: FollowHisOrderConsumer.java* @package: com.example.demo.kafka.consumer* @description: kafka消费者* @date 2018/4/2 0002 下午 3:31*/
@Component
public class TestKafkaConsumer {Logger logger = LoggerFactory.getLogger(getClass());@KafkaListener(topics = {"${kafka.test.topic}"})public void consumer(String message) {logger.info(" message = " + message);System.out.printf("message=" + message);}}

最后我们就可以开始测试了,编写单元测试类:

接下来自己去控制台查看消费者是否已经接收到了相关数据吧!(不知道你有没有收到,反正我是收到了^_^)

springboot + kafka相关推荐

  1. springboot+kafka(ip地址瞎写的)

    2019独角兽企业重金招聘Python工程师标准>>> 1,首先springboot对kafka的支持也很好,同样是在配置文件中配置好参数,然后就可以直接使用.先说一下,很简单,,, ...

  2. kafka 可视化工具_两小时带你轻松实战SpringBoot+kafka+ELK分布式日志收集

    一.背景 随着业务复杂度的提升以及微服务的兴起,传统单一项目会被按照业务规则进行垂直拆分,另外为了防止单点故障我们也会将重要的服务模块进行集群部署,通过负载均衡进行服务的调用.那么随着节点的增多,各个 ...

  3. SpringBoot+Kafka+ELK 完成海量日志收集(超详细)

    点击关注公众号,实用技术文章及时了解 来源:jiandansuifeng.blog.csdn.net/ article/details/107361190 整体流程大概如下: 服务器准备 在这先列出各 ...

  4. springboot kafka发送消息

    场景:kafka发送消息,并且根据消息发送的不同渠道和消息类型(例如发送到WX,DingDing,邮箱),采取不同的线程池处理 1.引入依赖 <dependency><groupId ...

  5. @KafkaListener 学习 消息监听 Springboot+kafka

    文章内容如有错误or需要完善,还请指正,非常感谢! 应用场景: 监听设备转发到kafka的队列,队列命名方式为 设备类型+一种业务类型+一台设备. @KafkaListener(id = " ...

  6. springboot kafka 怎样关闭kafka的在控制台打印的日志

    问题: kafka自带的这些日志很多,很烦,有时候你想看看你其他的日志都被刷掉了! 分析: 看见图片红框框柱的地方了吗,就是这小子打的日志. 解决: 直接在你的日志配置文件里面加上这句话 <lo ...

  7. SpringBoot实战(十四)之整合KafKa

    本人今天上午参考了不少博文,发现不少博文不是特别好,不是因为依赖冲突问题就是因为版本问题. 于是我结合相关的博文和案例,自己改写了下并参考了下,于是就有了这篇文章.希望能够给大家帮助,少走一些弯路. ...

  8. springboot整合kafka_springboot整合kafka实现消息的发送消费

    如下是springboot整合kafka的一个案例,方便需要的小伙伴. 启动kafka Server cd 到kafka的bin目录下:前提是启动zk./kafka-server-start.sh / ...

  9. Springboot与Kafka的小插曲

    目录 1.背景 2.环境 3.应用 1)pom.xml 2)application.yml 3)main方法 4)Entity 5)ServiceImpl 6)controller 4.遇到的问题 5 ...

最新文章

  1. android 加载器loader详解
  2. [转]Laravel 4之URL生成
  3. 引用 看下面图片是向左转还是向右转呢?
  4. 安卓自定义时间选择器_微信小程序拾色器(颜色选择器)组件
  5. python笔记30-docstring注释添加变量
  6. python对象属性赋值_关于python对象 中dict属性赋值的疑问
  7. html 冒泡事件拦截,Js 冒泡事件阻止
  8. JDBC之用元数据将结果集封装为List对象
  9. redis aof命令缓冲区的写入源码
  10. es6 什么是async函数
  11. Python学习之==线程进程
  12. 为什么用共有属性来封装私有变量
  13. OSPF和ISIS协议的异同
  14. 高端化自欺欺人,国产手机又一块遮羞布被扯下,事实是被苹果碾压
  15. Prometheus监控告警
  16. 关键点检测方法、人体姿态估计
  17. app第三方登录利弊
  18. 14个适合后台管理系统快速开发的前端框架
  19. 【机器学习】图像语义分割常用指标Dice系数 敏感性 特异性 IOU及python代码实现
  20. DPDK 入门最佳指南

热门文章

  1. win7开机提示服务器正在运行,科技常识:win7电脑启动ie浏览器提示服务器正在运行的处理方法...
  2. c linux time微秒_qt linux系统获取当前时间(精确到毫秒、微秒)-Go语言中文社区...
  3. python进程监控及恢复
  4. Backbone.js源码解读(转载)
  5. github注册之后更新教程
  6. json格式数据,将数据库中查询的结果转换为json, 然后调用接口的方式返回json(方式一)...
  7. MYSQL--事务处理
  8. 备忘:phalcon的坑
  9. [Silverlight]奇技银巧系列-3
  10. Asp.net MVC 示例项目Suteki.Shop分析之---ViewData