一、回顾 Spring 事件/监听器

  • Spring 事件

    • ApplicationEvent
  • Spring 监听器
    • ApplicationListener/@EventListener
  • Spring 事件发布器
    • ApplicationEventPublisher

Demo:

/*** Spring 事件/监听器 Demo* @author 咸鱼* @date 2018/11/29 18:58*/
public class SpringEventDemo {public static void main(String[] args) {//创建 Annotation 驱动 Spring 应用上下文AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();//注册 EventConfiguration 到 Spring 应用上下文context.register(EventConfiguration.class);//启动 Spring 应用上下文context.refresh();//AnnotationConfigApplicationContext 是 ApplicationEventPublisher 的一种具体实现ApplicationEventPublisher publisher = context;//发布一个publisher.publishEvent(new MyApplicationEvent("hello java"));}/*** 自定义事件*/private static class  MyApplicationEvent extends ApplicationEvent{private static final long serialVersionUID = -5938169569156916456L;public MyApplicationEvent(String message) {super(message);}}/*** 监听事件*/@Configurationpublic static class EventConfiguration{/*** 监听 {@link MyApplicationEvent}* @param event*/@EventListenerpublic void onEvent(MyApplicationEvent event){System.out.println("监听到事件:" + event);}}
}

二、Spring Cloud Bus

(一)使用场景
  用于广播应用状态变更到分布式系统中的各个关联的节点。应用节点不直接相互通讯,而通过消息总线来实现通知。

  简单点说:比如有一个配置服务器,有多个配置客户端。以前没用消息总线之前,一旦配置服务器的配置项发生改变,那么需要由每个配置客户端调用 POST请求/actuator/refresh 才能刷新本地配置项。这样带来的问题是,一旦系统越来越大,那么若要改变配置项,则需要大量的配置客户端手动刷新本地配置项。而引入了消息总线以后,则由消息总线来通知各个客户端配置项发生改变了,并触发刷新本地配置项操作。

(二)架构:

  架构解析:
  客户端发送 POST请求 /actuator/bus-refresh/${contextId}:*,消息总线会根据请求生成一个事件(E1),并将该事件发送给消息中间件(比如 Kafka)。此时会有两种情况:单点传播 和 集群传播。这主要适用于同样的应用会有多个同样的实例(这些实例靠端口进行区分,但 serviceId 都是一样的)。单点传播就是只向其中的一个实例传播,集群传播向所有的实例传播。
  在消息中间件在接收消息时,所有的应用同时也在监听这些事件(比如 E1)。在监听到 E1 事件以后,根据上面的规则,消息总线会将 E1 事件包装成不同类型的内部事件(比如 RefreshRemoteApplicationEvent)。然后让目的应用的 EventPublisher 将包装后的事件发布给事件监听器队列,等待下一步的处理。

暂时这么理解,后续如果不对,再进行改正。

(三)默认实现

  • AMQP(Rabbit MQ)
  • Kafka

  现阶段,Spring Cloud Bus 只支持 AMQP(Rabbit MQ) 和 Kafka两个消息中间件。

三、案例(使用 Kafka)

激活总线:
AMQP:spring-cloud-starter-bus-amqp
Kafka:spring-cloud-starter-bus-kafka
spring-cloud-bus

  改造 user-service-client:使用 Kafka 整合 Spring Cloud Bus

(一)增加依赖

     <!-- 整合 Spring Cloud Bus:Kafka --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-kafka</artifactId></dependency>

(二)总线事件传播

1、事件传播类型

  • 单点传播

    • Endpoint:/actuator/bus-refresh/${applicationContextId}:*(POST请求)
    • 案例:localhost:8080/actuator/bus-refresh/user-service-client:8080
  • 集群传播
    • Endpoint:/actuator/bus-refresh/${applicationContextId}:**(POST请求)
    • 案例:localhost:8080/actuator/bus-refresh/user-service-client:**

  备注1:${applicationContextId}:* 一般是 serviceId:port,而 serviceId 就是在 application.properties 中配置的 spring.application.name=user-service-client 属性。

  备注2:消息总线提供的端点 /actuator/bus-refresh/${applicationContextId}:* 和 Actuator 自带的端点 /actuator/refresh 作用是相同的,都是刷新配置项,区别主要在于:
/actuator/refresh:刷新本地配置项
/actuator/bus-refresh/${applicationContextId}:*:刷新远程应用配置项

