1.在网上看了一些解决这个问题的办法,大部分朋友都说是要在实例化 DefaultMQProducer 的时候指定惟一的 instanceName 来解决,窃以为这样虽然解决了问题,但却是不应该用的解决办法。为什么这样说?因为官网介绍客户端公共参数的时候对这个instanceName有明确的说明

instanceName DEFAULT

客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等)

所以,这个 instanceName 所标识的实例会同时创建自己的网络连接,线程资源,如果每次创建一个 Producer 都指定不同的 instanceName 这样就会 浪费 更多资源 比如内存和线程,网络IO。还会降低消息处理的效率。按照说明,应该是尽可能多个Producer共用一个instanceName 才合理。

2.另外,题目上的报错,是因为 group 已被创建,为什么要用指定不同且唯一的 instanceName 来解决呢?不能因为这样能解决就这样解决。实际上,如果用  DefaultMQProducer 来实例 producer 则会把创建好的producer先放到一个  producerTable

ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();

中,代码中的方法是

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

这个方法里 关键地方是

MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);

在添加的时候如果发以group为键的producer已存在,则注册失败。这里的键是group所以,当我们已经创建了同group的producer时,如果这个 producer没有shutdown,则再次以同样的group创建producer的时候就会报题目中的错误。

而shutdown之后之所以不报错是因为,shutdown这个方法本身调用 的是  unregisterProducer(String group) 在类 MQClientInstance 中。这个方法是包含从 producerTable 中把已添加的producer先移除,然后再shutdown的。具体代码是下面这样的

 
  1. this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
  2. this.defaultAsyncSenderExecutor.shutdown();

所以先 调用 DefaultMQProducer shutdown 之后再创建新的同group的producer是不会报错的。

3.我们再看为什么每次用   DefaultMQProducer 来创建 producer的时候如果 都设置不同的instanceName为什么也不会报错呢?这是因为如果设置的instanceName是唯一的。则在注册producer之前,如果设置的group 不是默认的,则每次 获取的mQClientFactory 都是不同的,而 producerTable 是 mQClientFactory类里的一个属性,这样当然producerTable也是不同的,这样注册producer当然是注册到不同的producerTable中去了,所以不会报错。

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();
}this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

但是,这种解决办法是不可取的。因为instanceName是一个比较重(隔离数据多,创建耗时长,消费资源多)的参数。

4.那么我们怎么更好的解决这个问题呢?我们可以参考源码中 logappender 模块中 的 ProducerInstance 类来实现。这个类在源码中位于 org.apache.rocketmq.logappender.common 下面

下面是这个类的源码

public class ProducerInstance {public static final String APPENDER_TYPE = "APPENDER_TYPE";public static final String LOG4J_APPENDER = "LOG4J_APPENDER";public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER";public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER";public static final String DEFAULT_GROUP = "rocketmq_appender";private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>();private static ProducerInstance instance = new ProducerInstance();public static ProducerInstance getProducerInstance() {return instance;}/**根据 nameServerAddress 和 group 生成 producer 在 producerMap 中的键**/ private String genKey(String nameServerAddress, String group) {return nameServerAddress + "_" + group;}/**
根据 nameServerAddress 和 group 获取已注册到producerMap中的producer,如果不存在,则调用 DefaultMQProducer 生成新的producer注册并返回
**/ public MQProducer getInstance(String nameServerAddress, String group) throws MQClientException {if (StringUtils.isBlank(group)) {group = DEFAULT_GROUP;}String genKey = genKey(nameServerAddress, group);MQProducer p = getProducerInstance().producerMap.get(genKey);if (p != null) {return p;}DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);defaultMQProducer.setNamesrvAddr(nameServerAddress);MQProducer beforeProducer = null;beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);if (beforeProducer != null) {return beforeProducer;}defaultMQProducer.start();return defaultMQProducer;}/**
根据 nameServerAddress 和 group 移除已注册到producerMap中的producer,同时shutdown
**/ public void removeAndClose(String nameServerAddress, String group) {if (group == null) {group = DEFAULT_GROUP;}String genKey = genKey(nameServerAddress, group);MQProducer producer = getProducerInstance().producerMap.remove(genKey);if (producer != null) {producer.shutdown();}}/**
移除 producerMap 中所有的 producer 并全部关闭。
**/public void closeAll() {Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet();for (Map.Entry<String, MQProducer> entry : entries) {getProducerInstance().producerMap.remove(entry.getKey());entry.getValue().shutdown();}}}

