在Spring Cloud 分布式消息—Spring Cloud Stream 简介与入门一篇我们简单了介绍了Spring Cloud Stream,并且使用Spring Cloud Stream提供的默认通道input,output简单的做了一个示例,本篇我们将会使用自定义通道做一个示例,并且介绍Spring Cloud Stream的高级应用,如果对Spring  Cloud Stream 不了解可以先去阅读Spring Cloud 分布式消息—Spring Cloud Stream 简介与入门一篇文章。

在上一篇我们介绍了两个注解@Input和@Output分别用于定义输入和输出通道,Spring Cloud Stream定义的SInk和Source就是使用了这两个注解。我们自定义通道也需要使用这两个注解,使用这两个注解,我们很容易定义输入和输出通道,比如我们定义一个日志的输入输出通道,代码如下:

//定义日志输入通道
public interface LogSink {String INPUT = "logInput";@Input("logInput")SubscribableChannel input();
}
//定义日志输出通道
public interface LogSource {String OUTPUT = "logOutput";@Output("logOutput")MessageChannel output();
}

在编写在通道之后,我们需要在配置文件中定义通道,将通道绑定到binder,并且为binder绑定到消息中间件,上面的通道配置如下:

spring:cloud:stream:bindings: #用于绑定通道到binderlogInput: #ChannelName 这里是输入通道的名称,如果@Input不指定默认为方法名destination: log #从哪里接收消息,这里指topic或者队列名称,在rabbit中为exchangebinder: logBinder #绑定到名为logBinder的binderlogOutput: #ChannelName 这里是输出通道的名称,如果@Output不指定默认为方法名destination: log #将消息发送到哪里,这里指topic或者队列名称,在rabbit中为exchangebinder: logBinder #绑定到名为logBinder的bindercontent-type:binders: #配置binderlogBinder: #配置名称为hello1的bindertype: rabbit #binder类型为rabbitMqenvironment: #配置运行环境spring:rabbitmq:host: 10.0.10.63  #地址port: 5672        #端口username: guest   #用户名password: guest   #密码
server:port: 8092

然后需要@EnableBinding注解绑定通道即可,消息接收的代码如下所示,发送消息的代码这里不再展示,只需要@EnableBinding(LogSource.class),然后使用@Autowire注解将LogSource注入即可。

@EnableBinding(LogSink.class)
public class LogReceiver {private static Logger logger = LoggerFactory.getLogger(LogReceiver.class);@StreamListener(LogSink.INPUT)public void receive(String payload) {logger.info("Received: " + payload);}
}

上面的代码中,我们将输入通道、输出通道分别定义在不同的接口中,如果通道特别多,则需要定义多个接口,因此我们可以将多个通道定义在一个接口内,@EnableBinding也只需要绑定一个接口即可。如下代码,我们只需要@EnableBinding(LogChannel.class)即可,在发送消息时使用@Autowire注解LogChannel接口。

public interface LogChannel {String INPUT = "service1logInput";@Input("service1logInput")SubscribableChannel service1input();String INPUT2 = "service2logInput";@Input("service1logInput")SubscribableChannel service2input();String OUTPUT = "service1logOutput";@Output("service1logOutput")MessageChannel service1logOutput();String OUTPUT2 = "service2logOutput";@Output("service12ogOutput")MessageChannel service2logOutput();
}

Spring Cloud Stream建立在Enterprise Integration Patterns定义的概念和模式的基础上,并依赖于其内部实现,该内部实现依赖于Spring项目组合中已建立且流行的Enterprise Integration Patterns实现: Spring Integration Framework。因此,它自然支持Spring Integration已经建立的基础,语义和配置选项。比如你可以使用@InboundChannelAdapter将Source的输出通道附加到MessageSource上,同样,可以在提供Processor绑定合约的消息处理程序方法的实现时使用@Transformer或@ServiceActivator ,代码示例如下所示:

@EnableBinding(Source.class)
public class TimerSource {@Bean@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))public MessageSource<String> timerMessageSource() {return () -> new GenericMessage<>("Hello Spring Cloud Stream");}
}@EnableBinding(Processor.class)
public class TransformProcessor {@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)public Object transform(String message) {return message.toUpperCase();}
}