  问题:如何定位 Application Context ID?
  通过访问 /actuator/beans 确认当前 Application Context ID(PS:2.0版本的找不着。。。)

2、事件传播监听器

(1)通过日志可知 单点传播/集群传播 监听器均为org.springframework.cloud.bus.event.RefreshListener

public class RefreshListenerimplements ApplicationListener<RefreshRemoteApplicationEvent> {private static Log log = LogFactory.getLog(RefreshListener.class);private ContextRefresher contextRefresher;public RefreshListener(ContextRefresher contextRefresher) {this.contextRefresher = contextRefresher;}@Overridepublic void onApplicationEvent(RefreshRemoteApplicationEvent event) {Set<String> keys = contextRefresher.refresh();log.info("Received remote refresh request. Keys refreshed " + keys);}
}

  RefreshListener 监听事件 RefreshRemoteApplicationEvent

(2)自定义 RefreshRemoteApplicationEvent 事件监听器,监听 总线事件传播

/*** @author 咸鱼* @date 2018/11/29 20:44*/
@Configuration
public class BusConfiguration {@EventListenerpublic void onRefreshRemoteApplicationEvent(RefreshRemoteApplicationEvent event){System.out.printf("Source : %s , originService : %s, destinationService : %s\n",event.getSource(), event.getOriginService(), event.getDestinationService());}
}

(三)总线事件跟踪

1、端点:/trace
  默认事件跟踪功能是失效的,需要通过配置项激活:spring.cloud.bus.trace.enabled=true

2、总线内部事件

  • EnvironmentChangeRemoteApplicationEvent
  • RefreshRemoteApplicationEvent
  • AckRemoteApplicationEvent

EnvironmentChangeRemoteApplicationEvent:应用环境变量(env)改变触发该事件,比如执行:POST请求 /actuator/bus-env

RefreshRemoteApplicationEvent:刷新配置项,触发该事件,比如执行:POST请求 /actuator/bus-refresh

3、自定义事件监听器

  我们可以自定义监听器,来监听这两个事件的发生,可以相应的做一些处理,比如:

@Configuration
public class BusConfiguration {/*** 监听 RefreshRemoteApplicationEvent 事件* POST请求 `/actuator/bus-env` 触发*/@EventListenerpublic void onRefreshRemoteApplicationEvent(RefreshRemoteApplicationEvent event){System.out.printf("RefreshRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %s\n",event.getSource(), event.getOriginService(), event.getDestinationService());}/*** 监听 EnvironmentChangeRemoteApplicationEvent 事件* POST请求 `/actuator/bus-refresh` 触发*/@EventListenerpublic void onEnvironmentChangeRemoteApplicationEvent(EnvironmentChangeRemoteApplicationEvent event){System.out.printf("EnvironmentChangeRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %s\n",event.getSource(), event.getOriginService(), event.getDestinationService());}
}

(四)自定义事件

  • 事件扩展

    • RemoteApplicationEvent
  • 事件扫描
    • @RemoteApplicationEventScan
  • 事件序列化

1、扩展 RemoteApplicationEvent

/*** 自定义事件 {@link RemoteApplicationEvent}* @author 咸鱼* @date 2018/12/1 10:11*/
public class UserRemoteApplicationEvent extends RemoteApplicationEvent {private static final long serialVersionUID = -1624266233141574546L;/*** 这个默认构造函数必须有,否则无法将 originService 传递到 目标应用中*/private UserRemoteApplicationEvent() {}public UserRemoteApplicationEvent(User user, String originService,String destinationService) {super(user, originService, destinationService);}
}

2、添加 @RemoteApplicationEventScan

/*** 注解 @RemoteApplicationEventScan(basePackageClasses = UserRemoteApplicationEvent.class):* 扫面自定义事件*/
@RemoteApplicationEventScan(basePackageClasses = UserRemoteApplicationEvent.class)
@Configuration
public class BusConfiguration {/*** 监听自定义的 UserRemoteApplicationEvent 事件*/@EventListenerpublic void onUserRemoteApplicationEvent(UserRemoteApplicationEvent event){System.out.printf("UserRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %s\n",event.getSource(), event.getOriginService(), event.getDestinationService());}
}

3、发布 RemoteApplicationEvent

/*** Bus 事件 Controller* @author 咸鱼* @date 2018/12/1 10:09*/
@RestController
public class BusApplicationEventController implements ApplicationContextAware,ApplicationEventPublisherAware{/*** 事件发布器(通过实现 ApplicationEventPublisherAware 实现自动装载)* 补充: AnnotationConfigApplicationContext 是 ApplicationEventPublisher 的一种具体实现*/private ApplicationEventPublisher eventPublisher;/*** 应用上下文(通过实现 ApplicationContextAware 实现自动装载)*/private ApplicationContext applicationContext;@Overridepublic void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {this.eventPublisher = applicationEventPublisher;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}/*** 问题:为什么这里发布的的自定义事件,可以被框架监听到?*     因为在 BusAutoConfiguration#acceptLocal() 中,注册了下面的监听器:@EventListener(classes = RemoteApplicationEvent.class)public void acceptLocal(RemoteApplicationEvent event) {if (this.serviceMatcher.isFromSelf(event)&& !(event instanceof AckRemoteApplicationEvent)) {this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());}}*     而我们自定义的事件 UserRemoteApplicationEvent 是 RemoteApplicationEvent 的子类,所以我们在* 发布自定义事件以后,可以被框架监听到。*/@PostMapping("/bus/event/publish/user")public boolean publishUserEvent(@RequestBody User user,@RequestParam(value = "destination", required = false) String destination) {//获取应用idString serviceInstanceId = applicationContext.getId();//新建 自定义事件 对象UserRemoteApplicationEvent event = new UserRemoteApplicationEvent(user, serviceInstanceId, destination);//发布事件eventPublisher.publishEvent(event);return true;}
}

4、监听 RemoteApplicationEvent

/*** 监听自定义的 UserRemoteApplicationEvent 事件*/@EventListenerpublic void onUserRemoteApplicationEvent(UserRemoteApplicationEvent event){System.out.printf("UserRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %s\n",event.getSource(), event.getOriginService(), event.getDestinationService());}

四、源码分析

(一)BusAutoConfiguration

1、监听 Spring Event(本地事件)

