以下只针对集群模式:
1 producer
默认情况下不需要设置instanceName,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
如果同一个jvm中,不同的producer需要往不同的rocketmq集群发送消息,需要设置不同的instanceName
原因如下:如果不设置instanceName,那么会使用ip@pid作为producer唯一标识,那么会导致多个producer内部只有一个MQClientInstance(与mq交互)实例,从而导致只往一个集群发消息。
2 consumer
默认情况下不需要设置instanceName,rocketmq会使用ip@pid作为instanceName(pid代表jvm名字)
如果设置instanceName,rocketmq会使用ip@instanceName作为consumer的唯一标示,此时需要注意instanceName需要不同。
3 consumer设置上instanceName后,无法集群消费的问题调查
应用场景:

一台机器上的多个consumer jvm进程消费整个集群的消息

问题说明:

由于集群模式下我们希望consumer能够平均消费整个集群的消息,但是设置上instanceName后,发现每个consumer都消费整个集群的消息。

查看开发指南,该参数说明如下:

参数名
默认值
说明
instanceName    DEFAULT    
客户端实例名称,客户端创建的多个 Producer、
Consumer 实际是共用一个内部实例(这个实例包含
网络连接、线程资源等)

问题调查:

一开始怀疑是cosumer的MessageModel设置为广播消费的原因导致的:

广播消费:一条消息被多个 Consumer 消费, 即使返些 Consumer 属亍同一个 Consumer Group,消息也会被 Consumer 
Group 中的每个 Consumer 都消费一次

但是DefaultMQPushConsumer中MessageModel默认就是集群模式,故排除。

后来经过走查DefaultMQPushConsumer源码发现是由于rebalance策略问题导致的,consumer的rebalance会在首次启动时和之后每10秒一次,做rebalance操作,代码调用链如下:

首次启动rebalance代码如下:

DefaultMQPushConsumer.start -> DefaultMQPushConsumerImpl.start -> MQClientInstance.rebalanceImmediately -> RebalanceService.wakeup

之后每10秒一次的代码如下:

DefaultMQPushConsumer.start -> DefaultMQPushConsumerImpl.start -> MQClientInstance.start -> RebalanceService.start,启动rebalance线程

查看DefaultMQPushConsumer的rebalance策略,默认是AllocateMessageQueueAveragely,该策略是均衡消息队列到consumer,既然有rebalance,那为何没有做到平衡呢?

继续跟踪源码:

RebalanceService.start -> MQClientInstance.doRebalance -> DefaultMQPushConsumerImpl.doRebalance -> RebalanceImpl.doRebalance -> RebalanceImpl.rebalanceByTopic -> AllocateMessageQueueAveragely.allocate 此时才到真正的rebalance。

该方法参数说明如下:

/**
  * Allocating by consumer id
  *
  * @param consumerGroup current consumer group
  * @param currentCID    current consumer id
  * @param mqAll         message queue set in current topic
  * @param cidAll        consumer set in current consumer group
  * @return
  */
 publicList allocate(String consumerGroup,String currentCID,List mqAll,List cidAll);
通过debug allocate方法发现,第二个参数需要MQClientInstance.clientId,这个由ClientConfig.buildMQClientId产生,产生规则是ip@instanceName

而instanceName即为consumer设置的,如果设置上这个参数,启动多个jvm进程,则currentCID都一样,而计算rebalance时如下代码导致每次将所有的queue分配到一个consumer上:

int index = cidAll.indexOf(currentCID);

假如不设置instanceName,ClientConfig.changeInstanceNameToPID会获取RuntimeMXBean.getName作为instanceName,而这个值对于多个jvm是不一样的,api解释如下:

RuntimeMXBean.getName:返回表示正在运行的 Java 虚拟机的名称。返回的名称字符串可以为任何任意字符串,Java 虚拟机实现可以选择在返回的名称字符串中嵌入特定于平台的有用信息。每个正在的运行的虚拟机可以具有不同的名称。

这样rebalance时就会平均分配到consumer上。
————————————————
版权声明:本文为CSDN博主「MQCloud」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/a417930422/article/details/50663629