可以把这个类直接复制到要使用的项目中,然后在要使用指定 nameServerAddress 和 group 的 producer 时,直接用下面的方法获取一个。

MQProducer producer = ProducerInstance.getProducerInstance().getInstance("localhost:9876", "test-group");
/*
自己生成message消息,然后下面发送
*/
producer.send(message);
/*
如果是比较频繁使用的producer,发送完消息后不用关闭和移除下次再用的时候可以直接再获取拿来就可以发送消息。
对于确定要隔比较长时间不用的producer,可以用下面的方法 移除并关闭
*/
ProducerInstance.getProducerInstance().removeAndClose("localhost:9876", "test-group");

我们会发现,这个类获取 producer 实例的时候只用了 nameServerAddress 和 group 这两个参数。如果我们确实需要操作不同的 instanceName 下的 producer 时,该怎么办呢?直接改造 这个类里的方法,添加上

instanceName 参数 即可。

加参数后的类如下,使用方式没什么差别只是多了个参数而已。

public class ProducerInstance {public static final String APPENDER_TYPE = "APPENDER_TYPE";public static final String LOG4J_APPENDER = "LOG4J_APPENDER";public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER";public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER";public static final String DEFAULT_GROUP = "rocketmq_appender";private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>();private static ProducerInstance instance = new ProducerInstance();public static ProducerInstance getProducerInstance() {return instance;}private String genKey(String nameServerAddress, String group,String instanceName) {return nameServerAddress + "_" + group + "_" + instanceName;}public MQProducer getInstance(String nameServerAddress, String group,String instanceName) throws MQClientException {if (StringUtils.isBlank(group)) {group = DEFAULT_GROUP;}if (StringUtils.isBlank(instanceName)) { instanceName = "DEFAULT"; }String genKey = genKey(nameServerAddress, group, instanceName);MQProducer p = getProducerInstance().producerMap.get(genKey);if (p != null) {return p;}DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);defaultMQProducer.setNamesrvAddr(nameServerAddress);defaultMQProducer.setInstanceName(instanceName);MQProducer beforeProducer = null;beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);if (beforeProducer != null) {return beforeProducer;}defaultMQProducer.start();return defaultMQProducer;}public void removeAndClose(String nameServerAddress, String group, String instanceName) {if (group == null) {group = DEFAULT_GROUP;}if (StringUtils.isBlank(instanceName)) {instanceName = "DEFAULT";}String genKey = genKey(nameServerAddress, group,instanceName);MQProducer producer = getProducerInstance().producerMap.remove(genKey);if (producer != null) {producer.shutdown();}}public void closeAll() {Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet();for (Map.Entry<String, MQProducer> entry : entries) {getProducerInstance().producerMap.remove(entry.getKey());entry.getValue().shutdown();}}}