 @EventListener(classes = RemoteApplicationEvent.class)public void acceptLocal(RemoteApplicationEvent event) {if (this.serviceMatcher.isFromSelf(event)&& !(event instanceof AckRemoteApplicationEvent)) {this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());}}

  由于 @EventListener 监听 Spring Event,所以事件 RemoteApplicationEvent 属于本地事件,因必然有发布该事件的源头。

2、监听 Stream 事件(远程事件)

@StreamListener(SpringCloudBusClient.INPUT)public void acceptRemote(RemoteApplicationEvent event) {if (event instanceof AckRemoteApplicationEvent) {if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)&& this.applicationEventPublisher != null) {this.applicationEventPublisher.publishEvent(event);}// If it's an ACK we are finished processing at this pointreturn;}if (this.serviceMatcher.isForSelf(event)&& this.applicationEventPublisher != null) {if (!this.serviceMatcher.isFromSelf(event)) {this.applicationEventPublisher.publishEvent(event);}if (this.bus.getAck().isEnabled()) {AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,this.serviceMatcher.getServiceId(),this.bus.getAck().getDestinationService(),event.getDestinationService(), event.getId(), event.getClass());this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(ack).build());this.applicationEventPublisher.publishEvent(ack);}}if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {// We are set to register sent events so publish it for local consumption,// irrespective of the originthis.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,event.getOriginService(), event.getDestinationService(),event.getId(), event.getClass()));}}

acceptRemote() 监听 Stream 事件,同时发送 Spring Event(本地事件)

  ServiceMatcher#isForSelf() 用于匹配 RemoteApplicationEvent 是否为当前应用实例而来。

this.serviceMatcher.isForSelf(event)

  ServiceMatcher#isFromSelf() 用于判断当前事件是否为自己发送。

this.serviceMatcher.isFromSelf(event)

3、整体流程

  假设 user-service-client:8080 执行 /actuator/bus-refresh 端口,发送一个 RefreshRemoteApplicationEvent 事件:

curl -X POST http://localhost:8080/bus-refresh/user-service-client:8080
  • user-service-client:8080:Bus事件的发布者、监听者
  • user-service-client:8081:Bus事件的监听者
  • user-service-client:8080:Bus事件的监听者

  当 Stream Binder 接收到发布者 RefreshRemoteApplicationEvent 事件,广播该事件到所有的监听者:

  • user-service-client:8080:判断事件不是为自己发送,发布 SentApplicationEvent 事件(主要发布到 /trace 中)
  • user-service-client:8081:判断事件不是为自己发送,发布 SentApplicationEvent 事件(主要发布到 /trace 中)
  • user-service-client:8082:判断事件是为自己发送,执行 RefreshRemoteApplicationEvent 事件监听。如果 ack 激活的(默认激活),cloudBusOutboundChannel 会发送 AckRemoteApplicationEvent 到管道里。可以由以下代码监听:
 /*** 监听 AckRemoteApplicationEvent 事件*/@StreamListener(SpringCloudBusClient.OUTPUT)public void onAckRemoteApplicationEvent(AckRemoteApplicationEvent event) {System.out.printf("AckRemoteApplicationEvent - Source : %s , originService : %s, destinationService : %s\n",event.getSource(), event.getOriginService(), event.getDestinationService());}

十五、Spring cloud 消息总线(Bus)相关推荐

  1. SpringCloud教程-消息总线Bus 客户端(client)刷新(SpringCloud版本Greenwich.SR4)

    文章目录 消息总线(Bus)介绍 项目示例 config-client-bus 代码地址:github-spring-cloud地址 前言:前面文章讲了Spring Cloud Config配置中心如 ...

