分布式事务的来龙去脉

业务场景:用户A转账100元给用户B,这个业务比较简单,具体的步骤:
1、用户A的账户先扣除100元 
2、再把用户B的账户加100元
如果在同一个数据库中进行,事务可以保证这两步操作,要么同时成功,要么同时不成功。这样就保证了转账的数据一致性。
但是在微服务架构中因为各个服务都是独立的模块,都是远程调用,没法在同一个事务中,所以就会遇到分布式事务问题。

RocketMQ中的处理方案

RocketMQ分布式事务方式:把扣款业务和加钱业务异步化,扣款成功后,发送“扣款成功消息”到RocketMQ,加钱业务订阅“扣款成功消息”,再对用户B加钱(扣款消息中包含了源账户和目标账户ID,以及钱数)
对于扣款业务来说,需规定是先扣款还是先向MQ发消息
场景一:先扣款后向MQ发消息
先扣款再发送消息,万一发送消息失败了,那用户B就没法加钱
场景二:先向MQ发像消息,后扣款
扣款成功消息发送成功,但用户A扣款失败,可加钱业务订阅到了消息,用户B加了钱
问题所在,也就是没法保证扣款和发送消息,同时成功,或同时失败;导致数据不一致。
为了解决以上问题,RocketMq把消息分为两个阶段:半事务阶段和确认阶段
半事务阶段:
该阶段主要发一个消息到rocketmq,但该消息只储存在commitlog中,但consumeQueue中不可见,也就是消费端(订阅端)无法看到此消息
确认阶段(commit/rollback:
该阶段主要是把半事务消息保存到consumeQueue中,即让消费端可以看到此消息,也就是可以消费此消息。如果是rollback就不保存。
整个流程:
1、A在扣款之前,先发送半事务消息2、发送预备消息成功后,执行本地扣款事务3、扣款成功后,再发送确认消息4、B消息端(加钱业务)可以看到确认消息,消费此消息,进行加钱
注意:上面的确认消息可以为commit消息,可以被订阅者消费;也可以是Rollback消息,即执行本地扣款事务失败后,提交rollback消息,即删除那个半事务消息,订阅者无法消费。这样就可以解决以下异常问题:
异常1:如果发送半事务消息失败,下面的流程不会走下去,这个是正常的。异常2:如果发送半事务消息成功,但执行本地事务失败。这个也没有问题,因为此半事务消息不会被消费端订阅到,消费端不会执行业务。 异常3:如果发送半事务消息成功,执行本地事务成功,但发送确认消息失败;这个就有问题了,因为用户A扣款成功了,但加钱业务没有订阅到确认消息,无法加钱。这里出现了数据不一致。
RocketMq如何解决上面的问题,核心思路就是【事务回查】,也就是RocketMq会定时遍历commitlog中的半事务消息。

对于异常3,发送半事务消息成功,本地扣款事务成功,但发送确认消息失败;因为RocketMq会进行回查半事务消息,在回查后发现业务已经扣款成功了,就补发“发送commit确认消息”;这样加钱业务就可以订阅此消息了。

这个思路其实把异常2也解决了,如果本地事务没有执行成功,RocketMQ回查业务,发现没有执行成功,就会发送RollBack确认消息,把消息进行删除。

同时还要注意的点是,RocketMQ不能保障消息的重复,所以在消费端一定要做幂等性处理。

除此之外,如果消费端发生消费失败,同时也需要做重试,如果重试多次,消息会进入死信队列,这个时候也需要进行特殊的处理。(一般就是把A已经处理完的业务进行回退)

如果本地事务执行了很多张表,那是不是我们要把那些表都要进行判断是否执行成功呢?这样是不是太麻烦了,而且和业务很耦合。
好的方案是设计一张Transaction表,将业务表和Transaction绑定在同一个本地事务中,如果扣款本地事务成功时,Transaction中应当已经记录该TransactionId的状态为「已完成」。当RocketMq事务回查时,只需要检查对应的TransactionId的状态是否是「已完成」就好,而不用关心具体的业务数据。
如果是银行业务,对数据要求性极高,一般A与B需要进行手动对账,手动补偿。

分布式事务使用案例

本案例简化整体流程,使用A系统向B系统转100块钱为例进行讲解。

案例中消息发送方是A系统,消费订阅方是B系统。

生产者案例代码:

代码解释如下:

  • 启动线程池进行定时的任务回查
  • 设置生产者事务回查监听器
  • 开启本地事务,同时发送半事务消息

  • 如果半事务消息失败,则整个事务回滚。

事务回查案例代码:

  • 事务回查是非常有必要的,因为生产者在发送完半事务消息后不能立马确认事务的执行状态,所以事务回查有两个方法,一个是本地事务方法,一个是事务回查方法,目的都是为了确保事务的提交或者回滚。

消费者案例代码:

  • 消费者使用的尽最大可能性确保成功消费(重试机制+死信队列特殊处理),所以B系统的处理比较简单,开启事务确保消费成功即可。

使用限制

  • 事务消息不支持延时消息和批量消息。
  • 事务回查的间隔时间:BrokerConfig. transactionCheckInterval  通过Broker的配置文件设置好。
  • 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。、
  • 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  • 事务性消息可能不止一次被检查或消费。
  • 事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
  • 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  • 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

rocket mq 实现分布式事务消息 以及示例代码相关推荐

  1. 分布式事务——消息最终一致性方案

    前言 分布式事务一直是服务化拆分后一个绕不开的话题,原来在单体应用中执行的多个逻辑操作,现在被拆分成了多个服务之间的远程调用.虽然服务化为我们的系统带来了水平伸缩的能力,然而随之而来挑战就是分布式事务 ...

  2. Apache RocketMQ 正式开源分布式事务消息

    摘要: 近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事 ...

  3. 消息中间件学习总结(15)——Apache RocketMQ 正式开源分布式事务消息

    近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事务消息, ...

  4. 四种基于MQ的分布式事务解决方案

    在微服务的时代,分布式事务是绕不开的话题,尽管在大多数场景下,我们并不需要使用分布式事务,但是 不需要使用 不代表 可以不会使用,万一哪天真需要用到了呢?分布式事务是一个比较大的话题,今天我们来看看基 ...

  5. 微服务seata 1.4.2 分布式事务TCC模式示例

    seata TCC模式和AT模式的基础环境是一样的,只是在实现方式上有所区别,而且TCC模式还可以和AT模式混合使用. 关于AT模式示例,可以参考seata 1.4.2 分布式事务AT模式示例. TC ...

  6. python qq自动发消息软件_Python之qq自动发消息的示例代码

    准备:pip install win32gui 可能遇到的麻烦: No module named 'win32gui' 的解决方法(踩坑之旅) 源码: import win32gui import w ...

  7. 企业微信机器人脚本python_python实现企业微信定时发送文本消息的示例代码

    企业微信定时发送文本消息 使用工具:企业微信机器人+python可执行文件+计算机管理中的任务计划程序 第一步:创建群机器人 选择群聊,单击鼠标右键,添加群机器人. 建立群机器人后,右键查看机器人,如 ...

  8. Java帝国之宫廷内斗2(分布式事务消息队列、事务表)

    原文地址:https://mp.weixin.qq.com/s/92SghOorf10dm3pM0DWzIg 1.前情提要 上回说到IO大臣一直被JDBC大臣打压, 为了搞掉JDBC大臣, 他忍辱负重 ...

  9. Rocket MQ(二)消息详解

    1. 消息发送方式 1.1 同步发送 简单来说,同步发送就是指 producer 发送消息后,会在接收到 broker 响应后才继续发下一条消息的通信方式. 由于这种同步发送的方式确保了消息的可靠性, ...

最新文章

  1. TM、XWT和Wazaabi
  2. muduo之Connector
  3. Windows7 Home高级 64 中文版 + TortoiseSVN 64 英文版 + SVN Server 32 英文版安装过程
  4. SAP Spartacus用户登录的实现
  5. C++学习之路 | PTA乙级—— 1033 旧键盘打字 (20 分)(精简)
  6. html滚动字幕如何向下移动,按向下键的同时,菜单选项向下移动,浏览器右边的滚动条也跟着跑怎么办。这个bug怎么改...
  7. phpcmsV9导航栏目点击跳转始终是localhost首页 bug - 分析篇
  8. MacBook电池健康程度的检查
  9. PXE无人值守安装linux后无法启动图形
  10. SadpTool 海康设备网络搜索工具
  11. jle汇编_汇编学习之路
  12. 详解标准方程法(内含公式推导和代码)
  13. 通信方面工作一些简单的名词解释整理
  14. 流放者柯南自建服务器 linux,流放者柯南个人服务器搭建教程 怎么搭建个人服务器...
  15. 有属性的自定义注解,如何获取到post请求中RequestBody中对象的一个属性值?
  16. java split 双竖线_HIVE 常用函数及实例
  17. 数据采集有什么难点?
  18. c语言覆盖文件指定内容
  19. Rational Rhapsody 检查模式
  20. java 耦合性_软件工程中的耦合性和解耦合性是什么意思?

热门文章

  1. 微信小程序(小程序定位获取地址信息篇)
  2. 退不了款的ofo,不能了的资本局 | 一点财经
  3. CRM下午茶(八)-客户价值评估
  4. 实体完整性和参照完整性(数据库笔记)
  5. 计算等额本金和等额本息的还款详情的软件
  6. Filament 渲染引擎剖析 之 ECS模式
  7. 【python代码实现带数据柱状堆积图】
  8. LVGL学习——动画 时间线
  9. WhatsAppBusiness官方说明导读及调研分析
  10. P29 JTextArea文本域