目录

  • 如何保证幂等性
    • 什么是幂等性
    • 重复消费产生的场景
    • 解决方案
  • 如何保证可靠性
    • 产生原因
    • 解决方案
  • 如何保证顺序性
    • 产生原因
    • 解决方案
  • 参考

如何保证幂等性

如果消息的重复消费对业务有影响,那么就需要对消息进行幂等处理,下面介绍消息幂等的概念、场景和处理方法。

什么是幂等性

在数学和计算机科学中,幂等运算可以多次应用而不改变初始应用后的结果。在消息队列服务中,幂等性用于处理相同消息的重复消费。消费者重复消费一条消息,最终消费结果与初次消费结果相同,重复消费不会对业务系统造成负面影响。

例如:消费者根据扣款信息扣减订单货款,付款金额为100元,但由于网络问题,消息重复发送给消费者。结果就是消息被重复消费,但是只扣了一次货款,订单只有一次100元的扣款记录。该例子在消息消费过程中实现了消息幂等性,扣费满足业务需求。

重复消费产生的场景

在互联网应用中,尤其网络较差的情况下,RabbitMQ消息可能会被重复消费,如果消息的重复消费对业务有影响,可以对消息进行幂等处理,以下场景可能会重复消费消息:

  • 生产者重复向RabbitMQ代理的消息队列发送消息
    消息发送到代理并持久化后,由于网络断开或者客户端崩溃,代理未能回复客户端,导致生产者认为代理没有收到消息而重新发送,结果消费者收到两条具有相同内容和消息ID的消息
  • RabbitMQ代理向消费者重复传递消息
    消息发送给消费者后,由于网络断开等原因,消费者客户端没有向broker返回ACK响应,代理不知道消息是否被消费,为了确保消息至少被消费一次,代理在网络恢复后再次传递消息,结果消费者就收到了两条具有相同内容和消息ID的消息。

解决方案

  • 消费数据只是单纯的写入数据库
    可以在生产消息的时候为每一个消息加一个全局唯一ID,消费数据插入数据库之前根据主键ID判断数据是否存在,或者建立联合主键索引,重复插入时会报错
  • 消费数据只是写入redis中
    不需要处理,因为redis天然具有幂等性
  • 复杂业务情况
    将所有消费过的消息ID存入redis,使用redis进行消费判断,和数据库判断相比更快

如何保证可靠性

产生原因

可靠性是指消息在MQ中传输会发生消息丢失问题,若是涉及金钱相关的业务可能会造成巨大损失,一般发生消息丢失会存在以下三种情况

  • 生产者弄丢了消息
    生产者在将数据发送到MQ时,由于网络原因造成投递失败
  • MQ自身弄丢了消息
    未开启RabbitMQ持久化,数据只存储在内存,当MQ宕机造成数据丢失
  • 消费者弄丢了消息
    消费者接收到消息后还没处理完成就宕机了

