常见问题(使用rabbitmq)

消息分组防止多实例重复消费
在一个服务多实例场景下使用默认使用@StreamListener监听消息消费,yml中没有特殊配置的话是会导致消息重复消费的,原因是此时每个实例都是匿名在rabbitmq上注册的队列,需要给消费者指定一个消费组,让消息在组里只被消费一次;

spring.cloud.stream.bindings.xxx(消费者队列名).group=xxx(组名)

在springboot下在同一个服务(项目中)使用@input@outPut时指定的队列名是不可以重复的.会在启动编译的时候报bean定义重复。需要在yml给生产者和消费者指定同一个交换机。

spring:rabbitmq:host: xxx.xxx.xxx.xxport: 35672username: xxxpassword: xxxvirtual-host: /xxxcloud:stream:bindings:in:#若消息系统是RabbitMQ,目的地(destination)就是指exchange,消息系统是Kafka,那么就是指topicdestination: test#在多实例的时候需要制定一个消息分组,不然每个实例都是匿名方式把队列注册到rabbitmq上去,导致一个交换机下有多个队列#并且默认生成的交换机是topic类型的,会导致重复消费group: myInout:destination: test

先上依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.8.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.fchan</groupId><artifactId>springcloudstream</artifactId><version>0.0.1-SNAPSHOT</version><name>springcloudstream</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<!--            <version>2.0.1.RELEASE</version>--></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-dependencies</artifactId><version>Ditmars.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

再上yml配置

spring:rabbitmq:host: xxx.xxx.xxx.xxport: 35672username: xxxpassword: xxxvirtual-host: /xxxcloud:stream:bindings:in:#若消息系统是RabbitMQ,目的地(destination)就是指exchange,消息系统是Kafka,那么就是指topicdestination: test#在多实例的时候需要制定一个消息分组,不然每个实例都是匿名方式把队列注册到rabbitmq上去,导致一个交换机下有多个队列#并且默认生成的交换机是topic类型的,会导致重复消费group: myInout:destination: test

消息生产者

package com.fchan.springcloudstream.service;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface MyMessageChannel {String out = "out";String in = "in";@Output(out)MessageChannel out();@Input(in)SubscribableChannel in();}

发送消息

package com.fchan.springcloudstream.controller;import com.fchan.springcloudstream.service.MyMessageChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;@RestController
public class MessageController {@Resourceprivate MyMessageChannel myMessageChannel;@RequestMapping("test")public String testMessage(){Map<String,Object> map = new HashMap<>();map.put("shopId", "123");myMessageChannel.out().send(MessageBuilder.withPayload(map).build());return "success";}}

消息消费者

package com.fchan.springcloudstream.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@EnableBinding({MyMessageChannel.class})
public class MyConsumer {Logger log = LoggerFactory.getLogger(MyConsumer.class);@StreamListener(MyMessageChannel.in)public void input(Message<Map<String,Object>> message){log.info("收到消息:{}", message.getPayload());}}

