rocket mq 实现分布式事务消息 以及示例代码
分布式事务的来龙去脉
业务场景:用户A转账100元给用户B,这个业务比较简单,具体的步骤:
1、用户A的账户先扣除100元
2、再把用户B的账户加100元
如果在同一个数据库中进行,事务可以保证这两步操作,要么同时成功,要么同时不成功。这样就保证了转账的数据一致性。
但是在微服务架构中因为各个服务都是独立的模块,都是远程调用,没法在同一个事务中,所以就会遇到分布式事务问题。
RocketMQ中的处理方案![](/assets/blank.gif)
RocketMQ分布式事务方式:把扣款业务和加钱业务异步化,扣款成功后,发送“扣款成功消息”到RocketMQ,加钱业务订阅“扣款成功消息”,再对用户B加钱(扣款消息中包含了源账户和目标账户ID,以及钱数)
对于扣款业务来说,需规定是先扣款还是先向MQ发消息
场景一:先扣款后向MQ发消息
先扣款再发送消息,万一发送消息失败了,那用户B就没法加钱
场景二:先向MQ发像消息,后扣款
扣款成功消息发送成功,但用户A扣款失败,可加钱业务订阅到了消息,用户B加了钱
问题所在,也就是没法保证扣款和发送消息,同时成功,或同时失败;导致数据不一致。
为了解决以上问题,RocketMq把消息分为两个阶段:半事务阶段和确认阶段
半事务阶段:
该阶段主要发一个消息到rocketmq,但该消息只储存在commitlog中,但consumeQueue中不可见,也就是消费端(订阅端)无法看到此消息
确认阶段(commit/rollback):
该阶段主要是把半事务消息保存到consumeQueue中,即让消费端可以看到此消息,也就是可以消费此消息。如果是rollback就不保存。
整个流程:
1、A在扣款之前,先发送半事务消息2、发送预备消息成功后,执行本地扣款事务3、扣款成功后,再发送确认消息4、B消息端(加钱业务)可以看到确认消息,消费此消息,进行加钱
注意:上面的确认消息可以为commit消息,可以被订阅者消费;也可以是Rollback消息,即执行本地扣款事务失败后,提交rollback消息,即删除那个半事务消息,订阅者无法消费。这样就可以解决以下异常问题:
异常1:如果发送半事务消息失败,下面的流程不会走下去,这个是正常的。异常2:如果发送半事务消息成功,但执行本地事务失败。这个也没有问题,因为此半事务消息不会被消费端订阅到,消费端不会执行业务。 异常3:如果发送半事务消息成功,执行本地事务成功,但发送确认消息失败;这个就有问题了,因为用户A扣款成功了,但加钱业务没有订阅到确认消息,无法加钱。这里出现了数据不一致。
RocketMq如何解决上面的问题,核心思路就是【事务回查】,也就是RocketMq会定时遍历commitlog中的半事务消息。
对于异常3,发送半事务消息成功,本地扣款事务成功,但发送确认消息失败;因为RocketMq会进行回查半事务消息,在回查后发现业务已经扣款成功了,就补发“发送commit确认消息”;这样加钱业务就可以订阅此消息了。
这个思路其实把异常2也解决了,如果本地事务没有执行成功,RocketMQ回查业务,发现没有执行成功,就会发送RollBack确认消息,把消息进行删除。
同时还要注意的点是,RocketMQ不能保障消息的重复,所以在消费端一定要做幂等性处理。
除此之外,如果消费端发生消费失败,同时也需要做重试,如果重试多次,消息会进入死信队列,这个时候也需要进行特殊的处理。(一般就是把A已经处理完的业务进行回退)
如果本地事务执行了很多张表,那是不是我们要把那些表都要进行判断是否执行成功呢?这样是不是太麻烦了,而且和业务很耦合。
好的方案是设计一张Transaction表,将业务表和Transaction绑定在同一个本地事务中,如果扣款本地事务成功时,Transaction中应当已经记录该TransactionId的状态为「已完成」。当RocketMq事务回查时,只需要检查对应的TransactionId的状态是否是「已完成」就好,而不用关心具体的业务数据。
如果是银行业务,对数据要求性极高,一般A与B需要进行手动对账,手动补偿。
分布式事务使用案例
本案例简化整体流程,使用A系统向B系统转100块钱为例进行讲解。
案例中消息发送方是A系统,消费订阅方是B系统。
生产者案例代码:
代码解释如下:
- 启动线程池进行定时的任务回查
- 设置生产者事务回查监听器
- 开启本地事务,同时发送半事务消息
- 如果半事务消息失败,则整个事务回滚。
事务回查案例代码:
- 事务回查是非常有必要的,因为生产者在发送完半事务消息后不能立马确认事务的执行状态,所以事务回查有两个方法,一个是本地事务方法,一个是事务回查方法,目的都是为了确保事务的提交或者回滚。
消费者案例代码:
- 消费者使用的尽最大可能性确保成功消费(重试机制+死信队列特殊处理),所以B系统的处理比较简单,开启事务确保消费成功即可。
使用限制
- 事务消息不支持延时消息和批量消息。
- 事务回查的间隔时间:BrokerConfig. transactionCheckInterval 通过Broker的配置文件设置好。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。、
- 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
- 事务性消息可能不止一次被检查或消费。
- 事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
rocket mq 实现分布式事务消息 以及示例代码相关推荐
- 分布式事务——消息最终一致性方案
前言 分布式事务一直是服务化拆分后一个绕不开的话题,原来在单体应用中执行的多个逻辑操作,现在被拆分成了多个服务之间的远程调用.虽然服务化为我们的系统带来了水平伸缩的能力,然而随之而来挑战就是分布式事务 ...
- Apache RocketMQ 正式开源分布式事务消息
摘要: 近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事 ...
- 消息中间件学习总结(15)——Apache RocketMQ 正式开源分布式事务消息
近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事务消息, ...
- 四种基于MQ的分布式事务解决方案
在微服务的时代,分布式事务是绕不开的话题,尽管在大多数场景下,我们并不需要使用分布式事务,但是 不需要使用 不代表 可以不会使用,万一哪天真需要用到了呢?分布式事务是一个比较大的话题,今天我们来看看基 ...
- 微服务seata 1.4.2 分布式事务TCC模式示例
seata TCC模式和AT模式的基础环境是一样的,只是在实现方式上有所区别,而且TCC模式还可以和AT模式混合使用. 关于AT模式示例,可以参考seata 1.4.2 分布式事务AT模式示例. TC ...
- python qq自动发消息软件_Python之qq自动发消息的示例代码
准备:pip install win32gui 可能遇到的麻烦: No module named 'win32gui' 的解决方法(踩坑之旅) 源码: import win32gui import w ...
- 企业微信机器人脚本python_python实现企业微信定时发送文本消息的示例代码
企业微信定时发送文本消息 使用工具:企业微信机器人+python可执行文件+计算机管理中的任务计划程序 第一步:创建群机器人 选择群聊,单击鼠标右键,添加群机器人. 建立群机器人后,右键查看机器人,如 ...
- Java帝国之宫廷内斗2(分布式事务消息队列、事务表)
原文地址:https://mp.weixin.qq.com/s/92SghOorf10dm3pM0DWzIg 1.前情提要 上回说到IO大臣一直被JDBC大臣打压, 为了搞掉JDBC大臣, 他忍辱负重 ...
- Rocket MQ(二)消息详解
1. 消息发送方式 1.1 同步发送 简单来说,同步发送就是指 producer 发送消息后,会在接收到 broker 响应后才继续发下一条消息的通信方式. 由于这种同步发送的方式确保了消息的可靠性, ...
最新文章
- TM、XWT和Wazaabi
- muduo之Connector
- Windows7 Home高级 64 中文版 + TortoiseSVN 64 英文版 + SVN Server 32 英文版安装过程
- SAP Spartacus用户登录的实现
- C++学习之路 | PTA乙级—— 1033 旧键盘打字 (20 分)(精简)
- html滚动字幕如何向下移动,按向下键的同时,菜单选项向下移动,浏览器右边的滚动条也跟着跑怎么办。这个bug怎么改...
- phpcmsV9导航栏目点击跳转始终是localhost首页 bug - 分析篇
- MacBook电池健康程度的检查
- PXE无人值守安装linux后无法启动图形
- SadpTool 海康设备网络搜索工具
- jle汇编_汇编学习之路
- 详解标准方程法(内含公式推导和代码)
- 通信方面工作一些简单的名词解释整理
- 流放者柯南自建服务器 linux,流放者柯南个人服务器搭建教程 怎么搭建个人服务器...
- 有属性的自定义注解,如何获取到post请求中RequestBody中对象的一个属性值?
- java split 双竖线_HIVE 常用函数及实例
- 数据采集有什么难点?
- c语言覆盖文件指定内容
- Rational Rhapsody 检查模式
- java 耦合性_软件工程中的耦合性和解耦合性是什么意思?