何为CMQ?

腾讯云消息队列(Cloud Message Queue,CMQ)是一种分布式消息队列服务,它能够提供可靠的基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)之间的收发消息,存储在可靠有效的 CMQ 队列中,防止消息丢失。 CMQ 支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。——来源以及更多内容推荐看官方文档。

之前公司内部使用rabbitMQ,但是运维调整部署全部迁移到腾讯云上,如果继续使用rabbitMQ,还需要运维自主去搭建环境,维护之类,而且经考察对rabbitMQ维护成本相比直接使用腾讯云的CQM高很多,所以最近技术部门对CMQ进行研究发现基本可以替代rabbitMQ,但是同时也发现一个比较严重的问题,使用cmq的mq功能,无法实现完全实现自动触发消息消费,因为cmq的消息监听基于长连接的,长时间没有消息推送会造成长连接断开,无法实现自动触发消息消费了。本文目的主要解决CQM自动触发消息消费。

利用spring中可以根据注解获取bean,调用对应通知方法,实现多线程自动拉取消息。

自定义注解Queue

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface IzkQueue {String value() default "";String queueName() default "";
}

消息处理器抽象统一接口

/**
* 消息处理器抽象统一接口
*/
public interface IBaseCmqHandler {/*** 处理从cmq中获取的消息** @param queueName : 队列名* @param message   : 消息体* @return*/boolean onMessage(String queueName, Message message);
}

CMQ消息监听类

