针对以上问题,有两个场景:使用阿里云的云服务器的RocketMQ和使用自己搭建的RocketMQ。但无论采用这两种的任何一种,都是可以在同一个topic下,通过tag来进行业务区分的。

网上有很多分析相关使用方式的文章,虽然分析的结果都是“不可以”,但我们可以通过其他的一些方案来进行解决。

自主搭建的RocketMQ

通过自主搭建RocketMQ,然后通过SpringBoot进行集成实现,可以参考在公众号【程序新视界】中的文章《Spring Boot快速集成RocketMQ实战教程》,可关注公众号搜索,也可以关注公众号之后回复“1003”,完整的实战步骤。

这里我们只摘取其中消费者的部分代码:

@Service
@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC, consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_REGISTERED, selectorExpression = MqTopicConstant.DEMO_TAG_REGISTERED)
public class MqRegisteredListenerDemo implements RocketMQListener<String> {private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);@Overridepublic void onMessage(String message) {log.info("received registered message: {}", message);}
}

这是其中一个消费者,消费的topic为MqTopicConstant.DEMO_TOPIC,consumerGroup为REGISTERED的,tag便是selectorExpression指定的REGISTERED的tag。

针对同一的topic,另外一个tag的消费者的实现如下:

@Service
@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC, consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_MODIFY, selectorExpression = MqTopicConstant.DEMO_TAG_MODIFY)
public class MqModifyListenerDemo implements RocketMQListener<String> {private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class);@Overridepublic void onMessage(String message) {log.info("received modify message: {}", message);}
}

我们可以看到topic是同一个,但consumerGroup和tag不同。这说明什么?这说明只要消费者的consumerGroup不同,那么topic相同的情况下,也可以通过tag进行区分的。

关于其他源码就不再这里贴出了,详情可关注公众号看对应文章。

基于云服务的RocketMQ

基于云服务的RocketMQ与自主搭建的基本一致,我们只要确保groupId(阿里云的叫法)不同,那么同一topic下的tag是可以进行区分处理的。

具体处理这里也只贴出部分代码:

@Configuration
public class ConsumerClient {@Resourceprivate MqConfigProperties mqConfigProperties;@Resourceprivate EquipmentMessageListener equipmentMessageListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public ConsumerBean buildConsumer() {ConsumerBean consumerBean = new ConsumerBean();// 配置文件Properties properties = mqConfigProperties.getMqProperties();properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfigProperties.getGroupId());// 将消费者线程数固定为20个 20为默认值properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");consumerBean.setProperties(properties);// 订阅关系Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();// --------业务板块开始--------Subscription subscription = new Subscription();// 设置需要消费的消息所属的topicsubscription.setTopic(MqConfigProperties.getInnerTopic());// 设置需要消费的消息所属的tagsubscription.setExpression(MqConfigProperties.getEquipmentMonitorTag());// 实现MessageListener接口,并且在consume方法中实现消费逻辑subscriptionTable.put(subscription, equipmentMessageListener);//订阅多个topic如上面设置// --------业务板块结束--------// 将订阅者消息放入consumerBean中,在Spring初始加载该bean时,监听MQ中的Topic和tag下的消息consumerBean.setSubscriptionTable(subscriptionTable);return consumerBean;}
}

在上面的代码中,重点是业务板块部分的代码,如果在订阅关系中重新将业务板块内的代码copy一份,然后修改对应的Expression值(也就是tag值),那么基本上是不会成功的。

往往发送大量消息,只能够收到一部分。其他的会被覆盖掉。当然,如果你想采用不同的topic来处理,只需将业务板块中的内容重新修改,添加到subscriptionTable中即可。

那么,如何解决标题中的问题呢?思路与第一种方案一样,阿里云这里只是创建了一个ConsumerBean,而上面的自主搭建时采用了多个Consumer。那么解决方案就是:初始化多个ConsumerBean,每个ConsumerBean中的配置不同的groupId和tag,同时注册不同的监听器。

如此一来,就可以监听一个topic下的不同tag了。

原理分析

两个一样的ConsumerGroup的Consumer订阅同一个Topic,但是是不同的tag,Consumer1订阅Topic的tag1,Consumer2订阅Topic的tag2,然后分别启动。这时候往Topic的tag1里发送10条数据,Topic的tag2里发送10条。目测应该是Consumer1和Consumer2分别收到对应的10条消息。结果却是只有Consumer2收到了消息,而且只收到了4-6条消息,不固定。

这种现象的原因是:消息的分配是Broker决定的,而不是Consumer端,Consumer端发心跳给Broker,Broker收到后存到consumerTable里(就是个Map),key是GroupName,value是ConsumerGroupInfo。ConsumerGroupInfo里面是包含topic等信息的,但是问题就出在上一步骤,key是groupName,同GroupName的话Broker心跳最后收到的Consumer会覆盖前者的。

这样同key,肯定产生了覆盖。所以Consumer1不会收到任何消息,但是Consumer2为什么只收到了一半(不固定)消息呢?

那是因为:集群模式消费,它会负载均衡分配到各个节点去消费,所以一半消息(不固定个数)跑到了Consumer1上,结果Consumer1订阅的是tag1,所以不会任何输出。

如果换成BROADCASTING,那后者会收到全部消息,而不是一半,因为广播是广播全部Consumer。

如果还有其他相关问题,也可关注公众号“程序新视界”,相互沟通学习。

原文链接:《RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?》


程序新视界
公众号“程序新视界”,一个让你软实力、硬技术同步提升的平台,提供海量资料

RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?相关推荐

