Spring Cloud 系列

  • eureka之服务治理完整过程搭建
  • eureka之高可用的注册中心
  • eureka之详解
  • ribbon之客户端负载均衡
  • ribbon之配置详解
  • Hystrix之服务容错保护
  • Hystrix之使用详解
  • Hystrix之请求合并
  • Hystrix之仪表盘以及Turbine集群监控
  • Feign之声明式服务调用
  • Consul之服务发现和配置管理
  • Sleuth之分布式服务跟踪
  • Zuul之API网关服务
  • Config之分布式配置中心
  • Config之服务端详解
  • Config之客户端详解
  • Bus之消息总线
  • Stream之消息驱动的微服务
  • Stream之绑定器详解
  • 重试机制
  • 系列源码地址


文章目录

  • 1. Spring 数据集成之旅
  • 2. Spring Cloud Stream 入门
  • 3. 主要思想
    • 3.1 应用程序模型
    • 3.2 绑定器抽象
    • 3.3 持久化的发布-订阅支持
    • 3.4 消费组支持
    • 3.5 分区支持
  • 4. 编程模型
    • 4.1 Destination Binders
    • 4.2 Destination Bindings
      • 4.2.1 自定义 Binding 实现
    • 4.3 生产和消费 Message
      • 4.3.1 Spring Integration Support
      • 4.3.2 使用 @StreamListener 注解
      • 4.3.3 使用 @StreamListener 进行基于内容的路由
      • 4.3.4 使用轮询消费者
    • 4.4 错误处理

写在前面

该文参考来自 程序猿DD 的Spring Cloud 微服务实战一书,该文是作为阅读了 spring cloud stream 一章的读书笔记。书中版本比较老,我选择了最新稳定版的 spring cloud Greenwich.SR2 版本,该版本较书中版本有些变动。非常感谢作者提供了这么好的学习思路,谢谢!文章也参考了 Spring-cloud-stream 的官方文档。

1. Spring 数据集成之旅

Spring 的数据集成之旅始于 Spring Integration 。通过其编程模型,它为开发人员构建应用程序提供了一致的体验,这些应用程序可以采用企业集成模式来连接外部系统,如数据库、消息代理等。

快进到云时代,微服务在企业环境中变得非常突出。Spring Boot改变了开发人员构建应用程序的方式。通过Spring的编程模型和Spring Boot处理的运行时职责,可以无缝地开发独立的、生产级的基于Spring的微服务。

为了将其扩展到数据集成工作负载,Spring Integration和Spring Boot被放在一个新项目中。Spring Cloud Stream诞生了。

2. Spring Cloud Stream 入门

Spring Cloud Stream是一个用于构建消息驱动的微服务应用程序的框架。Spring Cloud Stream构建于Spring Boot之上,以创建独立的、生产级的Spring应用程序,并使用Spring Integration提供到消息代理的连接。它提供了来自几个供应商的中间件配置,引入了持久发布-订阅语义、组和分区的概念。

Cloud微服务的生态系统中,Spring Cloud Stream 主要用来为微服务应用提供消息驱动的能力。它为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且针对微服务的复杂应用场景,引入了其它核心概念。屏蔽不同消息中间件的细节,这无疑是一个非常棒的想法!

  1. 导入依赖:

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
    
  2. @EnableBinding注释添加到应用程序中,以获得与消息代理的即时连接,将@StreamListener添加到方法中,以使其接收用于流处理的事件:

    @EnableBinding(Sink.class)
    public class SinkReceiver {private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);@StreamListener(Sink.INPUT)public void receive(Object payload) {logger.info("Received: " + payload);}}
    
  3. 启动类:

    @SpringBootApplication
    public class Application {public static void main(String[] args) {new SpringApplicationBuilder(Application.class).web(WebApplicationType.SERVLET).run(args);}
    }
    
  4. 配置文件:

    server:port: 60030
    

