SpringCloud Stream 整合kafka
一、引入依赖包
org.springframework.cloud
spring-cloud-stream
org.springframework.cloud
spring-cloud-stream-binder-kafka
二、自定义信息通道
官方提供了Sink(输入通道)、Source(输出通道)、Processor(集成Sink和Source通道),我们也可以自定义我们自己的信息通道。
@Input注解标识一个输入通道
@Output注解标识一个输出通道
通道名称作为参数,如果未提供参数,默认使用方法名称作为通道名称。
如下我们自定义信息通道EsChannel
/**
自定义信息通道
@author dbq
@date 2019/9/26 14:54
/
public interface EsChannel {
/*- 缺省发送消息通道名称
*/
String ES_DEFAULT_OUTPUT = “es_default_output”;
/**
- 缺省接收消息通道名称
*/
String ES_DEFAULT_INPUT = “es_default_input”;
/**
- 告警发送消息通道名称
*/
String ES_ALARM_OUTPUT = “es_alarm_output”;
/**
- 告警接收消息通道名称
*/
String ES_ALARM_INPUT = “es_alarm_input”;
/**
- 缺省发送消息通道
- @return channel 返回缺省信息发送通道
*/
@Output(ES_DEFAULT_OUTPUT)
MessageChannel sendEsDefaultMessage();
/**
- 告警发送消息通道
- @return channel 返回告警信息发送通道
*/
@Output(ES_ALARM_OUTPUT)
MessageChannel sendEsAlarmMessage();
/**
- 缺省接收消息通道
- @return channel 返回缺省信息接收通道
*/
@Input(ES_DEFAULT_INPUT)
MessageChannel recieveEsDefaultMessage();
/**
- 告警接收消息通道
- @return channel 返回告警信息接收通道
*/
@Input(ES_ALARM_INPUT)
MessageChannel recieveEsAlarmMessage();
}
- 缺省发送消息通道名称
三、@EnableBinding使应用程序连接到消息代理
@EnableDiscoveryClient
@SpringBootApplication
@EnableFeignClients
@EnableHystrix
@MapperScan(basePackages = “com.es.mapper”)
@EnableBinding(EsChannel.class)
public class EsOnenetApplication {
public static void main(String[] args) {SpringApplication.run(EsOnenetApplication.class, args);
}
}
四、SpringCloudStream及kafka配置
#==============================================================
#spring-cloud-stream-Kafka配置 开始
#==============================================================
#是否开启kafka(非spring-cloud-stream配置)
spring.kafka.enabled=false
#缺省的输入、输出通道
spring.cloud.stream.bindings.es_default_input.destination=es_default_topic
spring.cloud.stream.bindings.es_default_input.binder=kafka
spring.cloud.stream.bindings.es_default_input.group=es_default_group
spring.cloud.stream.bindings.es_default_output.destination=es_default_topic
spring.cloud.stream.bindings.es_default_output.binder=kafka
#入站消费者的并发性
spring.cloud.stream.bindings.es_default_input.consumer.concurrency=2
#告警的输入、输出通道(多主题、分组测试用,实际开发中根据业务需求定义)
spring.cloud.stream.bindings.es_alarm_input.destination=es_alarm_topic
spring.cloud.stream.bindings.es_alarm_input.binder=kafka
spring.cloud.stream.bindings.es_alarm_input.group=es_alarm_group
spring.cloud.stream.bindings.es_alarm_output.destination=es_alarm_topic
spring.cloud.stream.bindings.es_alarm_output.binder=kafka
#kafka配置
spring.cloud.stream.kafka.binder.brokers=172...6:9092,172...7:9092,172...8:9092
spring.cloud.stream.kafka.binder.zkNodes=172...6:2181,172...7:2181,172...8:2181
spring.cloud.stream.kafka.binder.requiredAcks=1
#==============================================================
#spring-cloud-stream-Kafka配置 结束
#==============================================================
五、发送消息到输出通道
/**
kafka消息发送器
@author dbq
@date 2019/9/26 17:50
*/
@Component
public class EsKafkaMessageSender {
@Autowired
private EsChannel channel;/**
- 消息发送到默认通道:缺省通道对应缺省主题
- @param message
*/
public void sendToDefaultChannel(String message){
channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build());
}
/**
- 消息发送到告警通道:告警通道对应告警主题
- @param message
*/
public void sendToAlarmChannel(String message){
channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build());
}
}
注入先前定义的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分别为我们自定义的两个发送方法,可将消息发送到不同的通道中,每个通道对应一个kafka的主题。
六、从输入通道订阅消息
@EnableBinding(value = EsChannel.class)
public class EsStreamListener {
/*** 从缺省通道接收消息* @param message*/
@StreamListener(EsChannel.ES_DEFAULT_INPUT)
public void receive(Message<String> message){SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");System.out.println(sdf.format(new Date())+"------start--------安全用电默认消息:" + message);try {Thread.sleep(1000*10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(sdf.format(new Date())+"------end--------安全用电默认消息");
}/*** 从告警通道接收消息* @param message*/
@StreamListener(EsChannel.ES_ALARM_INPUT)
public void receiveAlarm(Message<String> message){System.out.println("订阅告警消息:" + message);
}
}
从不同的通道实现消息的订阅。
七、这样完整的消息系统就搭建好了,定义Controller发送消息测试
@ApiOperation(value = “test1”, httpMethod = “POST”)
@PostMapping(value = “/test1”, produces = “application/json;charset=UTF-8”)
public void test1(String message, HttpServletRequest request,
HttpServletResponse response) {
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
sender.sendToDefaultChannel(message);
}
@ApiOperation(value = "test", httpMethod = "POST")
@PostMapping(value = "/test2", produces = "application/json;charset=UTF-8")
public void test2(String message, HttpServletRequest request,HttpServletResponse response) {sender.sendToAlarmChannel(message);
}
SpringCloud Stream 整合kafka相关推荐
- 记一次springcloud stream延迟消息失效
起因 实现一个封号功能,可以封1,3,7,30,永久的不同天数,其中1,3,7,30是通过springcloud stream整合rabbitmq的延迟队列来实现,到期自动解封. 问题 当时间设置30 ...
- springcloud 相同服务名_SpringCloud系列之SpringCloud Stream
SpringCloud Stream 技术兴起的原因:为了解决系统中不同中间件的适配问题,出现了cloud stream,采用适配绑定的方式,自动给不同的MQ之间进行切换. 屏蔽底层消息中间件的差异, ...
- 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
目录 整合 Kafka 说明 Kafka特定配置 KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 Kafka ...
- 2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
目录 整合Kafka 0-10-开发使用 原理 1.Direct方式 2.简单的并行度1 : 1 API 注意 代码实现-自动提交偏移量到默认主题 代码实现- ...
- active mq topic消费后删除_Spring cloud stream 整合mq
说明:本案例win10环境 测试scs(spring cloud stream)整合kfk(kafka)/rbt(rabbitmq)消息生产消费场景 流程 一.准备中间件环境(kfk/rbt) 后续内 ...
- 消息驱动 微服务器,消息驱动的微服务-Spring Cloud Stream整合RocketMQ
系列文章导航: Spring Cloud Alibaba微服务解决方案 常用MQ产品的选择 目前主流的MQ产品有kafka.RabbitMQ.ActiveMQ.RocketMQ等.在MQ选型时可以参照 ...
- 超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践
来源 | Alice菌 责编 | Carol 封图 | CSDN 下载于视觉中国 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲太多了,今天的内容主要是为大家带来的是 Spa ...
- 超详细!一文详解 SparkStreaming 如何整合 Kafka !附代码可实践
来源 | Alice菌 责编 | Carol 封图 | CSDN 下载于视觉中国 出品 | CSDN(ID:CSDNnews) 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲 ...
- kafka maven 依赖_SpringBoot入门建站全系列(二十八)整合Kafka做日志监控
SpringBoot入门建站全系列(二十八)整合Kafka做日志监控 一.概述 Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端 ...
- SpringBoot入门建站全系列(二十八)整合Kafka做日志监控
SpringBoot入门建站全系列(二十八)整合Kafka做日志监控 一.概述 Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端 ...
最新文章
- c语言逆序输出字符串指针,菜鸟求助-如何用指针法将一串字符按单词的倒序输出?如:i love yo...
- MIT联合波士顿咨询:全球21个行业,对话3000名高管,AI如何重塑商业形态? | 雷报
- 利用jsoncpp将json字符串转换为Vector
- 【计算机网络】计算机网络 OSI 参考模型 ( 计算机网络分层结构 | OSI 七层参考模型 | 应用层 | 表示层 | 会话层 | 传输层 | 网络层 | 数据链路层 | 物理层 )
- 千兆网综合布线系统的线缆选型
- NSA的各种***工具
- 图片标注工具labeling的安装和使用
- ARKit入门到精通-1.0 -基础内容-史小川-专题视频课程
- 【工作日报】2019年7月 前端开发工作日报汇总
- PDF如何旋转页面,PDF旋转页面的操作方法
- 国际贸易术语_Incoterm
- 2021牛客暑期多校训练营5 Cheating and Stealing
- Shamir密钥分享算法简析
- android 模拟器终端 了解及部分命令
- directx是什么?
- Python如何输出当前时间,时分秒,以及ms
- jq设置保留两位小数_js保留两位小数方法总结
- VS2008的安装注意事项
- GAN-GP(Gradient Penalty)
- Ask, acquire, and attack: data-free UAP generation using class impressions