rocketmq设置instanceName相关推荐

  1. rocketmq设置端口号_nginx代理rocketmq端口

    1.环境说明 系统:centos7 因为办公电脑和搭建rocketmq服务的服务器不在一个网段,而且办公电脑业无法访问该服务器除了80和8080以外的其他端口,所以要对rocketmq的9876端口做 ...

  2. RocketMQ生产者流程篇

    系列文章 RocketMQ入门篇 RocketMQ生产者流程篇 RocketMQ生产者消息篇 前言 生产者向消息队列里面写入消息,不同的业务场景会采用不同的写入策略,比如:同步发送,异步发送,延迟发送 ...

  3. RocketMQ原理剖析

    前言 MQ使用场景 异步.解耦.削峰填谷 MQ选型 吞吐量: Kafka具有更高的吞吐量.Kafka在Producer端将多个小消息合并,批量发送给Broker,从而提高系统的吞吐量.同时,Kafka ...

  4. SpringBoot优雅整合RocketMQ

    SpringBoot优雅整合RocketMQ 本篇文章默认你已经有RocketMQ的基础: Producer启动过程,消息发送过程 Consumer启动过程,消息拉取消息消费过程 NameServer ...

  5. 30分钟搞懂 RocketMQ原理

    概述 消息队列作用 应用解耦:对系统之间的交互使用消息队列,降低系统之间的耦合. 流量消峰:利用消息队列进行缓存,使短时高并发的任务,可以分散在一段时间内进行处理. 消息分发:将数据写入消息队列,供个 ...

  6. RocketMQ 简单原理

    早期的消息中间件是通过  队列  这一模型来实现的,可能是历史原因,我们都习惯把消息中间件成为消息队列. 但是,如今例如  RocketMQ  . Kafka  这些优秀的消息中间件不仅仅是通过一个  ...

  7. RocketMQ读写队列

    读写队列作用 RocketMQ创建Topic时,可以配置writeQueueNums和readQueueNums,读写队列,是在做路由信息时使用.在消息发送时,使用写队列个数返回路由信息,而消息消费时 ...

  8. 消息中间件RocketMQ

    消息中间件RocketMQ   RocketMQ 是阿里巴巴开源的分布式消息中间件.支持事务消息.顺序消息.批量消息.延时消息.消息回溯等.它里面有几个区别于标准消息中件间的概念,如Group.Top ...

  9. RocketMQ 中Topic、Tag、GroupName基本概念介绍

    本文主要介绍RocketMQ中Topic.Tag.GroupName的概念.设计初衷以及使用方法. 一.Topic 首先看看官方的定义: Topic是生产者在发送消息和消费者在拉取消息的类别.Topi ...

最新文章

  1. [导入]实时数据库的经典书
  2. python的pptx文档_通过python-pptx模块操作ppt文件
  3. 云视频会议的“多、快、好、省”(下)
  4. 16年寒假随笔(3)
  5. C++数组与指针回顾总结
  6. idea配置远程服务器实现远程编辑文件及ssh连接
  7. boost::log模块实现多线程异步日志记录示例
  8. tstringlist怎么查看是否存在该数据_财务报表审计该如何进行?
  9. 总帐明细账对账不平数据库修改
  10. Python微调文本顺序对抗朴素贝叶斯算法垃圾邮件分类机制
  11. .NET 6 Preview 4 已发布,这些新功能值得关注!
  12. Centos6.3安装KVM
  13. 基本信息项目目标文档
  14. 第一篇:一个win32控制台程序
  15. 机器人机构学基础(朱大昌)第二章部分习题答案
  16. 机器视觉最常见的五大典型应用
  17. PS磨皮—高低频磨皮
  18. 使用UltraISO制作光盘镜像
  19. 广告标示符(adId)  adfv标示符的那些问题
  20. Visual Studio中的rc是什么文件

热门文章

  1. 【LabVIEW信号处理】加速度信号到速度信号(1)
  2. 一个简单的智能停车APP——抽屉式侧边栏
  3. 《C语言程序设计》实训报告——学生成绩管理系统
  4. 金三银四 “狂飙” 季,一波综合面试题来了
  5. Java入门小练习:根据年份输出对应的生肖
  6. 3.1.10stateflow图形函数Graphical function
  7. [似水流年]课下散心登老和山有感
  8. 安全测试之不安全的加密存储
  9. 10天手敲一个SpringBoot网上商城项目(四)——新增收货地址功能、获取省市区列表及名称功能的实现
  10. 0023--softonic--免费软件下载