mq常见问题:消息丢失、消息重复消费、消息保证顺序

消息丢失问题

拿rabbitmq举例来说,出现消息丢失的场景如下图

从图中可以看到一共有以下三种可能出现消息丢失的情况:

1> 生产者丢消息
生产者在将数据发送到MQ的时候,可能由于网络等原因造成消息投递失败

2>MQ自身丢消息
未开启RabbitMQ的持久化,数据存储于内存,服务挂掉后队列数据丢失;
开启了RabbitMQ持久化,消息写入后会持久化到磁盘,但是在落盘的时候挂掉了,不过这种概率很小

3>消费者弄丢了消息
消费者刚接收到消息还没处理完成,结果消费者挂掉了…

针对以上三种情况,每种情况都有对应的处理方法:

1》生产者弄丢消息的解决方法
方法一:开启RabbitMQ的事务

rabbitmq提供了与三个事务相关的命令:select开启事务、commit提交事务、rollback回滚事务
采用该种方法由于事务机制,会导致吞吐量下降,太消耗性能。

方法二:开启confirm模式

使用springboot时在application.yml配置文件中做如下配置spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guest# 发送者开启 confirm 确认机制publisher-confirm-type: correlated实现confirm回调接口@Slf4j
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.error("消息发送异常!");//可以进行重发等操作} else {log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);}}
}生产者发送消息时设置confirm回调@Slf4j
@Configuration
public class RabbitMqConfig {@Beanpublic ConfirmCallbackService confirmCallbackService() {return new ConfirmCallbackService();}@Beanpublic RabbitTemplate rabbitTemplate(@Autowired CachingConnectionFactory factory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);/*** 消费者确认收到消息后,手动ack回执回调处理*/rabbitTemplate.setConfirmCallback(confirmCallbackService());return rabbitTemplate;}//其他配置代码......

小结: 事务机制和 confirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm机制是异步的,你发送个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块避免数据丢失,建议使用用 confirm 机制。

2》MQ自身弄丢消息时的解决方法
使用持久化队列,发送消息时做持久化到磁盘处理。
同时设置queue和message持久化以后,RabbitMQ 挂了再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据,保证数据不会丢失。

但是就算开启持久化机制,也有可能出现消息落盘时服务挂掉的情况。这时可以考虑结合生产者的confirm机制来处理,持久化机制开启后消息只有成功落盘时才会通过confirm回调通知生产者,所以可以考虑生产者在生产消息时维护一个正在等待消息发送确认的队列,如果超过一定时间还没从confirm中收到对应消息的反馈,自动进行重发处理。

3》消费者自身弄丢消息时的解决方法
关闭自动ACK,使用手动ACK。RabbitMQ中有一个ACK机制,默认情况下消费者接收到到消息,RabbitMQ会自动提交ACK,之后这条消息就不会再发送给消费者了。我们可以更改为手动ACK模式,每次处理完消息之后,再手动ack一下。不过这样可能会出现刚处理完还没手动ack确认,消费者挂了,导致消息重复消费,不过我们只需要保证幂等性就好了,重复消费也不会造成问题。

在springboot中修改application.yml配置文件更改为手动ack模式

spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guest# 发送者开启 confirm 确认机制publisher-confirm-type: correlated# 发送者开启 return 确认机制publisher-returns: truelistener:simple:concurrency: 10max-concurrency: 10prefetch: 1auto-startup: truedefault-requeue-rejected: true# 设置消费端手动 ackacknowledge-mode: manual# 是否支持重试retry:enabled: true消费端手动ack参考代码:@RabbitHandlerpublic void handlerMq(String msg, Channel channel, Message message) throws IOException {try {//业务处理代码......//手动ACKchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...", e);channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理...", e);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}

消息重复消费问题

出现消息重复消费的情况

原因一
1、生产者发送给消息队列以后,消息队列会应达给生产者,但是这个过程中,消息队列出问题了没有收到消息,那么生产者就会重复发生消息,这时就产生了重复消息。
2、生产者发生消息给消息队列,消息队列由于数量太大延迟了,生产者等待响应超时了,这时生产者又会从新发生消息给消息队列。
3、生产者和消息队列因网络问题引起,生产者会发起重试。这样也会产生重复消息。
4、其实主要原因就是,消息成功进入了消息队列,但是由于各种原因消息队列没有给生产者成功的返回值,而生产者又有重试机制这种情况下就会产生重复消息。