@Slf4j
@Component
public class CmqListener implements ApplicationContextAware, ApplicationListener<ApplicationEvent> {@Setterprivate ApplicationContext applicationContext;@Autowiredprivate TaskExecutor taskExecutor;private boolean isStart = false;/*** 获取所有的需要监听mq的类,以及注册的mq* @param applicationEvent*/@Overridepublic void onApplicationEvent(ApplicationEvent applicationEvent) {HashMap<String, IBaseCmqHandler> map = new HashMap<>(16);Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(IzkQueue.class);beanMap.forEach((key, value) -> {IzkQueue annotation = value.getClass().getAnnotation(IzkQueue.class);String queue = annotation.queueName();map.put(queue, (IBaseCmqHandler) value);});if (!isStart) {isStart = true;if (!CollectionUtils.isEmpty(map)) {taskExecutor.execute(() -> executeQueueHandler(map));}}}private void executeQueueHandler(HashMap<String, IBaseCmqHandler> map) {map.forEach((queueName, bean) -> {taskExecutor.execute(() -> receiveCmqMessage(queueName, bean));});}/*** 功能描述 : 将队列与对应的消息处理器进行匹配,并进行消息消费** @param queueName  : queue name* @param cmqHandler : 具体的消息处理器* @return* @created 2019-07-14 16:55*/private void receiveCmqMessage(String queueName, IBaseCmqHandler cmqHandler) {try {while (true) {// 睡眠 释放cpu资源Thread.sleep(10);CmqQueue cmqQueue = applicationContext.getBean(queueName, CmqQueue.class);Message message = cmqQueue.receiveMsg();if (null != message) {log.info("时间:{}, 队列:{}, 收到消息:{}", LocalDateTime.now(), queueName, message.msgBody);if (!StringUtils.isEmpty(message.msgBody) && !StringUtils.isEmpty(message.receiptHandle)) {taskExecutor.execute(() -> {try {// 处理消息if (cmqHandler.onMessage(queueName, message)) {// 消费成功 删除消息cmqQueue.deleteMsg(message.receiptHandle);} else {taskExecutor.execute(() -> receiveCmqMessage(queueName, cmqHandler));}} catch (Exception e) {log.error("消息处理失败 --> 队列名:{}, 已进行自动补偿,Exception:", queueName, e);taskExecutor.execute(() -> receiveCmqMessage(queueName, cmqHandler));}});}}}} catch (Exception e) {log.error("消息执行失败 --> 队列名:{}, 已进行自动补偿,Exception:", queueName, e);taskExecutor.execute(() -> receiveCmqMessage(queueName, cmqHandler));}}}

关于上述涉及到类CmqQueue是公司内部封装类,将queue队列和cmq的账号绑定,只是大概展示一下,仅供参考。

账号信息类

@Data
public class MqAccount {private String host;private String port;private String username;private String password;private String vhost;private String secretId;private String secretKey;private String endpoint;private String queueEndpoint;
}

CmqQueue的信息类

public class CmqQueue extends AbstractMq {private static final Logger LOGGER = LoggerFactory.getLogger(CmqQueue.class);private Account account;private Queue queue;public CmqQueue(MqAccount mqAccount, String queueName) {mqAccount = (MqAccount)Preconditions.checkNotNull(mqAccount);Preconditions.checkNotNull(queueName);queueName = this.getNameWithSuffix(queueName);this.init(mqAccount, queueName);}private void init(MqAccount mqAccount, String queueName) {this.account = new Account(mqAccount.getQueueEndpoint(), mqAccount.getSecretId(), mqAccount.getSecretKey());ArrayList list = Lists.newArrayList();try {this.account.listQueue(queueName, -1, -1, list);long count = list.stream().filter((name) -> {return queueName.equalsIgnoreCase(name);}).count();if (count == 0L) {QueueMeta meta = new QueueMeta();this.account.createQueue(queueName, meta);} else {LOGGER.warn("cmq queueName  {}  has exist", queueName);}this.queue = this.account.getQueue(queueName);} catch (Exception var7) {LOGGER.error("cmq createQueue error", var7);throw new RuntimeException(var7);}}public void setQueueAttr(QueueMeta meta) {try {this.queue.setQueueAttributes(meta);} catch (Exception var3) {LOGGER.error("cmq setQueueAttr error", var3);}}public String sendMsg(String msg) {try {return this.queue.sendMessage(msg);} catch (Exception var3) {LOGGER.error("cmq queuename:{},sendMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});return null;}}public List<String> batchSendMsg(List<String> msgs) {try {return this.queue.batchSendMessage(msgs);} catch (Exception var3) {LOGGER.error("cmq queuename:{},batchSendMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});return null;}}public Message receiveMsg() {Message message = null;try {message = this.queue.receiveMessage(10);} catch (Exception var3) {LOGGER.error("cmq queuename:{},receiveMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});}return message;}public List<Message> batchReceiveMsg(int numOfMsg) {try {return this.queue.batchReceiveMessage(numOfMsg, 10);} catch (Exception var3) {LOGGER.error("cmq queuename:{},batchReceiveMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});return null;}}public void deleteMsg(String receiHandle) {try {this.queue.deleteMessage(receiHandle);} catch (Exception var3) {LOGGER.error("cmq queuename:{},deleteMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});}}public void batchDeleteMsg(List<String> receiHandles) {try {this.queue.batchDeleteMessage(receiHandles);} catch (Exception var3) {LOGGER.error("cmq queuename:{},batchDeleteMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});}}
}public abstract class AbstractMq {protected String exchangeName;protected String exchangeType = "topic";public AbstractMq() {}protected String getExchangeType() {return this.exchangeType;}protected String getNameWithSuffix(String name) {return !DeveloperUtil.isLocalDebug() ? name + "_" + Util.runEvn : name + "_local";}
}

Demo案例

@IzkQueue(queueName = "queueDemo",value = "demo")
public class MessageDemo implements IBaseCmqHandler {@Overridepublic boolean onMessage(String queueName, Message message) {//todoreturn false;}
}

总结

不将就是发现的原动力,多思考多动手。

CMQ——多线程实现自动拉取消息相关推荐

  1. jenkins 通过自动拉取Gitlab上的代码实现自动更新NGINX

    所需要用到的环境: Gitlab: 172.20.7.70Jenkins: 172.20.7.71nginx: 172.20.7.72 gitlab 和Jenkins安装自行百度 开始实验操作 首先通 ...

  2. 【mq】从零开始实现 mq-09-消费者拉取消息 pull message

    前景回顾 [mq]从零开始实现 mq-01-生产者.消费者启动 [mq]从零开始实现 mq-02-如何实现生产者调用消费者? [mq]从零开始实现 mq-03-引入 broker 中间人 [mq]从零 ...

