• 背景

最近项目开发到比较关键的阶段,在消息中间件上出现了一些问题,因为是电商项目,我们使用了springboot搭建了两个关键的微服务——会员系统(member system)和订单系统(order system),下面分别简称msos
其中,os负责产生交易订单orderIndfos,orderInfos里面的每一条数据orderInfo都绑定了一个会员。因为订单是会员产生的,这些数据需要投递给会员模块ms去计算成长值和积分,这个计算过程较为复杂,所哟普不能跟订单同步完成,所以只能通过消息中间件来传递,会员系统通过订阅这些数据进行成长值和积分的计算。消息中间件是RocketMQ

  • 问题发现

ms和os通过RocketMQ进行通信的期间,发生了一个意料之中的问题。——消息重复消费问题。
有过类似经验的同学应该对这类问题很熟悉,而没有经历过的同学,或许会感到疑惑,RocketMQ已经设置为单个的订阅模式(并非广播,按理说生产者不可能重复产生相同的消息),为什么消费者还会重复消费到相同的信息呢?
起初我也很疑惑,去查看平台记录,发现生产者的消息入所想的一样,只有一个,这说明问题跟生产者无关。而发生重复消费的原因,竟然是这个消息被重复订阅了好几次!换言之,同一个消息被重复投递了几次!

  • 查询原因

查看资料,得到了这样的解释:
一般来说,消息系统,对于未确认的消息,采用按规则重新投递的方式进行处理。如下情况可能会导致消息重复被投递:

1、订阅方应用接收到消息,业务处理完成后应用出中间件不知道消息处理结果,会重新投递消息。
2、订阅方应用接收到消息,业务处理完成后网络出中间件收不到消息处理结果,会重新投递消息。
3、订阅方应用接收到消息,业务处理时间过长,消息中间件因消息超时未确认,会再次投递消息。
4、订阅方应用接收到消息,业务处理完成,消息中间件问题导致收不到消息处理结果,消息会重新投递。
5、订阅方应用接收到消息,业务处理完成,消息中间件收到了消息处理结果,但由于消息存储故障导致消息没能成功确认,消息会再次投递。

  • 简要分析

这说明,这个问题的出现是十分复杂的。
上述的情况可能会出现在任何项目里面,而一般来说,情况2出现的可能性比较大,因为生产环境一旦发包调试,都是众多微服务一起启动,注册和发现过程中,环境不太稳定,很容易出现网络障碍,在这种情况下,RocketMQ中间件会收不到消费方的回复,从而重新投递消息。这属于mq本身应答异常状况的机制。

  • 解决方案
    既然找到了问题出现的原因,那我们就可以根据项目具体情况给出解决方案。

方案一: 分布式锁
我们知道,在生产环境,一般都是多元jvm集群环境,mq系统投递重复消息时,很有可能导致数据库重复消费(有人说可以采用synchronize或者Lock的方式锁住,但实际上行不通,因为同步锁只能保证多线程问题,注意,集群是多进程问题,不能用此方法解决)。就好比新增操作,一个消息新增一条数据,那么重复消息就会导致数据库新增两条重复数据,这明显是不符合业务场景的。那么,我们平时解决多进程问题最常用的方式就是分布式锁(一般是zookeeper和redis搭建)。分布式锁特有的集群全局视野,能够辅助,消息重复的甄别。
如下:

方案二:数据库约束+java异常处理机制

这个方案很简单,首先需要针对数据库简历约束,不允许产生重复数据,然后再使用java的异常处理机制来规避重复消息。如下:

public class TestClient {public static void main(String[] args) {// 生成消息工具类Student studentMessage = getStudent();try {usermapper.insertStudentid(studentMessage);}catch(Exception e) {// 异常处理if(e.getCause() instanceof MySQLIntegrityConstraintViolationException)log.warn("学号{}已存在,重新生成",studentMessage.getStudentid);//发现异常,也就说明消息重复了,业务中断,返回,直接丢掉消息return ;}//继续消费//…………其他后续业务;}}

上述的异常处理是重点,请务必要有这个“e.getCause() instanceof MySQLIntegrityConstraintViolationException”条件,因为这个只处理相同消息的异常,仅仅是遇到这种异常,才会return MESSAGE.FAILURE;,否则还是要抛出。

这样,就完美解决了重复消息问题。当然很多人说这样做不优雅,显得很土。于是我们采用了第三种方案。

方案三:利用数据库有条件的插入语句限制重复插入

这个方案是最简单的方案,我们都知道mysql的语句,无论是insert还是update,都是支持有条件的执行的,update我就不多说了,就说一下insert。

insert into person_table (uid,pname,age)
select 2,"李四",20 from dual
where not exists
(select id from person_table where uid=2 )

上述语句中“where not exists”后面跟随的就是本条语句的插入条件。这样一来,问题就好办了。


如上图所示,此方案利用了数据库特性替代了锁。既方便,又可靠。

方案四:查询消息系统验证消息是否重复

方案二三都属于较为简单的解决方案,上述方案不能适用于所有场景。所以大部分大厂(如阿里,腾讯等等)是不会采用这种头痛医头脚痛医脚的方案的。既然问题是处在mq中,那么解铃还须系铃人。我们也可以直接请针对mq出方案。
一般来说,mq在发完消息,甚至消费者再订阅消息时,都会有记录。我们可以在消费时,通过订阅的记录和消费的结果来判断,此消息是否重复订阅过,倘若重复订阅,则不再数据库中插入数据。
此方案,关键在于,消费这个行为发生之前,必须去查询mq的订阅记录,验证此消息是否曾经发出来过,情况较为复杂,而且对后续人工发消息补偿时,也要纳入考虑,所有单单解决消息重复这一点来说,显得有些成本过高,所以我们没有采用。在这里也不做详细讨论。

当然,除了上面几种方案,必然还存在别的方案,如乐观锁,hash锁等等。欢迎大家一起讨论。
本文完。

rocketMq消息重复消费问题相关推荐