springcloud的stream消息组件的使用@StreamListener相关推荐

  1. SpringCloud (十一) --------- Stream 消息驱动框架

    目录 一.Stream 概述 二.Stream 重要概念 三.Stream 应用 四.Stream 自定义消息通道 五.Stream 分组与持久化 六.Stream 设置路由键 一.Stream 概述 ...

  2. SpringCloud微服务架构,Config 分布式配置中心,Bus 消息总线, Stream 消息驱动,Sleuth+Zipkin 链路追踪

    Config分布式配置中心 Config 概述 概述 • Spring Cloud Config 解决了在分布式场景下多环境配置文件的管理和维护. • 好处: • 集中管理配置文件 • 不同环境不同配 ...

  3. SpringCloud Stream消息驱动

    为啥有这个技术??? 1. 这个stream是操作消息队列的,简化,学习消息队列的成本降低. 2. 可操作rabbitMQ兔子message queue,kafaka,可理解为jdbc可操作oracl ...

  4. SpringCloud2020学习笔记13——SpringCloud Stream消息驱动

    目录 一.消息驱动概述 1.简介 2.官网 2.设计思想 ① 标准MQ ② 为什么用Cloud Stream ③ Stream中的消息通信方式遵循了发布-订阅模式 3.Spring Cloud Str ...

  5. 手把手教你搭建SpringCloud项目(十六)集成Stream消息驱动

    Spring Cloud全集文章目录: 零.什么是微服务?一看就会系列! 一.手把手教你搭建SpringCloud项目(一)图文详解,傻瓜式操作 二.手把手教你搭建SpringCloud项目(二)生产 ...

  6. SpringCloud学习笔记 - 消息驱动 - Spring Cloud Stream

    1. stream为什么被引入 常见MQ(消息中间件): ActiveMQ RabbitMQ RocketMQ Kafka 有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑 ...

  7. SpringCloud(十一)Bus消息总线、Stream消息驱动

    一.Bus消息总线 需求:分布式自动刷新配置功能: 解决:SpringCloud Bus配合Spring cloud Config使用可以实现配置的动态刷新. 1.概述 定义:Spring Cloud ...

  8. SpringCloud第九章(消息驱动Strem和RabbitMQ)

    目录 1:什么是Strem 2:Strem架构 2.1:传统的消息对列架构 2.2:strem的消息对列架构 3:案例介绍 3.1:8001生产者 3.2:8002.8003消费者配置一致 4:总结消 ...

  9. Spring Cloud Stream消息驱动

    一.Spring Cloud Stream消息驱动 1.为什么引入cloud Stream MQ(消息中间件) ActiveMQ RabbitMQ RocketMQ Kafka 问题:可能中间java ...

最新文章

  1. 覆盖分类的方法_老罗讲分类|垃圾分类回收模式到底用哪种好?
  2. pythonweb开发-手把手教你写网站:Python WEB开发技术实战
  3. 快速排序算法的实现 随机生成区间里的数 O(n)找第k小 O(nlogk)找前k大...
  4. LeetCode 166. 分数到小数(小数除法)
  5. 除夕快乐 | 2月11日 星期四 | B站发文回应热搜风波;美团上线“团好货”独立App;国内首家自动驾驶企业获网约车运营许可...
  6. Bootstrap研究2-布局系统杂记
  7. opensips mysql 版本_Opensips-1.11版本安装过程
  8. Cookie/Session机制具体解释
  9. vue项目实现文字转换成语音播放功能
  10. python支持复数类型以下说法错误的是_关于Python的数字类型,以下选项中描述错误的是...
  11. 2021年山东省安全员C证报名考试及山东省安全员C证操作证考试
  12. 使用RTL-8139网卡制作硬盘还原卡
  13. 在Android4.0以上设备的虚拟按键中显示menu键
  14. Vue3-歌词根据时间自动滚动
  15. DeepMind黄士杰:深度学习有创造性,正参与星际2项目
  16. 河南的抗疫英雄(C语言嘞)
  17. 图片怎么转化为pdf格式文件?图片如何转变为pdf格式?
  18. 自旋对于synchronized关键字的底层意义与价值分析:
  19. filezilla 配置
  20. 北美票房排行榜 实时_快手直播丨主播实时直播监测数据分享——思文22号美妆童装专场...

热门文章

  1. 标准应用获取天翼视联的监控设备
  2. Spring停止或开启定时器
  3. Entity Framework — ( Database First )
  4. LINQ查询表达式基础
  5. 企业微信SDK接口API调用-触发推送企业微信联系人列表
  6. 展望2023,软件测试的走势分析详情
  7. 51单片机的中断系统详解
  8. html怎么让页面没有滑动条,使用CSS实现无滚动条滚动的两种方法
  9. 1609:变幻的矩阵
  10. LeetCode笔记:Biweekly Contest 85