解决方案

  • 生产者弄丢了消息
    方法一:生产者发送数据之前开启事务(不推荐,基于同步消息通讯模式,太慢)
    方法二:生产者开启confirm模式
    1.在application.yml开启confirm模式

    spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guest# 发送者开启 confirm 确认机制publisher-confirm-type: correlated
    

    2.实现ConfirmCallback回调接口,可自定义消息发送失败的处理逻辑

    @Slf4j
    public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.error("消息发送异常!");//可以进行重发等操作} else {log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);}}
    }
    

    3.为RabbitTemplate配置回调函数

    @Slf4j
    @Configuration
    public class RabbitMqConfig {@Beanpublic ConfirmCallbackService confirmCallbackService() {return new ConfirmCallbackService();}@Beanpublic RabbitTemplate rabbitTemplate(@Autowired CachingConnectionFactory factory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);/*** 消费者确认收到消息后,手动ack回执回调处理*/rabbitTemplate.setConfirmCallback(confirmCallbackService());return rabbitTemplate;}//其他配置代码......
    }
    
  • MQ自身弄丢了消息
    开启broker持久化功能
    1.创建queue时设置为持久化队列

    @Bean(QUEUE_IOT_TOIN)
    public Queue createIotQueue() {return new Queue(QUEUE_IOT_TOIN, true);
    }
    

    2.发送消息时将消息的deliveryMode设置为持久化

    public void sendToUploadMsg(Object obj, String routingKey) {try {String jsonString = JSON.toJSONString(obj);rabbitTemplate.convertAndSend(EXCHANGE_IOT, routingKey, jsonString, message -> {//设置该条消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}, new CorrelationData(UUIDUtil.generate()));} catch (Exception e) {log.info(routingKey + "发送消息异常!");}
    }
    
  • 消费者弄丢了消息
    关闭自动ack,使用手动ack,RabbitMQ中有一个ACK机制,默认情况下消费者接收到到消息,RabbitMQ会自动提交ACK,之后这条消息就不会再发送给消费者了。我们可以更改为手动ACK模式,每次处理完消息之后,再手动ack一下。
    1.修改application.yml配置文件更改为手动ack模式

    spring:rabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guest# 发送者开启 confirm 确认机制publisher-confirm-type: correlated# 发送者开启 return 确认机制publisher-returns: truelistener:simple:concurrency: 10max-concurrency: 10prefetch: 1auto-startup: truedefault-requeue-rejected: true# 设置消费端手动 ackacknowledge-mode: manual# 是否支持重试retry:enabled: true
    

    2.消费端手动ack参考代码:

    @RabbitHandler
    public void handlerMq(String msg, Channel channel, Message message) throws IOException {try {//业务处理代码......//手动ACKchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...", e);channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理...", e);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}
    }
    

如何保证顺序性

产生原因

在生产中经常会有一些类似报表系统这样的系统,需要做 MySQL 的 binlog 同步。比如订单系统要同步订单表的数据到大数据部门的 MySQL 库中用于报表统计分析,通常的做法是基于 Canal 这样的中间件去监听订单数据库的 binlog,然后把这些 binlog 发送到 MQ 中,再由消费者从 MQ 中获取 binlog 落地到大数据部门的 MySQL 中。

在这个过程中,可能会有对某个订单的增删改操作,比如有三条 binlog 执行顺序是增加、修改、删除;消费者愣是换了顺序给执行成删除、修改、增加,这样能行吗?肯定是不行的。

对于 RabbitMQ 来说,导致上面顺序错乱的原因通常是消费者是集群部署,不同的消费者消费到了同一订单的不同的消息,如消费者 A 执行了增加,消费者 B 执行了修改,消费者 C 执行了删除,但是消费者 C 执行比消费者 B 快,消费者 B 又比消费者 A 快,就会导致消费 binlog 执行到数据库的时候顺序错乱,本该顺序是增加、修改、删除,变成了删除、修改、增加。

如下图是 RabbitMQ 可能出现顺序错乱的问题示意图:

解决方案

RabbitMQ 的问题是由于不同的消息都发送到了同一个 queue 中,多个消费者都消费同一个 queue 的消息。解决这个问题,我们可以给 RabbitMQ 创建多个 queue,每个消费者固定消费一个 queue 的消息,生产者发送消息的时候,同一个订单号的消息发送到同一个 queue 中,由于同一个 queue 的消息是一定会保证有序的,那么同一个订单号的消息就只会被一个消费者顺序消费,从而保证了消息的顺序性。

如下图是 RabbitMQ 保证消息顺序性的方案:

参考

https://blog.csdn.net/zw791029369/article/details/109561457
https://xie.infoq.cn/article/c84491a814f99c7b9965732b1

RabbitMQ常见幂等性、可靠性、顺序性问题及解决方案相关推荐

  1. RabbitMQ 如何确保消息的成功投递?幂等性?顺序性?

    RabbitMQ 如何确保消息的成功投递?RabbitMQ 如何保证不重复消费,保证数据不丢失?分布式系统里,如何保证数据的一致性?一串连环炮你是否顶得住? 其实这几个问题的原理大同小异,都可以在统一 ...

  2. 消息队列、RabbitMQ原理、消息队列保证幂等性,消息丢失,消息顺序性,以及处理消息队列消息积压问题

    消息队列 消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已 常见的消息队列 RabbitMq ActiveM ...

  3. 消息中间件(五)——如何保证消息的顺序性

    当我们的系统中引入了MQ之后,不得不考虑的一个问题是如何保证消息的顺序性,这是一个至关重要的事情,如果顺序错乱了,就会导致数据的不一致. 比如:业务场景是这样的:我们需要根据mysql的binlog日 ...

  4. RabbitMQ 可靠性、重复消费、顺序性、消息积压解决方案

    前言 为什么引入消息队列?引入 MQ 给我们解决了一些问题,但同时又引入了一些复杂的问题,这些问题是大型项目中必须解决的重点,更重要的是,面试也经常问.实际上消息队列可以说是没法百分之百保证可靠性的! ...

  5. 详解,最新整理,RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    消息队列常见问题处理 分布式事务 什么是分布式事务 我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用 ...

  6. RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    消息队列常见问题处理 分布式事务 什么是分布式事务 我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用 ...

  7. RabbitMQ—重复消费、数据丢失和消息顺序性

    原文作者:weixin_49367803 原文地址:https://blog.csdn.net/weixin_49367803/article/details/108480256 一.如何保证消息不被 ...

  8. 【重难点】【RabbitMQ 02】如何避免消息重复投递和消息重复消费、如何防止消息丢失、如何保证消息的顺序性、如何保证消息队列的可用性

    [重难点][RabbitMQ 02]如何避免消息重复投递和消息重复消费.如何防止消息丢失.如何保证消息的顺序性.如何保证消息队列的可用性 文章目录 [重难点][RabbitMQ 02]如何避免消息重复 ...

  9. RabbitMQ如何保证消息的顺序性【重点】

    1.1 保证顺序性的意义 消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常. 举例:   比如通过mysql binlog进行两个数 ...

最新文章

  1. zoj 1204 Additive equations
  2. 微软为其虚拟机更新Linux支持
  3. Apache—DBUtils框架开发学习实例
  4. getLocationOnScreen不起作用原因
  5. qq浏览器主页_讨论|360、金山毒霸、浏览器主页劫持
  6. Sendmail在企业网中的应用
  7. 关于DataSet与Strongly typed DataSet几点思考(原创)
  8. jquery的函数介绍和使用
  9. CDISC SDTM AE domain学习笔记 - 1
  10. QQ在线客服聊天功能
  11. 别再用所谓的MD5加密了,大佬们都用它——>MD5盐值加密多方法详解
  12. arcgis分析道路节点中心性degree,closeness,betweenness
  13. PROGRESSIVE GROWING OF GANS FOR IMPROVED QUALITY, STABILITY, AND VARIATION(PGGAN)
  14. 什么是套利型创业者?
  15. 蒸发器,冷凝器面积过大
  16. Magic cloth使用方法
  17. 2021年蓝桥杯第十二届软件赛省赛 C/C++ 大学B组 第二场 A-I
  18. 阿里巴巴是最好的客户关系管理实践者
  19. 如果银行想开挂,RPA机器人舍我其谁?
  20. 从rocketmq到kafka:集群、一致性与重平衡

热门文章

  1. 大数据挖掘企业服务平台
  2. 【Lin-CMS内容管理系统框架 v0.3.6】内置用户管理/权限管理/日志系统等常见功能
  3. 计算机系学不学ps,2017年计算机一级PS基础学习点子:给photoshop初学者的一点忠告...
  4. 使用 Vue.js 制作一个简单的调查问卷平台
  5. 结构力学计算软件_建筑结构力学分析的四大门派,哪个最出众?
  6. 软件设计师备考经验分享
  7. Linux软件安装卸载(yum+rpm)
  8. JDK安装及多版本JDK安装
  9. 13_ZYNQ7020_移植rtl8192.cu无线网卡驱动
  10. sketch格式的交互设计师UI设计师作品集模板