  1. RocketMQ消息重复消费场景及解决办法

    消息重复消费是各个MQ都会发生的常见问题之一,在一些比较敏感的场景下,重复消费会造成比较严重的后果,比如重复扣款等. 那么在什么情况下会发生RocketMQ的消息重复消费呢? 当系统的调用链路比较长的 ...

  2. RocketMQ常见问题-消息重复消费和消息重复的问题

    RocketMQ不解决消息重复问题,RocketMQ不解决消息重复问题,RocketMQ不解决消息重复问题,重要的事情说三遍. 基本上说我很讨厌有人问这个问题,问这个问题首先你对消息的生命周期缺乏理解 ...

  3. 面试官:给我一个避免消息重复消费的解决方案?

    欢迎关注方志朋的博客,回复"666"获面试宝典 消息中间件是分布式系统常用的组件,无论是异步化.解耦.削峰等都有广泛的应用价值. 我们通常会认为,消息中间件是一个可靠的组件--这里 ...

  4. RabbitMQ消息重复消费问题

    业务背景 消息队列在数据传输的过程中,为了保证消息传递的可靠性,一般会对消息采用ack确认机制,如果消息传递失败,消息队列会进行重试,此时便可能存在消息重复消费的问题. 比如,用户到银行取钱后会收到扣 ...

  5. Spring Cloud Stream如何处理消息重复消费

    最近收到好几个类似的问题:使用Spring Cloud Stream操作RabbitMQ或Kafka的时候,出现消息重复消费的问题.通过沟通与排查下来主要还是用户对消费组的认识不够.其实,在之前的博文 ...

  6. kafka 脚本发送_Kafka笔记归纳(第五部分:一致性保证,消息重复消费场景及解决方式)...

    写在开头: 本章是Kafka学习归纳第五部分,着重于强调Kafka的事一致性保证,消息重复消费场景及解决方式,记录偏移量的主题,延时队列的知识点. 文章内容输出来源:拉勾教育大数据高薪训练营. 一致性 ...

  7. 【重难点】【RabbitMQ 02】如何避免消息重复投递和消息重复消费、如何防止消息丢失、如何保证消息的顺序性、如何保证消息队列的可用性

    [重难点][RabbitMQ 02]如何避免消息重复投递和消息重复消费.如何防止消息丢失.如何保证消息的顺序性.如何保证消息队列的可用性 文章目录 [重难点][RabbitMQ 02]如何避免消息重复 ...

  8. RocketMQ如何保证消息顺序消费?又为何不解决消息重复消费问题?

    消息的顺序消费对于业务系统来说非常重要,一笔订单产生了3条消息,分别是订单创建.订单付款.订单完成.消费时,必须按照顺序消费才有意义,与此同时多笔订单之间又是可以并行消费的. 如何保证消息顺序消费? ...

  9. RocketMQ 消息队列中丢失消息的场景举例及解决办法

    既然使用在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一些涉及到了金钱交易的场景下,消息丢失还是很致命的.那么在RocketMQ中存在哪几种消息丢失的场景呢? 先来一张最简单的消费流程图 ...

  10. 面试官问:消息被重复消费,怎么避免?有什么好的解决方案?

    欢迎关注方志朋的博客,回复"666"获面试宝典 消息中间件是分布式系统常用的组件,无论是异步化.解耦.削峰等都有广泛的应用价值. 我们通常会认为,消息中间件是一个可靠的组件--这里 ...

最新文章

  1. Centos 6.5 python 2.6.6 升级到 2.7
  2. python创建django项目语句_简单了解Django项目应用创建过程
  3. linux 下 upx 脱壳笔记
  4. 三种方式实现分布式锁
  5. 如何快速找到settype被assign的product category
  6. centos修改磁盘uuid_Centos更换损坏硬盘UUID改变导致系统不能正常启动处理
  7. Java对象类型转换
  8. QT 015 【数据库】 QSqlTableModel Class
  9. 【重难点】【分布式 01】RESTful、RPC 对比、Dubbo、Spring Cloud 对比、Eureka、Zookeeper、Consul、Nacos 对比、分布式锁
  10. Java多线程_复习(更新中!!)
  11. 【C++笔记】对象模型和this指针
  12. 下载goldfish源码
  13. 【180929】仿微信飞机大战游戏源码
  14. word文档中删除空行(段落空行与缩进空行)
  15. 电商平台-财务系统模块的设计与架构
  16. system verilog编程题_SystemVerilog通用程序库(下)
  17. IE浏览器下载文件中文文件名乱码问题解决
  18. win10小课堂:微信电脑端多开方法
  19. 写一个获取非行间样式的函数
  20. 如何成为早起者(三)

热门文章

  1. 用Java+xml配置方式实现Spring数据事务(编程式事务)
  2. Elasticserch学习之分页
  3. 服务器响应头隐藏X-power-by
  4. K-Means算法的Java实现
  5. Tensorflow-slim 学习笔记(一)概述
  6. FastReport.Net使用:[1]屏蔽打印对话框
  7. groupadd命令详解(实例)
  8. iPhone5s 等 64位真机 运行 带有百度地图等 仅支持32位系统API和SDK的问题
  9. 清空sql server数据库日志
  10. [转载]用户(User)和用户组(Grou…