  3. RocketMQ源码(十七)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码

    转载来源: RocketMQ源码(19)-Broker处理DefaultMQPushConsumer发起的拉取消息请求源码[一万字]_刘Java的博客-CSDN博客 此前我们学习了RocketMQ源码 ...

  4. RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码【一万字】

    基于RocketMQ release-4.9.3,深入的介绍了Broker处理DefaultMQPushConsumer发起的拉取消息请求源码. 此前我们学习了RocketMQ源码(18)-Defau ...

  5. java 实现自动生成部署文档_jenkins的部署、实现自动拉取gitlab仓库代码、实现项目中代码自动部署以及项目关联触发......

    jenkins主机内存和gitlab主机内存最好配置4G及以上,防止各自的web端打不开 1.配置JDK环境 1)jdk解压到此目录 [root@localhost src]# pwd /usr/lo ...

  6. rabbitmq 拉取消息太慢_面试官:消息队列这些我都要问

    作者:mousycoder segmentfault.com/a/1190000021054802 消息队列连环炮 项目里怎么样使用 MQ 的? 为什么要使用消息队列? 消息队列有什么优点和缺点? k ...

  7. kubernetes实战篇之创建密钥自动拉取私服镜像

    系列目录 前面我们讲解了如何搭建nexus服务器,以及如何使用nexus搭建docker私有镜像仓库,示例中我们都是手动docker login登陆私服,然后通过命令拉取镜像然后运行容器.然而这种做法 ...

  8. vue项目中自动拉取更新Iconfont(阿里巴巴图标库)

    在vue项目中使用 iconfont图标库,网上的栗子很多,这边就随手给一个,点这里 上面的解决了,那我就很苦恼,我每次添加 或删除 或更新图标库,需要重新下载?自己手动去覆盖吗?我是拒绝的.so,自 ...

  9. 通过宝塔webhook,实现git自动拉取服务器代码

    1.宝塔安装webhook,添加一条记录,脚本内容为: #!/bin/bash echo "" #输出当前时间 date --date='0 days ago' "+%Y ...

最新文章

  1. 阿里云kafka使用记录(python版本)
  2. 为文本添加风格text-decoration
  3. Javascript倒计时页面跳转
  4. SqlCommandBuilder自动创建dataAdapter数据库操作命令
  5. GoAhead2.5源代码分析之10-web server主程序(main.c)
  6. MyBatis3教程
  7. 创建font_年底干货来了!图案创建、字体、图库、UI套件常见工具大合集!
  8. ENVI5.3下载和安装
  9. Linux awk 中 BEGIN 和 END 的使用方法
  10. 深度解密Go语言之map
  11. 计算机算法实际应用,数学方法在计算机算法中的应用分析
  12. 完美国际真数苹果_让苹果数据线下岗的两款数据线!剪断了还能用!
  13. 招聘计算机教师面试自我介绍,[教师招聘面试自我介绍]怎么在教师招聘时自我介绍...
  14. html a字体字号,A标签字体大小css布局实例教程
  15. Java中多态的粗浅见解
  16. Python格式化字符串的4种方式
  17. cocos2d-LUA逆向之用idaPro调试so库获取xxtea解密key
  18. 直击|为防虚假信息 百合佳缘引入第三方征信查询合作
  19. RGB、YUY2、YUYV、YVYU、UYVY、AYUV格式详解
  20. Linux之开机自动运行脚本

热门文章

  1. Ubuntu 账户管理
  2. clistctrl获取列高 mfc_VC MFC列表视图(CListCtrl)控件
  3. js promise 详解
  4. 高速公路预制梁场建设案例(附方案文本)
  5. SCL工具之CID合并SCD,SCD拆分CID
  6. 企业微信群活码是什么?如何用企业微信群活码实现自动分流
  7. 细数STM32F103的那些坑
  8. 安防知识整理_视频监控_《玩转IP看监控》1
  9. 安装到树莓派c语言编程ide,【玩树莓】编程篇(四)在树莓派2上运行Cloud9 IDE服务器...
  10. c语言的前置课程,专业课前置课程一览表.pdf