十五、Spring cloud 消息总线(Bus)
一、回顾 Spring 事件/监听器
- Spring 事件
- ApplicationEvent
- Spring 监听器
- ApplicationListener/@EventListener
- Spring 事件发布器
- ApplicationEventPublisher
Demo:
/*** Spring 事件/监听器 Demo* @author 咸鱼* @date 2018/11/29 18:58*/
public class SpringEventDemo {public static void main(String[] args) {//创建 Annotation 驱动 Spring 应用上下文AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();//注册 EventConfiguration 到 Spring 应用上下文context.register(EventConfiguration.class);//启动 Spring 应用上下文context.refresh();//AnnotationConfigApplicationContext 是 ApplicationEventPublisher 的一种具体实现ApplicationEventPublisher publisher = context;//发布一个publisher.publishEvent(new MyApplicationEvent("hello java"));}/*** 自定义事件*/private static class MyApplicationEvent extends ApplicationEvent{private static final long serialVersionUID = -5938169569156916456L;public MyApplicationEvent(String message) {super(message);}}/*** 监听事件*/@Configurationpublic static class EventConfiguration{/*** 监听 {@link MyApplicationEvent}* @param event*/@EventListenerpublic void onEvent(MyApplicationEvent event){System.out.println("监听到事件:" + event);}}
}
二、Spring Cloud Bus
(一)使用场景
用于广播应用状态变更到分布式系统中的各个关联的节点。应用节点不直接相互通讯,而通过消息总线来实现通知。
简单点说:比如有一个配置服务器
,有多个配置客户端
。以前没用消息总线之前,一旦配置服务器
的配置项发生改变,那么需要由每个配置客户端
调用 POST请求/actuator/refresh
才能刷新本地配置项。这样带来的问题是,一旦系统越来越大,那么若要改变配置项,则需要大量的配置客户端
手动刷新本地配置项。而引入了消息总线
以后,则由消息总线
来通知各个客户端配置项发生改变了,并触发刷新本地配置项操作。
(二)架构:
架构解析:
客户端发送 POST请求 /actuator/bus-refresh/${contextId}:*
,消息总线会根据请求生成一个事件(E1),并将该事件发送给消息中间件(比如 Kafka)。此时会有两种情况:单点传播 和 集群传播。这主要适用于同样的应用会有多个同样的实例(这些实例靠端口进行区分,但 serviceId 都是一样的)。单点传播就是只向其中的一个实例传播,集群传播向所有的实例传播。
在消息中间件在接收消息时,所有的应用同时也在监听这些事件(比如 E1)。在监听到 E1 事件以后,根据上面的规则,消息总线会将 E1 事件包装成不同类型的内部事件(比如 RefreshRemoteApplicationEvent
)。然后让目的应用的 EventPublisher 将包装后的事件发布给事件监听器队列,等待下一步的处理。
暂时这么理解,后续如果不对,再进行改正。
(三)默认实现
- AMQP(Rabbit MQ)
- Kafka
现阶段,Spring Cloud Bus 只支持 AMQP(Rabbit MQ) 和 Kafka两个消息中间件。
三、案例(使用 Kafka)
激活总线:
AMQP:spring-cloud-starter-bus-amqp
Kafka:spring-cloud-starter-bus-kafka
spring-cloud-bus
改造 user-service-client:使用 Kafka 整合 Spring Cloud Bus
(一)增加依赖
<!-- 整合 Spring Cloud Bus:Kafka --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-kafka</artifactId></dependency>
(二)总线事件传播
1、事件传播类型
- 单点传播
- Endpoint:
/actuator/bus-refresh/${applicationContextId}:*
(POST请求) - 案例:
localhost:8080/actuator/bus-refresh/user-service-client:8080
- Endpoint:
- 集群传播
- Endpoint:
/actuator/bus-refresh/${applicationContextId}:**
(POST请求) - 案例:
localhost:8080/actuator/bus-refresh/user-service-client:**
- Endpoint:
备注1:
${applicationContextId}:*
一般是serviceId:port
,而serviceId
就是在application.properties
中配置的spring.application.name=user-service-client
属性。
备注2:消息总线提供的端点
/actuator/bus-refresh/${applicationContextId}:*
和 Actuator 自带的端点/actuator/refresh
作用是相同的,都是刷新配置项,区别主要在于:
/actuator/refresh
:刷新本地配置项
/actuator/bus-refresh/${applicationContextId}:*
:刷新远程应用配置项
问题:如何定位 Application Context ID?
通过访问 /actuator/beans
确认当前 Application Context ID(PS:2.0版本的找不着。。。)
2、事件传播监听器
(1)通过日志可知 单点传播/集群传播 监听器均为org.springframework.cloud.bus.event.RefreshListener
:
public class RefreshListenerimplements ApplicationListener<RefreshRemoteApplicationEvent> {private static Log log = LogFactory.getLog(RefreshListener.class);private ContextRefresher contextRefresher;public RefreshListener(ContextRefresher contextRefresher) {this.contextRefresher = contextRefresher;}@Overridepublic void onApplicationEvent(RefreshRemoteApplicationEvent event) {Set<String> keys = contextRefresher.refresh();log.info("Received remote refresh request. Keys refreshed " + keys);}
}
RefreshListener
监听事件 RefreshRemoteApplicationEvent
(2)自定义 RefreshRemoteApplicationEvent
事件监听器,监听 总线事件传播
。
/*** @author 咸鱼* @date 2018/11/29 20:44*/
@Configuration
public class BusConfiguration {@EventListenerpublic void onRefreshRemoteApplicationEvent(RefreshRemoteApplicationEvent event){System.out.printf("Source : %s , originService : %s, destinationService : %s\n",event.getSource(), event.getOriginService(), event.getDestinationService());}
}
(三)总线事件跟踪
1、端点:/trace
默认事件跟踪功能是失效的,需要通过配置项激活:spring.cloud.bus.trace.enabled=true
2、总线内部事件
EnvironmentChangeRemoteApplicationEvent
RefreshRemoteApplicationEvent
AckRemoteApplicationEvent
EnvironmentChangeRemoteApplicationEvent
:应用环境变量(env)改变触发该事件,比如执行:POST请求/actuator/bus-env
RefreshRemoteApplicationEvent
:刷新配置项,触发该事件,比如执行:POST请求/actuator/bus-refresh
3、自定义事件监听器
我们可以自定义监听器,来监听这两个事件的发生,可以相应的做一些处理,比如:
@Configuration
public class BusConfiguration {/*** 监听 RefreshRemoteApplicationEvent 事件* POST请求 `/actuator/bus-env` 触发*/@EventListenerpublic void onRefreshRemoteApplicationEvent(RefreshRemoteApplicationEvent event){System.out.printf("RefreshRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %s\n",event.getSource(), event.getOriginService(), event.getDestinationService());}/*** 监听 EnvironmentChangeRemoteApplicationEvent 事件* POST请求 `/actuator/bus-refresh` 触发*/@EventListenerpublic void onEnvironmentChangeRemoteApplicationEvent(EnvironmentChangeRemoteApplicationEvent event){System.out.printf("EnvironmentChangeRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %s\n",event.getSource(), event.getOriginService(), event.getDestinationService());}
}
(四)自定义事件
- 事件扩展
- RemoteApplicationEvent
- 事件扫描
- @RemoteApplicationEventScan
- 事件序列化
1、扩展 RemoteApplicationEvent
/*** 自定义事件 {@link RemoteApplicationEvent}* @author 咸鱼* @date 2018/12/1 10:11*/
public class UserRemoteApplicationEvent extends RemoteApplicationEvent {private static final long serialVersionUID = -1624266233141574546L;/*** 这个默认构造函数必须有,否则无法将 originService 传递到 目标应用中*/private UserRemoteApplicationEvent() {}public UserRemoteApplicationEvent(User user, String originService,String destinationService) {super(user, originService, destinationService);}
}
2、添加 @RemoteApplicationEventScan
/*** 注解 @RemoteApplicationEventScan(basePackageClasses = UserRemoteApplicationEvent.class):* 扫面自定义事件*/
@RemoteApplicationEventScan(basePackageClasses = UserRemoteApplicationEvent.class)
@Configuration
public class BusConfiguration {/*** 监听自定义的 UserRemoteApplicationEvent 事件*/@EventListenerpublic void onUserRemoteApplicationEvent(UserRemoteApplicationEvent event){System.out.printf("UserRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %s\n",event.getSource(), event.getOriginService(), event.getDestinationService());}
}
3、发布 RemoteApplicationEvent
/*** Bus 事件 Controller* @author 咸鱼* @date 2018/12/1 10:09*/
@RestController
public class BusApplicationEventController implements ApplicationContextAware,ApplicationEventPublisherAware{/*** 事件发布器(通过实现 ApplicationEventPublisherAware 实现自动装载)* 补充: AnnotationConfigApplicationContext 是 ApplicationEventPublisher 的一种具体实现*/private ApplicationEventPublisher eventPublisher;/*** 应用上下文(通过实现 ApplicationContextAware 实现自动装载)*/private ApplicationContext applicationContext;@Overridepublic void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {this.eventPublisher = applicationEventPublisher;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}/*** 问题:为什么这里发布的的自定义事件,可以被框架监听到?* 因为在 BusAutoConfiguration#acceptLocal() 中,注册了下面的监听器:@EventListener(classes = RemoteApplicationEvent.class)public void acceptLocal(RemoteApplicationEvent event) {if (this.serviceMatcher.isFromSelf(event)&& !(event instanceof AckRemoteApplicationEvent)) {this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());}}* 而我们自定义的事件 UserRemoteApplicationEvent 是 RemoteApplicationEvent 的子类,所以我们在* 发布自定义事件以后,可以被框架监听到。*/@PostMapping("/bus/event/publish/user")public boolean publishUserEvent(@RequestBody User user,@RequestParam(value = "destination", required = false) String destination) {//获取应用idString serviceInstanceId = applicationContext.getId();//新建 自定义事件 对象UserRemoteApplicationEvent event = new UserRemoteApplicationEvent(user, serviceInstanceId, destination);//发布事件eventPublisher.publishEvent(event);return true;}
}
4、监听 RemoteApplicationEvent
/*** 监听自定义的 UserRemoteApplicationEvent 事件*/@EventListenerpublic void onUserRemoteApplicationEvent(UserRemoteApplicationEvent event){System.out.printf("UserRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %s\n",event.getSource(), event.getOriginService(), event.getDestinationService());}
四、源码分析
(一)BusAutoConfiguration
1、监听 Spring Event
(本地事件)
@EventListener(classes = RemoteApplicationEvent.class)public void acceptLocal(RemoteApplicationEvent event) {if (this.serviceMatcher.isFromSelf(event)&& !(event instanceof AckRemoteApplicationEvent)) {this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());}}
由于 @EventListener
监听 Spring Event
,所以事件 RemoteApplicationEvent
属于本地事件,因必然有发布该事件的源头。
2、监听 Stream 事件(远程事件)
@StreamListener(SpringCloudBusClient.INPUT)public void acceptRemote(RemoteApplicationEvent event) {if (event instanceof AckRemoteApplicationEvent) {if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)&& this.applicationEventPublisher != null) {this.applicationEventPublisher.publishEvent(event);}// If it's an ACK we are finished processing at this pointreturn;}if (this.serviceMatcher.isForSelf(event)&& this.applicationEventPublisher != null) {if (!this.serviceMatcher.isFromSelf(event)) {this.applicationEventPublisher.publishEvent(event);}if (this.bus.getAck().isEnabled()) {AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,this.serviceMatcher.getServiceId(),this.bus.getAck().getDestinationService(),event.getDestinationService(), event.getId(), event.getClass());this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(ack).build());this.applicationEventPublisher.publishEvent(ack);}}if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {// We are set to register sent events so publish it for local consumption,// irrespective of the originthis.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,event.getOriginService(), event.getDestinationService(),event.getId(), event.getClass()));}}
acceptRemote() 监听 Stream 事件,同时发送 Spring Event(本地事件)
ServiceMatcher#isForSelf()
用于匹配 RemoteApplicationEvent
是否为当前应用实例而来。
this.serviceMatcher.isForSelf(event)
ServiceMatcher#isFromSelf()
用于判断当前事件是否为自己发送。
this.serviceMatcher.isFromSelf(event)
3、整体流程
假设 user-service-client:8080
执行 /actuator/bus-refresh
端口,发送一个 RefreshRemoteApplicationEvent
事件:
curl -X POST http://localhost:8080/bus-refresh/user-service-client:8080
- user-service-client:8080:Bus事件的发布者、监听者
- user-service-client:8081:Bus事件的监听者
- user-service-client:8080:Bus事件的监听者
当 Stream Binder 接收到发布者 RefreshRemoteApplicationEvent
事件,广播该事件到所有的监听者:
user-service-client:8080
:判断事件不是为自己发送,发布SentApplicationEvent
事件(主要发布到/trace
中)user-service-client:8081
:判断事件不是为自己发送,发布SentApplicationEvent
事件(主要发布到/trace
中)user-service-client:8082
:判断事件是为自己发送,执行RefreshRemoteApplicationEvent
事件监听。如果ack
激活的(默认激活),cloudBusOutboundChannel
会发送AckRemoteApplicationEvent
到管道里。可以由以下代码监听:
/*** 监听 AckRemoteApplicationEvent 事件*/@StreamListener(SpringCloudBusClient.OUTPUT)public void onAckRemoteApplicationEvent(AckRemoteApplicationEvent event) {System.out.printf("AckRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %s\n",event.getSource(), event.getOriginService(), event.getDestinationService());}
十五、Spring cloud 消息总线(Bus)相关推荐
- SpringCloud教程-消息总线Bus 客户端(client)刷新(SpringCloud版本Greenwich.SR4)
文章目录 消息总线(Bus)介绍 项目示例 config-client-bus 代码地址:github-spring-cloud地址 前言:前面文章讲了Spring Cloud Config配置中心如 ...
- eureka集群只注册一个_Spring cloud系列教程第十篇- Spring cloud整合Eureka总结篇
Spring cloud系列教程第十篇- Spring cloud整合Eureka总结篇 本文主要内容: 1:spring cloud整合Eureka总结 本文是由凯哥(凯哥Java:kagejava ...
- SpringCloud config 配置中心集群配置以及整合消息总线BUS实现关联微服务配置自动刷新
一.SpringCloud Config 基本配置中的问题 在上一章节<SpringCloud config 配置中心介绍与基本配置使用>中我们现实了配置中心的配置集中管理.调用微服务应用 ...
- java技术--SpringCloud:消息总线Bus简介及代码实现(18)
1.消息总线Bus简介 (1)消息总线Bus的作用<1>在没有使用消息总线的时候,如果需要修改某个配置1.1.如果涉及修改的微服务节点比较多,需要手动的逐个节点的刷新非常麻烦1.2.在微服 ...
- 499、Java分布式和集群12 -【SpringCloud视图微服务 - 消息总线Bus】 2021.06.01
目录 0.RabbitMQ 1.先运行,看到效果,再学习 2.pom.xml 3.bootstrap.yml 4.application.yml 5.ProductDataServiceApplica ...
- Spring Cloud消息驱动整合 1
Spring Cloud Stream 使用场景 消息驱动的微服务应用 目的 简化代码 统一抽象 主要概念 1.应用模型 2.Binder抽象 3.持久化 发布/订阅支持 4.消费分组支持 5.分区支 ...
- iframe的src动态修改并刷新_微服务中配置中心Config+消息总线Bus,实现分布式自动刷新配置
技术/杨33 一.分布式配置中心Config 一套集中的.动态的配置管理,实现统一配置微服务中的每个子服务. Spring Cloud Config为微服务架构提供了集中化的外部配置支持,配置服务器为 ...
- SpringCloud教程-消息总线Bus 服务端(server)刷新(SpringCloud版本Greenwich.SR4)
文章目录 项目示例 config-server-bug 代码地址:github-spring-cloud地址 前言:本篇文章在上一篇文章基础上进行修改,因为虽然我们做到了利用一个消息总线触发刷新,而刷 ...
- SpringCloud消息总线——Bus
Bus 本专栏学习内容来自尚硅谷周阳老师的视频 有兴趣的小伙伴可以点击视频地址观看 在SpringCloud Config学习过程中,还遗留下来一个问题:当运维更新git上的配置信息时,要想更改所有的 ...
- 黑马十次方项目day08-11 消息总线组件SpringCloudBus
文章目录 一.SpringCloudBus简介 二. SpringCloudBus 代码的实现 2.1 配置服务端 2.2 配置客户端 一.SpringCloudBus简介 如果我们更新码云中的配置文 ...
最新文章
- linux环境内存分配原理
- 非对称加密算法RSA公钥私钥的模数和指数提取方法
- RHEL6.3配置Apache服务器(2) 构建虚拟主机
- gedit乱码 fedora
- SAP用户信息查询的几张表
- DataTable的Merge\COPY\AcceptChange使用说明
- Ruby中的Profiling工具
- 马斯克:挑战纽北赛道的Model S配有7个座椅
- python学习笔记之property
- python的浮点数_Python中整数和浮点数
- Tensorflow卷积神经网络识别MINST手写数字
- tkinter 中给某个文本加上滚动条_Python Tkinter自制文本编辑器
- 做人要有一颗健康的心
- 写写做数模竞赛的经验
- 【毕业设计】基于单片机的太空游戏机 - 嵌入式 物联网 stm32 51
- c语言图像对比度增强,图像对比度应用
- Detectron2安装教程
- 电脑如何双开两个微信
- 输入时(hour)、分(minute)、秒(second)的一个具体时间,要求打印出它的下一秒出来(一天24小时)。 例如输入的是23时59分59秒,则输出00:00:00
- c语言直接输出PDF,C语言格式化输出总结.pdf
热门文章
- 体系结构学习7-乱序执行
- 月之数 HDU2502
- The APR based Apache Tomcat Native library which allows optimal performance in production environmen
- 校园招聘-2017携程秋招后台开发笔试编程题
- android粘性广播何时结束,Android之粘性广播理解
- React的箭头函数详解
- android如何替换contact的来电铃声
- The server cannot or will not process the request due to something that is perceived to be a client.
- 【Linux】排查进程、挖矿病毒查找
- 基于RiskPariyBlackLitterman的因子择时