原因二
1、消息队列推送给消费者,消费者处理消息这个过程中消费出现了问题,消息队列不知道消费者处理结果,就会在次投递。
2、消费者处理完,网络出现问题,这时没有给中间件消息队列返回结果,消息队列会在次投递消费者。
3、消费者处理超时,超过了消息队列的超时时间,这时消息队列也会再次投递。
4、消费者处理完结果返回给消息中间件,但是消息中间件出现问题,处理结果丢失了,重启后,消息中间件内部检查发现这个消息还没有处理也会在次投递给消费者。

针对该问题,一般是在消费者端做幂等处理。

如何保证消息队列消费的幂等性

这一块应该还是要结合业务来选择合适的方法,有以下几个方案:

消费数据为了单纯的写入数据库,可以先根据主键查询数据是否已经存在,如果已经存在了就没必要插入了。或者直接插入也没问题,因为可以利用主键的唯一性来保证数据不会重复插入,重复插入只会报错,但不会出现脏数据。
消费数据只是为了缓存到redis当中,这种情况就是直接往redis中set value了,天然的幂等性。
针对复杂的业务情况,可以在生产消息的时候给每个消息加一个全局唯一ID,消费者消费消息时根据这个ID去redis当中查询之前是否消费过。如果没有消费过,就进行消费并将这个消息的ID写入到redis当中。如果已经消费过了,就无需再次消费了。

消息保证顺序消费

消息在投入到queue的时候是有顺序,如果只是单个消费者来处理对应的单个queue,是不会出现消息错乱的问题。但是在消费的时候有可能多个消费者消费同一个queue,由于各个消费者处理消息的时间不同,导致消息未能按照预期的顺序处理。其实根本的问题就是如何保证消息按照预期的顺序处理完成。

出现消费顺序错乱的情况

为了提高处理效率,一个queue存在多个consumer

一个queue只存在一个consumer,但是为了提高处理效率,consumer中使用了多线程进行处理

保证消息顺序性的方法

将原来的一个queue拆分成多个queue,每个queue都有一个自己的consumer。该种方案的核心是生产者在投递消息的时候根据业务数据关键值(例如订单ID哈希值对订单队列数取模)来将需要保证先后顺序的同一类数据(同一个订单的数据) 发送到同一个queue当中。

一个queue就一个consumer,在consumer中维护多个内存队列,根据业务数据关键值(例如订单ID哈希值对内存队列数取模)将消息加入到不同的内存队列中,然后多个真正负责处理消息的线程去各自对应的内存队列当中获取消息进行消费。

RabbitMQ保证消息顺序性总结:
核心思路就是根据业务数据关键值划分成多个消息集合,而且每个消息集合中的消息数据都是有序的,每个消息集合有自己独立的一个consumer。多个消息集合的存在保证了消息消费的效率,每个有序的消息集合对应单个的consumer也保证了消息消费时的有序性。

本文总结:

消息丢失:

生产者 建议使用异步confirm(非高并发需求情况下也可以考虑rabbitmq的事务机制)

mq:做消息持久化

消费者:关闭自动提交offset/自动ack  使用手动处理

重复消费:很容易解决,建立去重表,做幂等处理

如何保证有序:

多个queue, 每个queue都有一个自己的consumer,将一类消息投递到一个queue中

消费者维护内存队列,同一类消息hash到一个内存队列中

参考文章:https://blog.csdn.net/zw791029369/article/details/109561457