关于@StreamListener注解,它是Spring Cloud Stream 对Spring Integration的补充,像Spring Messaging的其他注解(@JmsListener)一样能为我们提供诸如路由等功能。对于使用@StreamListener注解的方法,你可以返回一个数据,但是你必须使用@SendTo注解指定该方法返回的数据的输出的目的地,代码如下所示:

@EnableBinding(Processor.class)
public class TransformProcessor {@AutowiredVotingService votingService;@StreamListener(Processor.INPUT)@SendTo(Processor.OUTPUT)public VoteResult handle(Vote vote) {return votingService.record(vote);}
}

Spring Cloud Stream支持基于@SpringListener注解上的条件,将消息转发给多个处理方法,这些方法不能有返回值并且是单独的消息处理方法。我们可以为@StreamListener的condition参数传入一个指定的SpEL表达式。这样只有匹配到表达式的处理器才会被调用。代码示例如下:

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")public void receiveBogey(@Payload BogeyPojo bogeyPojo) {// handle the message}@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")public void receiveBacall(@Payload BacallPojo bacallPojo) {// handle the message}
}

发布-订阅模型使通过共享topic让应用程序之间交互更加容易,但是通过创建给定应用程序的多个实例进行扩展的能力同样重要。这样做时,会将应用程序的不同实例置于竞争的消费者关系中,在该消费者关系中,只有一个实例可以处理给定消息。Spring Cloud Stream提供了消费组的概念可以通过spring.cloud.stream.bindings.<channelName>.group配置一个组别。如下图中的消费者 将会使用该配置:spring.cloud.stream.bindings.<channelName>.group=hdfsWrite或者 spring.cloud.stream.bindings.<channelName>.group=average.

订阅给定目标的所有组都将收到已发布数据的副本,但是每个组中只有一个成员从该目标接收给定消息。默认情况下,当未指定组时,Spring Cloud Stream会将应用程序分配给一个匿名且独立的单成员使用者组,该使用者组与所有其他使用者组具有发布-订阅关系。

除了分组,

Spring Cloud Stream提供了对给定应用程序的多个实例之间的数据分区的支持。在分区方案中,物理通信介质(例如代理主题)被视为结构化为多个分区。一个或多个生产者应用程序实例将数据发送到多个消费者应用程序实例,并确保由共同特征标识的数据由同一消费者实例处理。Spring Cloud Stream提供了一种通用抽象,用于以统一的方式实现分区处理用例。因此,无论代理本身是自然分区(例如,Kafka)还是非自然分区(例如,RabbitMQ),都可以使用分区。分区结构与所需配置如下所示:

#下面是生产者配置
#通过该参数指定了分区键的表达式规则
spring.cloud.stream.bindings.<channel-name>.producer.partitionKeyExpression=payload
#指定了消息分区的数量。
spring.cloud.stream.bindings.<channel-name>.producer.partitionCount=2#下面是消费者配置
#开启消费者分区功能
spring.cloud.stream.bindings.<channel-name>.consumer.partitioned=true
#指定了当前消费者的总实例数量
spring.cloud.stream.instanceCount=2
#设置当前实例的索引号,从 0 开始
spring.cloud.stream.instanceIndex=1

本篇介绍了Spring Cloud Stream 如何自定义通道名称和Spring Cloud Stream 分组和分区和路由等高级应用。下一篇我们会介绍Spring Cloud Stream的异常处理和一些高级配置的知识。