  2. eureka集群只注册一个_Spring cloud系列教程第十篇- Spring cloud整合Eureka总结篇

    Spring cloud系列教程第十篇- Spring cloud整合Eureka总结篇 本文主要内容: 1:spring cloud整合Eureka总结 本文是由凯哥(凯哥Java:kagejava ...

  3. SpringCloud config 配置中心集群配置以及整合消息总线BUS实现关联微服务配置自动刷新

    一.SpringCloud Config 基本配置中的问题 在上一章节<SpringCloud config 配置中心介绍与基本配置使用>中我们现实了配置中心的配置集中管理.调用微服务应用 ...

  4. java技术--SpringCloud:消息总线Bus简介及代码实现(18)

    1.消息总线Bus简介 (1)消息总线Bus的作用<1>在没有使用消息总线的时候,如果需要修改某个配置1.1.如果涉及修改的微服务节点比较多,需要手动的逐个节点的刷新非常麻烦1.2.在微服 ...

  5. 499、Java分布式和集群12 -【SpringCloud视图微服务 - 消息总线Bus】 2021.06.01

    目录 0.RabbitMQ 1.先运行,看到效果,再学习 2.pom.xml 3.bootstrap.yml 4.application.yml 5.ProductDataServiceApplica ...

  6. Spring Cloud消息驱动整合 1

    Spring Cloud Stream 使用场景 消息驱动的微服务应用 目的 简化代码 统一抽象 主要概念 1.应用模型 2.Binder抽象 3.持久化 发布/订阅支持 4.消费分组支持 5.分区支 ...

  7. iframe的src动态修改并刷新_微服务中配置中心Config+消息总线Bus,实现分布式自动刷新配置

    技术/杨33 一.分布式配置中心Config 一套集中的.动态的配置管理,实现统一配置微服务中的每个子服务. Spring Cloud Config为微服务架构提供了集中化的外部配置支持,配置服务器为 ...

  8. SpringCloud教程-消息总线Bus 服务端(server)刷新(SpringCloud版本Greenwich.SR4)

    文章目录 项目示例 config-server-bug 代码地址:github-spring-cloud地址 前言:本篇文章在上一篇文章基础上进行修改,因为虽然我们做到了利用一个消息总线触发刷新,而刷 ...

  9. SpringCloud消息总线——Bus

    Bus 本专栏学习内容来自尚硅谷周阳老师的视频 有兴趣的小伙伴可以点击视频地址观看 在SpringCloud Config学习过程中,还遗留下来一个问题:当运维更新git上的配置信息时,要想更改所有的 ...

  10. 黑马十次方项目day08-11 消息总线组件SpringCloudBus

    文章目录 一.SpringCloudBus简介 二. SpringCloudBus 代码的实现 2.1 配置服务端 2.2 配置客户端 一.SpringCloudBus简介 如果我们更新码云中的配置文 ...

最新文章

  1. linux环境内存分配原理
  2. 非对称加密算法RSA公钥私钥的模数和指数提取方法
  3. RHEL6.3配置Apache服务器(2) 构建虚拟主机
  4. gedit乱码 fedora
  5. SAP用户信息查询的几张表
  6. DataTable的Merge\COPY\AcceptChange使用说明
  7. Ruby中的Profiling工具
  8. 马斯克:挑战纽北赛道的Model S配有7个座椅
  9. python学习笔记之property
  10. python的浮点数_Python中整数和浮点数
  11. Tensorflow卷积神经网络识别MINST手写数字
  12. tkinter 中给某个文本加上滚动条_Python Tkinter自制文本编辑器
  13. 做人要有一颗健康的心
  14. 写写做数模竞赛的经验
  15. 【毕业设计】基于单片机的太空游戏机 - 嵌入式 物联网 stm32 51
  16. c语言图像对比度增强,图像对比度应用
  17. Detectron2安装教程
  18. 电脑如何双开两个微信
  19. 输入时(hour)、分(minute)、秒(second)的一个具体时间,要求打印出它的下一秒出来(一天24小时)。 例如输入的是23时59分59秒,则输出00:00:00
  20. c语言直接输出PDF,C语言格式化输出总结.pdf

热门文章

  1. 体系结构学习7-乱序执行
  2. 月之数 HDU2502
  3. The APR based Apache Tomcat Native library which allows optimal performance in production environmen
  4. 校园招聘-2017携程秋招后台开发笔试编程题
  5. android粘性广播何时结束,Android之粘性广播理解
  6. React的箭头函数详解
  7. android如何替换contact的来电铃声
  8. The server cannot or will not process the request due to something that is perceived to be a client.
  9. 【Linux】排查进程、挖矿病毒查找
  10. 基于RiskPariyBlackLitterman的因子择时