MQ常见问题及解决方案相关推荐

  1. 消息队列常见问题和解决方案

    一.为什么使用消息队列? 消息队列使用的场景和中间件有很多,但解决的核心问题主要是:异步.解耦.消峰填谷. 二.消息队列的优缺点 异步.解耦.消峰填谷这是消息队列最大的优点,除了这些消息队列还可以会解 ...

  2. OpenStack环境搭建(六:常见问题及解决方案总结)

    实验要求: 完成Virtual box平台安装,会应用相关操作: 在virtual box虚拟平台上部署Fuel Master节点: 在virtual box虚拟平台上部署计算节点Computer: ...

  3. ESXi6.5环境搭建(五:常见问题及解决方案实验总结)

    实验目的及要求 完成VMware workstations安装,会应用相关操作: 完成虚拟机中ESXI6.5平台的安装及网络环境配置: 完成VMware vSphere Client 6.0软件在PC ...

  4. C#中使用WCF一些常见问题及解决方案

    C#中使用WCF一些常见问题及解决方案 参考文章: (1)C#中使用WCF一些常见问题及解决方案 (2)https://www.cnblogs.com/52XF/p/3740326.html 备忘一下 ...

  5. gulp几个常见问题及解决方案

    gulp几个常见问题及解决方案 参考文章: (1)gulp几个常见问题及解决方案 (2)https://www.cnblogs.com/hjson/p/10546708.html 备忘一下.

  6. Mycat常见问题与解决方案

    Mycat常见问题与解决方案 参考文章: (1)Mycat常见问题与解决方案 (2)https://www.cnblogs.com/it-deepinmind/p/11913519.html 备忘一下 ...

  7. [持续更新]UnsatisfiedLinkError常见问题及解决方案

    [持续更新]UnsatisfiedLinkError常见问题及解决方案 参考文章: (1)[持续更新]UnsatisfiedLinkError常见问题及解决方案 (2)https://www.cnbl ...

  8. 为什么焊锡老是粘在烙铁头上_自动焊锡机常见问题及解决方案「由力自动化」...

    随着科技的不断进步,一些行业中机器替代人工是避不可挡的一种发展趋势,自动焊锡机因为所能使用的行业广泛相信很多的朋友都有所了解,那么自动焊锡机器人使用时常见问题有哪些呢,该如何解决呢?下面小编来简单的讲 ...

  9. ARKit从入门到精通(11)-ARKit开发常见问题及解决方案

    转载请注明出处:ARKit从入门到精通(11)-ARKit开发常见问题及解决方案 本文主要介绍ARKit开发过程中一些常见问题 1.ARKit框架无法导入问题 2.ARKit运行黑屏或者白屏问题:Un ...

最新文章

  1. ssh scp文件同步(先不搞了)
  2. 台式计算机机箱的作用,如何选择台式电脑机箱?小白安装电脑机箱常识指南
  3. 投资学习网课笔记(part2)--基金第二课
  4. Java 从网络上下载文件
  5. [C++]试一试结构体struct node的构造函数
  6. Python之浅谈运算符
  7. java phantomjs alert_Python+Selenium+PhantomJS脚本中的Javascript警报
  8. REST framework 用户认证源码
  9. 2月5日年初六返沪的情景
  10. Ubuntu快捷方式的描述
  11. 依赖反转原理,IoC容器和依赖注入:第1部分
  12. C++使用命名管道使用进程间通信
  13. 汉字计算机内码是国标码吗,汉字机内码与国标码的差别
  14. python 图片(pil库)将两个图片合成一张
  15. SpringBoot整合Cache缓存技术(二十一)
  16. win7计算机时间同步出错,win7系统电脑时间同步出错的解决方法
  17. 2020找工作更难了?做好这4方面,找到高薪好工作
  18. 内网渗透测试:内网横向移动基础总结
  19. threejs学习笔记:实现导入的动画gltf模型播放动画
  20. python-opencv图片合成视频

热门文章

  1. 弘辽科技:刘强东下“重手”
  2. c语言程序中要用到阶乘,C程序使用递归求数字的阶乘
  3. win7命令启动计算机管理,关于Win7中运行的命令
  4. es 主要内存使用大户
  5. hdu 1170 Balloon Comes!(水题)
  6. PyTorch 图像分类识别(一)定义及加载自己的数据集并可视化
  7. Maven项目配置镜像地址
  8. 小猫爪:i.MX RT1050学习笔记15-FlexSPI-FLASH使用3-KEIL FLASH算法中的使用
  9. Android 性能优化探究,不愧是Alibaba技术官
  10. matlab如何均匀分布,[转载]【MATLAB】高斯分布 均匀分布 以及其他分布 的随机数生成函数...