Spring Cloud 分布式消息—Spring Cloud Stream 自定义通道与分组分区应用相关推荐

  1. Spring Cloud(十一)高可用的分布式配置中心 Spring Cloud Bus 消息总线集成(RabbitMQ)

    上一篇文章,留了一个悬念,Config Client 实现配置的实时更新,我们可以使用 /refresh 接口触发,如果所有客户端的配置的更改,都需要手动触发客户端 /refresh ,当服务越来越多 ...

  2. Spring Cloud Alibaba 消息队列:基于 RocketMQ 实现服务异步通信

    本讲咱们将学习以下三方面内容: 介绍消息队列与 Alibaba RocketMQ: 掌握 RocketMQ 的部署方式: 讲解微服务接入 RocketMQ 的开发技巧: 首先咱们先来认识什么是消息队列 ...

  3. springcloud分布式事务_Spring Cloud学习资源一网打尽!Awesome Spring Cloud v1.0

    公正.公平.尊重原创.不夹带私人恩怨的Spring Cloud学习资源列表. TIPS: •本文链接较多,为了更好的阅读体验,建议翻到文章末尾,点击"扩展链接",排版相对好很多. ...

  4. Spring Cloud Bus 消息总线介绍

    作者 | 洛夜 来源 | 阿里巴巴云原生公众号 在 Spring 生态中玩转 RocketMQ 系列文章: <如何在 Spring 生态中玩转 RocketMQ?> <罗美琪和春波特 ...

  5. 干货|Spring Cloud Bus 消息总线介绍

    2019独角兽企业重金招聘Python工程师标准>>> 继上一篇 干货|Spring Cloud Stream 体系及原理介绍 之后,本期我们来了解下 Spring Cloud 体系 ...

  6. spring cloud微服务分布式云架构-Spring Cloud 分布式的五大重点

    SpringCloud分布式的五大重点的基本介绍 服务器的注册与发现-Netflix Eureka 客户端负载均衡-Netflix Ribbon 断路器-Netflix Hystrix 服务网关-Ne ...

  7. 【夯实Spring Cloud】Spring Cloud分布式配置中心详解

    本文属于[夯实Spring Cloud]系列文章,该系列旨在用通俗易懂的语言,带大家了解和学习Spring Cloud技术,希望能给读者带来一些干货.系列目录如下: [夯实Spring Cloud]D ...

  8. 【Spring Cloud Alibaba】Spring Cloud Alibaba 分布式配置Nacos实践

    课程说明 本次课程,你将会学习如下: 使用 Nacos Config 作为 Spring Cloud 分布式配置 使用 Nacos Config 实现 Bean 动态刷新 了解 Nacos Confi ...

  9. Spring Cloud Bus 消息总线实现配置自动刷新

    why 当微服务太多的时候,服务之间需要建立通信或一个服务的改变需要广播到所有其它服务,这时就需要有一个总线来承担对应的职责. what spring cloud bus 是通过轻量消息代理连接各个分 ...

最新文章

  1. 公钥和私钥的简单通俗说明
  2. markdown常用操作(特殊字符显示、换行、字体颜色和大小、图片位置和大小)
  3. 软件系统架构~软件架构概念
  4. springMVC 格式转换
  5. php扩展可以通过pecl 或者phpize 安装
  6. Permission denied (publickey) 解决方案
  7. 2010年IT行业十大收购
  8. 使用“网吧卫士”实现网吧带宽完美管理(转)
  9. 华为手机如何更新鸿蒙系统_华为鸿蒙系统怎么升级?升级鸿蒙系统步骤
  10. 单反相机参数之光圈、快门篇
  11. hadoop集群安装配置Kerberos(二):搭建kerberos基础环境(主从kdc)
  12. 安装完ubuntu系统后的优化
  13. java怎么没有jmf包_java JMF
  14. R 和 Rstudio 在线更新
  15. pb 如何导出csv_打开CSV格式文件?英雄请留步
  16. 大数据基础——HDFS(分布式文件系统)
  17. java实验四 综合应用实验
  18. TensorFlow2 -官方教程 :保存和恢复模型
  19. VC6.0 通过ado连接access数据库
  20. 转:Python数据分箱,计算woe,iv

热门文章

  1. 【笔耕不辍勋章活动】生命不止,写作不息
  2. unity + leapMotion 判断手指捏合(拇指和食指)
  3. 【Youtobe trydjango】Django2.2教程和React实战系列八【渲染数据库数据与模板加载顺序探究】
  4. Ubuntu安装配置Java
  5. 超级适合小白!学Java必读书籍,强烈推荐
  6. 力扣(leetcode)[453. 最小操作次数使数组元素相等]我见识到了,算法的终极是数学
  7. idm服务器响应显示您没有权限下载解决教程
  8. NB-IOT UDP透传测试
  9. 设计分享|单片机左右来回的流水灯
  10. 掘金社区自动签到+免费抽奖