RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?
针对以上问题,有两个场景:使用阿里云的云服务器的RocketMQ和使用自己搭建的RocketMQ。但无论采用这两种的任何一种,都是可以在同一个topic下,通过tag来进行业务区分的。
网上有很多分析相关使用方式的文章,虽然分析的结果都是“不可以”,但我们可以通过其他的一些方案来进行解决。
自主搭建的RocketMQ
通过自主搭建RocketMQ,然后通过SpringBoot进行集成实现,可以参考在公众号【程序新视界】中的文章《Spring Boot快速集成RocketMQ实战教程》,可关注公众号搜索,也可以关注公众号之后回复“1003”,完整的实战步骤。
这里我们只摘取其中消费者的部分代码:
@Service
@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC, consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_REGISTERED, selectorExpression = MqTopicConstant.DEMO_TAG_REGISTERED)
public class MqRegisteredListenerDemo implements RocketMQListener<String> {private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);@Overridepublic void onMessage(String message) {log.info("received registered message: {}", message);}
}
这是其中一个消费者,消费的topic为MqTopicConstant.DEMO_TOPIC,consumerGroup为REGISTERED的,tag便是selectorExpression指定的REGISTERED的tag。
针对同一的topic,另外一个tag的消费者的实现如下:
@Service
@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC, consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_MODIFY, selectorExpression = MqTopicConstant.DEMO_TAG_MODIFY)
public class MqModifyListenerDemo implements RocketMQListener<String> {private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);@Overridepublic void onMessage(String message) {log.info("received modify message: {}", message);}
}
我们可以看到topic是同一个,但consumerGroup和tag不同。这说明什么?这说明只要消费者的consumerGroup不同,那么topic相同的情况下,也可以通过tag进行区分的。
关于其他源码就不再这里贴出了,详情可关注公众号看对应文章。
基于云服务的RocketMQ
基于云服务的RocketMQ与自主搭建的基本一致,我们只要确保groupId(阿里云的叫法)不同,那么同一topic下的tag是可以进行区分处理的。
具体处理这里也只贴出部分代码:
@Configuration
public class ConsumerClient {@Resourceprivate MqConfigProperties mqConfigProperties;@Resourceprivate EquipmentMessageListener equipmentMessageListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public ConsumerBean buildConsumer() {ConsumerBean consumerBean = new ConsumerBean();// 配置文件Properties properties = mqConfigProperties.getMqProperties();properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfigProperties.getGroupId());// 将消费者线程数固定为20个 20为默认值properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");consumerBean.setProperties(properties);// 订阅关系Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();// --------业务板块开始--------Subscription subscription = new Subscription();// 设置需要消费的消息所属的topicsubscription.setTopic(MqConfigProperties.getInnerTopic());// 设置需要消费的消息所属的tagsubscription.setExpression(MqConfigProperties.getEquipmentMonitorTag());// 实现MessageListener接口,并且在consume方法中实现消费逻辑subscriptionTable.put(subscription, equipmentMessageListener);//订阅多个topic如上面设置// --------业务板块结束--------// 将订阅者消息放入consumerBean中,在Spring初始加载该bean时,监听MQ中的Topic和tag下的消息consumerBean.setSubscriptionTable(subscriptionTable);return consumerBean;}
}
在上面的代码中,重点是业务板块部分的代码,如果在订阅关系中重新将业务板块内的代码copy一份,然后修改对应的Expression值(也就是tag值),那么基本上是不会成功的。
往往发送大量消息,只能够收到一部分。其他的会被覆盖掉。当然,如果你想采用不同的topic来处理,只需将业务板块中的内容重新修改,添加到subscriptionTable中即可。
那么,如何解决标题中的问题呢?思路与第一种方案一样,阿里云这里只是创建了一个ConsumerBean,而上面的自主搭建时采用了多个Consumer。那么解决方案就是:初始化多个ConsumerBean,每个ConsumerBean中的配置不同的groupId和tag,同时注册不同的监听器。
如此一来,就可以监听一个topic下的不同tag了。
原理分析
两个一样的ConsumerGroup的Consumer订阅同一个Topic,但是是不同的tag,Consumer1订阅Topic的tag1,Consumer2订阅Topic的tag2,然后分别启动。这时候往Topic的tag1里发送10条数据,Topic的tag2里发送10条。目测应该是Consumer1和Consumer2分别收到对应的10条消息。结果却是只有Consumer2收到了消息,而且只收到了4-6条消息,不固定。
这种现象的原因是:消息的分配是Broker决定的,而不是Consumer端,Consumer端发心跳给Broker,Broker收到后存到consumerTable里(就是个Map),key是GroupName,value是ConsumerGroupInfo。ConsumerGroupInfo里面是包含topic等信息的,但是问题就出在上一步骤,key是groupName,同GroupName的话Broker心跳最后收到的Consumer会覆盖前者的。
这样同key,肯定产生了覆盖。所以Consumer1不会收到任何消息,但是Consumer2为什么只收到了一半(不固定)消息呢?
那是因为:集群模式消费,它会负载均衡分配到各个节点去消费,所以一半消息(不固定个数)跑到了Consumer1上,结果Consumer1订阅的是tag1,所以不会任何输出。
如果换成BROADCASTING,那后者会收到全部消息,而不是一半,因为广播是广播全部Consumer。
如果还有其他相关问题,也可关注公众号“程序新视界”,相互沟通学习。
原文链接:《RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?》
程序新视界
公众号“程序新视界”,一个让你软实力、硬技术同步提升的平台,提供海量资料
RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?相关推荐
- 阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
面试官:同一个消费组内的消费者,如果订阅了相同的 topic,但是订阅的 tag 不一样,会有什么问题吗? 我:会出现丢消息的情况. 面试官:能详细说一说吗? 我:RocketMQ 要求同一个消费组内 ...
- rocketmq延时消息自定义配置;topic下tag使用
概述 使用的是开源版本的rocketmq4.9.4 rocketmq也是支持延时消息的. rocketmq一般是4个部分: nameserver:保存路由信息 broker:保存消息 生产者:生产消息 ...
- 消息中间件学习总结(11)——Kafka与RocketMQ的Topic数量对单机性能的影响比较分析
引言 上一期我们对比了三类消息产品(Kafka.RabbitMQ.RocketMQ)单纯发送小消息的性能,受到了程序猿们的广泛关注,其中大家对这种单纯的发送场景感到并不过瘾,因为没有任何一个网站的业务 ...
- RocketMQ 中Topic、Tag、GroupName基本概念介绍
本文主要介绍RocketMQ中Topic.Tag.GroupName的概念.设计初衷以及使用方法. 一.Topic 首先看看官方的定义: Topic是生产者在发送消息和消费者在拉取消息的类别.Topi ...
- [Azure][Event hub]Kafka无法同时连接到同一个namespace下的两个Event hub
1.问题背景 有一个应用需要用kafka消费event hub的消息,其中两个kafka consumer同时连接到了同一个namespace下的两个event hub. kafka:consumer ...
- IntelliJ IDEA导入多个eclipse项目到同一个workspace下
IntelliJ IDEA 与eclipse在新建项目上工作区的叫法略有不同,区别见下图. 我们在eclipse都是在新建的workspace目录下新建我们的项目,但是在IDEA中没有workspac ...
- php只显示一部分文章,typecho同一个页面下调用不同分类的文章但是却只显示一个分类文章...
typecho同一个页面下调用不同分类的文章但是却只显示一个分类文章 作者:佚名 来源:爱好者 时间:2018-04-30 问题描述: 同页面调用分类下文章,只显示一第一个分类下的文章 在一个页面中, ...
- 启动mq命令 linux,RocketMQ:Linux下启动server和broker的命令
目录 QUESTION:RocketMQ:Linux下启动server和broker的命令? ANSWER: 一.启动mqnamesrv 1.1当前执行 1.2后台运行 二.启动mqbroker 2. ...
- RocketMQ:Linux下启动server和broker的命令
目录 QUESTION:RocketMQ:Linux下启动server和broker的命令? ANSWER: 一.启动mqnamesrv 1.1当前执行 1.2后台运行 二.启动mqbroker 2. ...
最新文章
- Hive导出复杂数据到csv文件
- asp.net core 系列 18 web服务器实现
- 【Python】Matplotlib太臃肿,试试Seaborn
- Metasploit Framework(1)基本命令、简单使用
- 项目部署到tomcat6.0启动成功后访问页面报500_.net core IIS部署教程
- vue项目中eslint检查警告——“Trailing spaces not allowed”
- OpenShift 4 之脚本化部署Istio的HelloWorld和BookInfo示例
- 基于容器实现高并发网站
- 软件设计模式之单例模式
- python网络爬虫(一):网络爬虫科普与URL含义
- 报表 labview_干货!24个LabVIEW初学者常见问题及解答汇总
- Linux下DNS服务器的基本搭建
- ESP8266热点配网-Arduino代码分享
- SOMEIP报文格式部分字段概述(二)
- (十一)GDBdebug调试技术——malloc()和free()发生故障
- 零碎记录- spring security oauth2 资源服务器中设置放行路径
- 当红小生酒店施暴性感女星
- Jumserver安装日志审计和资产管理
- 结合运动流的时间先验在微创手术视频中的器械分割
- python日常实用小脚本-用Python编写渗透用小脚本 短小实用
热门文章
- 中企故事汇:马可波罗三生三世的故事
- VxWorks错误码查找表
- untu复制粘贴快捷键
- netty报Unknown channel option ‘TCP_NODELAY‘ for channel异常
- 互动媒体尝试之P5创意绘图板
- 苹果cms微信对接php,苹果cmsV10微信公众号对接插件安装教程
- 码商的末日:提供个人收款码的“兼职”也被警方抓了!涉嫌共同犯罪
- 2017 上半年我整理的好文章(上)(98 篇)
- 微格式(microformat)
- Windows系统中设置软件的开机自动启动