关于 RocketMQ:The producer group has been created before, specify another name please.这个报错的解决办法...相关推荐

  1. RocketMQ:The producer group has been created before, specify another name please.

    RocketMQ新创建生产者时报异常: The producer group[ ] has been created before, specify another name please. 发送消息 ...

  2. MySQL报错1055解决办法:[Err] 1055 - Expression #1 of ORDER BY clause is not in GROUP BY clause and contains

    [mysql报错1055 报错解决办法][Err] 1055 - Expression #1 of ORDER BY clause is not in GROUP BY clause and cont ...

  3. The producer group has been created before

    为什么80%的码农都做不了架构师?>>>    编写一个help类 public class RunTimeUtil {private static AtomicInteger in ...

  4. sqlserver查询补全时间_mssql 按日期分组(group by)查询统计的时候,没有数据补0的解决办法...

    摘要: 下文讲述一次报表制作的需求, 需制作一个月的销量的数据汇总,如果其中某一天没有数据,那么就补0处理 例: /* 统计2018-4月份的销量统计, 无数据的天补0 */ ---建立基础数据 cr ...

  5. Apache RocketMQ 安装、测试、报错解决

    1. 准备 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到教程. 64bit OS, Linux/Unix/Mac 64bit JDK 1.8+; Mav ...

  6. 一个NVIDIA驱动安装报错——ERROR: The nvidia kernel module was not created.

    Ubuntu18系统下,在安装cuda及nvidia驱动时,安装失败,查看日志显示"ERROR: The nvidia kernel module was not created." ...

  7. RocketMQ特性、专业术语(Producer,Producer Group,Consumer Group,Topic,Message,Tag,Broker,Name Server)等

    RocketMQ 是什么 Github 上关于 RocketMQ 的介绍:RcoketMQ 是一款低延迟.高可靠.可伸缩.易于使用的消息中间件.具有以下特性: 支持发布/订阅(Pub/Sub)和点对点 ...

  8. The consumer group has been created before, specify another name please RocketMQ异常

    解决方案一: rocketmq 报 The consumer group has been created before, specify another name please. 错误 是因为一个s ...

  9. RocketMQ:Producer启动流程与消息发送源码分析

    文章目录 Producer 1.方法和属性 2.启动流程 3.消息发送 3.1验证消息 3.2查找路由 3.3选择队列 3.4发送消息 3.5发送批量消息 Producer 在RocketMQ中,消息 ...

最新文章

  1. 在python中调用js或者nodejs要使用PyExecJs第三方包。
  2. 实战教程 | 车道线检测项目实战,霍夫变换 新方法 Spatial CNN
  3. linux中sh基本语法
  4. OpenCASCADE:使用 扩展数据交换XDE之形状和组件
  5. 改变照片分辨率的软件_AI黑科技竟如此强大,模糊照片无损放大600%变得更清晰!...
  6. voc数据集的map计算方式
  7. Linux嵌入式开发_主设备号与次设备号详解
  8. Json对象直接存取数据库
  9. 计算机能否代替老师英语作文,雅思大作文范文:电脑不可取代老师
  10. 4019 设备树 Linux device tree 概述
  11. Windows 下载安装 SonarQube和使用
  12. Win10蓝屏原因分析记录
  13. 学习总结:即时通讯项目里面的语音处理-文件模式录音
  14. SRE工作手册——基础
  15. 【流媒体开发】【数据与封装格式】20、AAC码流格式与解析
  16. 学习笔记(2):A110测试-测试课程申请22
  17. 从零开始的Node.js新闻爬虫实验项目(四)东方财富网、网易新闻、Pixiv的爬取思路
  18. 英特尔2018年处理器一览
  19. Sunshine数据库篇之查询
  20. JavaWeb实现餐厅点餐系统

热门文章

  1. 笔记本喇叭无声音解决方案
  2. [易语言] 六边形扫雷游戏实战开发
  3. NDIS小端口驱动ndisEdge学习二——小端口驱动的初始化
  4. 跨境电商的痛点有哪些?
  5. gps定位器更换平台指令-GPS定位器接入平台指令
  6. Unite 2017 Shanghai 四大技术专场全面解锁
  7. 2021年中国智能驾驶行业研究报告
  8. RecentsActivity启动分析二
  9. linux grep本地ip,linux grep怎么查ip地址
  10. 为有梦青年插上助力的翅膀 魅族开发者大赛取得圆满成功