  1. 阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?

    面试官:同一个消费组内的消费者,如果订阅了相同的 topic,但是订阅的 tag 不一样,会有什么问题吗? 我:会出现丢消息的情况. 面试官:能详细说一说吗? 我:RocketMQ 要求同一个消费组内 ...

  2. rocketmq延时消息自定义配置;topic下tag使用

    概述 使用的是开源版本的rocketmq4.9.4 rocketmq也是支持延时消息的. rocketmq一般是4个部分: nameserver:保存路由信息 broker:保存消息 生产者:生产消息 ...

  3. 消息中间件学习总结(11)——Kafka与RocketMQ的Topic数量对单机性能的影响比较分析

    引言 上一期我们对比了三类消息产品(Kafka.RabbitMQ.RocketMQ)单纯发送小消息的性能,受到了程序猿们的广泛关注,其中大家对这种单纯的发送场景感到并不过瘾,因为没有任何一个网站的业务 ...

  4. RocketMQ 中Topic、Tag、GroupName基本概念介绍

    本文主要介绍RocketMQ中Topic.Tag.GroupName的概念.设计初衷以及使用方法. 一.Topic 首先看看官方的定义: Topic是生产者在发送消息和消费者在拉取消息的类别.Topi ...

  5. [Azure][Event hub]Kafka无法同时连接到同一个namespace下的两个Event hub

    1.问题背景 有一个应用需要用kafka消费event hub的消息,其中两个kafka consumer同时连接到了同一个namespace下的两个event hub. kafka:consumer ...

  6. IntelliJ IDEA导入多个eclipse项目到同一个workspace下

    IntelliJ IDEA 与eclipse在新建项目上工作区的叫法略有不同,区别见下图. 我们在eclipse都是在新建的workspace目录下新建我们的项目,但是在IDEA中没有workspac ...

  7. php只显示一部分文章,typecho同一个页面下调用不同分类的文章但是却只显示一个分类文章...

    typecho同一个页面下调用不同分类的文章但是却只显示一个分类文章 作者:佚名 来源:爱好者 时间:2018-04-30 问题描述: 同页面调用分类下文章,只显示一第一个分类下的文章 在一个页面中, ...

  8. 启动mq命令 linux,RocketMQ:Linux下启动server和broker的命令

    目录 QUESTION:RocketMQ:Linux下启动server和broker的命令? ANSWER: 一.启动mqnamesrv 1.1当前执行 1.2后台运行 二.启动mqbroker 2. ...

  9. RocketMQ:Linux下启动server和broker的命令

    目录 QUESTION:RocketMQ:Linux下启动server和broker的命令? ANSWER: 一.启动mqnamesrv 1.1当前执行 1.2后台运行 二.启动mqbroker 2. ...

最新文章

  1. Hive导出复杂数据到csv文件
  2. asp.net core 系列 18 web服务器实现
  3. 【Python】Matplotlib太臃肿,试试Seaborn
  4. Metasploit Framework(1)基本命令、简单使用
  5. 项目部署到tomcat6.0启动成功后访问页面报500_.net core IIS部署教程
  6. vue项目中eslint检查警告——“Trailing spaces not allowed”
  7. OpenShift 4 之脚本化部署Istio的HelloWorld和BookInfo示例
  8. 基于容器实现高并发网站
  9. 软件设计模式之单例模式
  10. python网络爬虫(一):网络爬虫科普与URL含义
  11. 报表 labview_干货!24个LabVIEW初学者常见问题及解答汇总
  12. Linux下DNS服务器的基本搭建
  13. ESP8266热点配网-Arduino代码分享
  14. SOMEIP报文格式部分字段概述(二)
  15. (十一)GDBdebug调试技术——malloc()和free()发生故障
  16. 零碎记录- spring security oauth2 资源服务器中设置放行路径
  17. 当红小生酒店施暴性感女星
  18. Jumserver安装日志审计和资产管理
  19. 结合运动流的时间先验在微创手术视频中的器械分割
  20. python日常实用小脚本-用Python编写渗透用小脚本 短小实用

热门文章

  1. 中企故事汇:马可波罗三生三世的故事
  2. VxWorks错误码查找表
  3. untu复制粘贴快捷键
  4. netty报Unknown channel option ‘TCP_NODELAY‘ for channel异常
  5. 互动媒体尝试之P5创意绘图板
  6. 苹果cms微信对接php,苹果cmsV10微信公众号对接插件安装教程
  7. 码商的末日:提供个人收款码的“兼职”也被警方抓了!涉嫌共同犯罪
  8. 2017 上半年我整理的好文章(上)(98 篇)
  9. 微格式(microformat)
  10. Windows系统中设置软件的开机自动启动