启动类和配置文件件并没有特殊之处。启动程序,观察 rabbit Mq 控制台 (http://localhost:15672);

我们能够在控制台上观察到程序建立的连接,并且也能发现其作为消费者订阅了 input.anonymous.... 开头的队列。我们也能在应用程序启动的时候,通过打印日志发现这些信息。现在,让我们之间通过RbbitMQ控制台,向该队列发送消息,观察程序的输出吧!

3. 主要思想

Spring Cloud Stream提供了大量的抽象和原语,简化了消息驱动的微服务应用程序的编写。

3.1 应用程序模型

Spring Cloud Stream 程序由一个与中间件无关的核心组成。应用程序通过Spring Cloud Stream注入的输入和输出通道与外部世界通信。通道通过特定于中间件的绑定器实现连接到外部代理。

图片来源于官方文档,适当的中间层,帮助我们屏蔽了底层细节(我怎么有一种 JAVA 多态的感觉)。


3.2 绑定器抽象

Spring Cloud Stream 提供了 kafkaRabbitMQ 的绑定器的实现。我们可以基于可拓展API 编写自己的绑定器。

Spring Cloud Stream使用Spring Boot进行配置,绑定器抽象使得Spring Cloud Stream应用程序在连接到中间件的方式上更加灵活。例如,部署人员可以在运行时动态地选择通道连接到的目的地(例如Kafka主题RabbitMQ交换)。可以通过外部配置属性和Spring Boot支持的任何形式(包括应用程序参数、环境变量和application.yml 或者 application.properties 文件)提供此类配置。在介绍Spring Cloud Stream部分的sink示例中,将Spring.Cloud.Stream.bindings.input.destination设置为raw-sensor-data,会导致它从raw-sensor-data Kafka主题读取数据,或者从绑定到raw-sensor-data RabbitMQ交换的队列读取数据。

由于消息中间件的概念上的不同,kafka是基于主题的,而 rabbitmq是根据交换来决定消息的目的地的。抽象出绑定器的概念,会为我们解决这些烦恼的问题。

3.3 持久化的发布-订阅支持

应用程序之间的通信遵循发布-订阅模型,其中通过共享主题广播数据。发布-订阅通信模型降低了生产者和消费者的复杂性,并允许将新应用程序添加到拓扑中,而不破坏现有的流。

3.4 消费组支持

虽然发布-订阅模型使通过共享主题连接应用程序变得很容易,但是通过创建给定应用程序的多个实例来扩展的能力同样重要。这样做时,应用程序的不同实例被放置在竞争的使用者关系中,其中只有一个实例需要处理给定的消息。

Spring Cloud Stream通过consumer group的概念对这种行为进行建模。(Spring Cloud Stream consumer groupkafka consumer groups相似,并受到kafka consumer group 的启发。

每一个消费者绑定都可以使用 spring.cloud.stream.bingings.<channelName>.group 属性指定组名称。

订阅给定目的地的所有组都接收已发布数据的副本,但每个组只有一个成员从该目的地接收给定消息。默认情况下,当没有指定一个组时,Spring Cloud Stream将应用程序分配给一个匿名且独立的单成员使用者组,该用户组与所有其他使用者组处于发布-订阅关系。也就是说默认情况下消费组是存在的。

消费者类型,支持两类消费者:

  • 消息驱动(有时称为异步)

  • 轮询(有时称为同步)

在版本2.0之前,只支持异步使用者。消息一旦可用,并且有一个线程可用来处理它,就会被传递。

当您希望控制消息处理的速率时,您可能希望使用同步使用者。

3.5 分区支持

Spring Cloud Stream支持在给定应用程序的多个实例之间对数据进行分区。在分区的场景中,物理通信媒介(例如代理主题)被视为结构化为多个分区。一个或多个生产者应用程序实例将数据发送到多个消费者应用程序实例,并确保由公共特征标识的数据由相同的消费者实例处理。

Spring Cloud Stream为以统一的方式实现分区处理用例提供了一个公共抽象。因此,无论代理本身是否自然分区(例如Kafka),都可以使用分区(例如RabbitMQ)。

在有状态处理中,分区是一个关键的概念,确保所有相关数据一起处理是非常关键的(无论是出于性能还是一致性的原因)。例如,在有时间窗的平均计算示例中,重要的是来自任何给定传感器的所有测量都由相同的应用程序实例处理。

要设置分区处理场景,必须同时配置数据生成端和数据使用端。

4. 编程模型

核心概念:

  • Destination Binders:负责与外部消息传递系统集成的组件。

  • Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的生产者和消费者(由Destination Binders创建)。

  • Message:生产者和消费者用于与目标绑定器(以及通过外部消息传递系统与其他应用程序)通信的规范数据结构。

4.1 Destination Binders

目标绑定器是Spring Cloud Stream的扩展组件,负责提供必要的配置和实现,以促进与外部消息传递系统的集成。这种集成负责与生产者和消费者之间的连接、委托和消息路由、数据类型转换、用户代码调用等等。

关于 入门使用的 RabbitMQ 的目标绑定器,我们可以在 spring.cloud.stream.binder.rabbit 包中找到。

4.2 Destination Bindings

如前所述,目标绑定提供了外部消息传递系统与应用程序提供的生产者和消费者之间的桥梁


@EnableBinding注释应用于应用程序的一个配置类定义了目标绑定,@EnableBinding注释可以将一个或多个接口类作为参数。这些参数被称为绑定,它们包含表示可绑定组件的方法。列如在 “入门” 中,我们指定了 Sink.class 参数,这是Spring Cloud Stream 已经为典型的消息交换契约提供的绑定接口。还包括:

  • Sink:通过提供使用消息的目的地来标识消息使用者的契约。

    public interface Sink {String INPUT = "input";@Input(Sink.INPUT)SubscribableChannel input();
    }
    
  • Source:通过提供生成的消息发送到的目的地来标识消息生成器的契约。

    public interface Source {String OUTPUT = "output";@Output(Source.OUTPUT)MessageChannel output();
    }
    
  • Processor:通过暴露两个允许使用和生成消息的目的地来封装接收和源契约。

    public interface Processor extends Source, Sink {}
    
4.2.1 自定义 Binding 实现

我们将尝试自定义 Binding

新增绑定器:

EchoTalk.java

public interface EchoTalk {@Input()SubscribableChannel echoTalkRec();@Output()MessageChannel echoTalkSend();}

SinkReceiver.java 新增如下代码:

 @Resourceprivate EchoTalk echoTalk;    public String send(String payload) {logger.info("Received wait Send Msg: " + payload);echoTalk.echoTalkSend().send(new GenericMessage(payload));return payload ;}@StreamListener("echoTalkRec")public void receiveFromTopicTalk(Object msg){logger.info("echoTalkRec: " + msg);}

新增 StreamController.java 类:

@RestController
public class StreamController {@Autowiredprivate SinkReceiver sinkReceiver;@GetMapping("/send")public void send(@RequestParam String msg){sinkReceiver.send(msg);}
}

通过调用 /send 接口调用 echoTalkSend发送消息,然后通过 echoTalkRec 处理消息。

配置文件修改如下:

server:port: 60030
spring:cloud:stream:bindings:echoTalkSend:destination: echoechoTalkRec:destination: echo

启动两个实例,调用其中一个接口的 /send 接口,观察两个实例的控制台发现都接收到了消息。

如果只需要同一服务的某一个实例处理消息,怎么办?添加分组即可解决:

server:port: 60030
spring:cloud:stream:bindings:echoTalkSend:destination: echoechoTalkRec:destination: echogroup: helloD

可以发现,两个服务将会依次接收消息。

4.3 生产和消费 Message

可以选择使用 Spring Integration 注释或者 Spring Cloud Stream 原生注释来编写应用程序。

4.3.1 Spring Integration Support

Spring Cloud Stream 建立在 Spring Integration 之上,所以它支持 Spring Integration 已经建立的基础、语义和配置选项。

有关 Spring Integration的更详细用法,查看 Spring Intergation 文档。

虽然这可能略过了一点,但重要的是要理解,当您使用@StreamListener注释从相同的绑定消费时,使用的是发布-订阅模型。使用@StreamListener注释的每个方法都接收自己的消息副本,每个方法都有自己的使用者组。但是,如果您通过使用Spring Integration注释之一(如@Aggregator@Transformer@ServiceActivator)使用相同的绑定,则它们将在竞争模型中使用。没有为每个订阅创建单独的使用者组。

4.3.2 使用 @StreamListener 注解

作为对Spring集成支持的补充,Spring Cloud Stream提供了自己的@StreamListener注释,模仿了其他Spring消息传递注释(@MessageMapping@JmsListener@RabbitListener等),并提供了诸如基于内容的路由等便利。

与其他Spring消息传递方法一样,可以使用@Payload@Headers@Header来注释方法参数。

具体可以查看Spring 的 Spring-messaging 模块。

对于返回数据的方法,必须使用@SendTo注释为方法返回的数据指定输出绑定目的地,就像我们上面使用的一样。

4.3.3 使用 @StreamListener 进行基于内容的路由

Spring Cloud Stream支持根据条件将消息发送到由@StreamListener注释的多个处理程序方法。

为了支持条件调度,方法必须满足以下条件:

  • 它不能返回值。

  • 它必须是单独的消息处理方法(不支持反应性API方法)。

条件由@StreamListener注释的条件参数中的SpEL表达式指定,并对每个消息求值。所有与条件匹配的处理程序都在同一个线程中调用,并且不需要对调用的顺序进行假设。


在下面的具有调度条件的@StreamListener示例中,所有带有标头type值为bogey的消息都被分派到receiveBogey方法,而所有带有标头type值为bacall的消息都被分派到receiveBacall方法。

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")public void receiveBogey(@Payload BogeyPojo bogeyPojo) {// handle the message}@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")public void receiveBacall(@Payload BacallPojo bacallPojo) {// handle the message}
}

还可以基于内容协商:

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class CatsAndDogs {@StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Dog'")public void bark(Dog dog) {// handle the message}@StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Cat'")public void purr(Cat cat) {// handle the message}
}

前面的代码完全有效。它编译和部署时没有任何问题,但它永远不会产生您期望的结果。
这是因为您正在测试的东西在您期望的状态中还不存在。这是因为消息的有效负载尚未从连接格式(byte[]) 转换为所需类型。换句话说,它还没有经历内容类型协商中描述的类型转换过程。
因此,除非使用计算原始数据的SPeL表达式(例如,字节数组中第一个字节的值),否则使用基于消息头的表达式(例如condition = “headers[‘type’]==‘dog’”)。更详细的内容参考 内容协商文档。

4.3.4 使用轮询消费者

还记得在消费组中讲到的消费者类型嘛?这里讲的便是其中的轮询(同步)。

在使用轮询消费者时,您将根据需要对PollableMessageSource进行轮询:

public interface PolledConsumer {@InputPollableMessageSource destIn();@OutputMessageChannel destOut();}

基于以上的消费者轮询可能想这样使用:

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {return args -> {while (someCondition()) {try {if (!destIn.poll(m -> {String newPayload = ((String) m.getPayload()).toUpperCase();destOut.send(new GenericMessage<>(newPayload));})) {Thread.sleep(1000);}}catch (Exception e) {// handle failure}}};
}

4.4 错误处理

错误发生时,Spring Cloud Stream 提供了几种灵活的机制来处理。

  • 应用程序:错误处理在应用程序中完成(自定义错误处理程序)。
  • 系统:错误处理委托给绑定器(重入队列等)。依赖于绑定器实现和底层消息传递中间件的功能。

Spring Cloud Stream使用Spring Retry库来促进消息处理成功。有关详细信息,请参阅Spring Retry。但是,当所有操作失败时,消息处理程序抛出的异常将传播回绑定器。此时,绑定器调用自定义错误处理程序,或将错误传回消息传递系统(重新排队等)。

错误处理消息通道为 <destination>.<group>.errors;

SpringCloudStream之消息驱动的微服务相关推荐

  1. Spring Cloud构建微服务架构:消息驱动的微服务(消费分区)【Dalston版】

    通过上一篇<消息驱动的微服务(消费组)>的学习,我们已经能够在多实例环境下,保证同一消息只被一个消费者实例进行接收和处理.但是,对于一些特殊场景,除了要保证单一实例消费之外,还希望那些具备 ...

  2. Spring Cloud构建微服务架构:消息驱动的微服务(消费组)【Dalston版】

    通过之前的<消息驱动的微服务(入门)>一文,相信很多朋友已经对Spring Cloud Stream有了一个初步的认识.但是,对于<消息驱动的微服务(核心概念)>一文中提到的一 ...

  3. Spring Cloud构建微服务架构:消息驱动的微服务(入门)【Dalston版】

    之前在写Spring Boot基础教程的时候写过一篇<Spring Boot中使用RabbitMQ>.在该文中,我们通过简单的配置和注解就能实现向RabbitMQ中生产和消费消息.实际上我 ...

  4. Spring Cloud构建微服务架构:消息驱动的微服务(核心概念)【Dalston版】

    通过<Spring Cloud构建微服务架构:消息驱动的微服务(入门)>一文,相信大家对Spring Cloud Stream的工作模式已经有了一些基础概念,比如:输入.输出通道的绑定,通 ...

  5. Spring Cloud架构教程 (六)消息驱动的微服务【Dalston版】

    Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架.它可以基于Spring Boot来创建独立的.可用于生产的Spring应用程序.它通过使用Spring Integr ...

  6. 消息驱动 微服务器,消息驱动的微服务-Spring Cloud Stream整合RocketMQ

    系列文章导航: Spring Cloud Alibaba微服务解决方案 常用MQ产品的选择 目前主流的MQ产品有kafka.RabbitMQ.ActiveMQ.RocketMQ等.在MQ选型时可以参照 ...

  7. java B2B2C Springcloud多租户电子商城系统-消息驱动的微服务(消费分区)

    通过之前的学习,我们已经能够在多实例环境下,保证同一消息只被一个消费者实例进行接收和处理.但是,对于一些特殊场景,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能够被同一个实例进行消费.这 ...

  8. 元数据驱动的微服务架构(上)

    本次分享有两个部分: 微服务架构需要元数据 介绍微服务与元数据的关系. 一.微服务架构需要元数据 企业IT架构已经发展了多个阶段,一方面是服务化架构的发展,在SOA阶段主要解决应用间集成问题,但随着企 ...

  9. RocketMQ 千锤百炼--哈啰在分布式消息治理和微服务治理中的实践

    作者|梁勇 ​ 背景 ​ 哈啰已进化为包括两轮出行(哈啰单车.哈啰助力车.哈啰电动车.小哈换电).四轮出行(哈啰顺风车.全网叫车.哈啰打车)等的综合化移动出行平台,并向酒店.到店团购等众多本地生活化生 ...

最新文章

  1. 英国皇家学会院士樊文飞:把大数据变小,突破企业资源限制
  2. c语言更新数据,sqlite学习笔记10:C语言中使用sqlite之查询和更新数据
  3. c++-内存管理-G4.9
  4. [BZOJ3000] Big Number (Stirling公式)
  5. 博客园的祥和需要大家共同努力
  6. java enum 2d array,Java-打印2D数组的最佳方法?
  7. LinuxDay19——加密与安全(2)
  8. 千万级用户的大型网站,应该如何设计其高并发架构?(彩蛋)
  9. Zoox 的自动驾驶汽车方法
  10. 说说用笔记本电脑的惨痛经历
  11. [附源码]计算机毕业设计springboot动物保护协会网站
  12. IOS 使用TestFlight 详解
  13. 局域网内通过ip获取主机名
  14. 年轻的艺术家们是如何通过NFT赚取百万美元的?
  15. iPad、iPad Pro反复自动重启怎么办?
  16. 干货 |如何优雅的在手机上进行Python编程
  17. 飞机大战(案例详解)
  18. CVE-2017-8464远程命令执行漏洞(震网漏洞)复现
  19. 实习证明| 大数据在线实习项目意义
  20. 匹配表情emoji 正则_js判断替换emoji表情?

热门文章

  1. C++实现行最简型矩阵
  2. Docker相关的网络问题-软件源和镜像加速器
  3. mybatis 配置之 typeAliases 别名配置元素设置
  4. 微型计算机计算机钢琴,用汇编语言编写计算机钢琴程序
  5. 工作中效率有待提高的点
  6. 《深度学习案例精粹:基于TensorFlow与Keras》深度学习常用训练案例合集
  7. 人身上最大的洞是什么?
  8. 游戏 android 大作,玩游戏别坐过站!最易沉迷的5款安卓大作
  9. [Java Path Finder][JPF学习笔记][2]在Windows Server上安装JPF
  10. 关联规则——关联分析