本文来说下RocketMQ如何解决分布式事务

文章目录

  • 基本实现思路
  • RocketMQ的事务消息状态
  • 代码实例
    • maven导入
    • yaml文件配置
    • 核心代码
  • 本文小结

基本实现思路

核心思想:事务消息总共分为3个阶段:发送Prepared消息、执行本地事务、发送确认消息。这三个阶段是前后关联的,只有发送Prepared消息成功,才会执行本地事务,本地事务返回的状态是提交,那么就会发送最终的确认消息。如果在结束消息事务时,本地事务状态失败,那么Broker回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是Prepared状态则会向生产者发起一个检查本地事务的请求。

基本流程

  1. 生产者向我们的Broker(MQ服务器端)发送我们派单消息设置为半消息,该消息不可以被消费者消费。
  2. 再执行我们的本地的事务,将本地执行事务结果提交或者回滚告诉Broker
  3. Broker获取本地事务的结果,如果是已提交的话,将该半消息设置为允许被消费者消费,如果本地事务执行失败的情况下,将该半消息直接从Broker中移除
  4. 如果我们的本地事务没有将结果及时通知给我们的Broker,这时候我们Broker会主动定时(默认60s)查询本地事务结果
  5. 本地事务结果实际上就是一个回调方法,根据自己业务场景封装本地事务结果
  6. 事务回查的时间次数等配置在broker里

RocketMQ的事务消息状态

RocketMQ的事务消息分为3种状态,分别是提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

代码实例

maven导入

maven导入

<!-- rocketmq -->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version>
</dependency><!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.6</version>
</dependency>

yaml文件配置

yaml文件配置

#rocketmq配置信息
rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: 192.168.3.207:9876#生产者配置producer:#组名group: my-producer-group#超时时间设置长一点,要不然会报错sendDefaultImpl call timeoutsend-message-timeout: 50000

核心代码

实体

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEntity {private  Long  id;private  Integer age;private  String  name;
}

生产者消息发送,这里为了便于测试,写在了controller里面

@Slf4j
@RestController
@RequestMapping("/api")
@Api(tags = "RocketMq事务消息")
public class OrderController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/**** 使用RocketMQTemplate发送事务消息和普通消息略有不同的是,* 需要指一个事务生产者组,当然如果传入null,则会使用默认值* rocketmq_transaction_default_global_name,* 发生消息的地址和普通消息一样都Topic:Tag,* 另外一点不同的是除了发生的Message之外,* 还可以发送其他的额外参数,不过这些参数* 只会在执行本地事务的时候会用到。** @return*/@GetMapping("/sendOrder")@ApiOperation(value = "发送消息")public String sendOrder() {//数据封装String orderId = (new Random().nextInt(1000)) +"";OrderEntity entity = new OrderEntity();entity.setId(Long.valueOf(orderId));entity.setAge(12);entity.setName("张三");//使用Gson将实体转化成字符串GsonBuilder gsonBuilder = new GsonBuilder();gsonBuilder.setPrettyPrinting();Gson gson = gsonBuilder.create();String mm = gson.toJson(entity);//消息封装MessageBuilder<String> stringMessageBuilder = MessageBuilder.withPayload(mm);stringMessageBuilder.setHeader("msg",mm);Message msg = stringMessageBuilder.build();log.info(">>>> send tx message start,tx_group={},destination={},payload={} <<<<","my-producer-group","order_topic",entity);// 发送半消息,sendMessageInTransaction方法,以前的版本是4个参数,最新的版本中只有3个参数了TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction("order_topic",msg,entity.getId().toString());// 发送半消息的返回结果String sendStatus = sendResult.getSendStatus().name();String localTXState = sendResult.getLocalTransactionState().name();log.info(">>>> send status={},localTransactionState={} <<<<",sendStatus,localTXState);return "success";}
}

生产者监听程序

/**** 除了消费者之外,我们还需要创建事务消息生产者端的消息监听器,注意是生产者,* 不是消费者,我们需要实现的是RocketMQLocalTransactionListener接口。* 重写执行本地事务的方法和检查本地事务方法**/
@Slf4j
@RocketMQTransactionListener
public class OrderTXMsgListener implements RocketMQLocalTransactionListener {private static final Gson GSON = new Gson();/**** executeLocalTransaction方法是用来执行本地事务的,* 将本地事务执行的状态告知Broker服务器的,并不能用来传消息,* 如果想传数据库的主键id,可以提前生成主键id,而不要数据库自动生成。* @param msg* @param arg* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);// 执行本地事务RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;try {String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);log.info(orderEntity.toString());String id = (String) arg;log.info(id);} catch (Exception e) {log.error(">>>> exception message={} <<<<",e.getMessage());result = RocketMQLocalTransactionState.UNKNOWN;}return result;}/**** checkLocalTransaction方* 法是用来回查本地事务状态的* @param msg* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {log.info(">>>> TX message listener check local transaction, message={} <<<<",msg.getPayload());// 检查本地事务RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;try {String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);log.info(orderEntity.toString());} catch (Exception e) {// 异常就回滚log.error(">>>> exception message={} <<<<",e.getMessage());result = RocketMQLocalTransactionState.ROLLBACK;}return result;}
}

消费者消费

/**** 和一般的消* 息消费类似*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "tx_consumer",topic = "order_topic")
public class OrderListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("===============消费者进行消息的消费==============");log.info(">>>> message={} <<<<",message);}
}

程序测试

在执行本地事务方法中正常情况下返回的值是COMMIT,即提交事务,这种情况下消费者会直接消费消息,而略过检查本地事务的方法。


本文小结

本文编写了一个实例来实现RocketMQ如何解决分布式事务,在解决分布式事务中还是比较常见的一种解决方案。

RocketMQ如何解决分布式事务相关推荐

  1. 搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务

    搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务 初步认识RocketMQ的核心模块 rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocke ...

  2. 分布式事务 - 如何解决分布式事务问题?

    分布式事物 - 如何解决分布式事务问题? 面试题 分布式事务了解吗?你们是如何解决分布式事务问题的? 面试官心理分析 只要聊到你做了分布式系统,必问分布式事务,你对分布式事务一无所知的话,确实会很坑, ...

  3. rabbitmq 查询版本_基于rabbitmq解决分布式事务

    分布式事务要解决的问题是保证二个数据库数据的一致性,本地事务ACID属于刚性事务,基于CAP理论,分布式事务的核心要点柔性事务,最终一致性. 基于rabbitmq解决分布式事务要点如下 生产者采用发送 ...

  4. 基于消息中间件解决分布式事务的开源框架Myth

    基于消息中间件的解决分布式事务框架:https://github.com/yu199195/myth 1.rpc框架支持 : dubbo,motan,springcloud. 2.消息中间件支持 : ...

  5. 解决分布式事务,Seata真香

    目录 背景介绍 什么是分布式事务 什么叫做逆向补偿呢 互联网最流行的分布式事务组件seata 总结 背景 大家好,今天给大家分享一个在 2022 年出去面试 Java 几乎必问的一个技术,那就是 se ...

  6. 微服务如何解决分布式事务

    在系统变的复杂后,分布式.微服务等架构技术,就要考虑到应用在系统中了.尤其数据量大了后,就需要对数据库进行拆分. 如:注册的用户数据,量大了后,就需要考虑分库分表 一旦数据库进行了分拆,那就出现很多头 ...

  7. 基于数据库的事务消息解决分布式事务方案

    转载请注明出处:http://www.cnblogs.com/lizo/p/8516502.html 概述 当单库已不能支撑当前业务的时候,我们往往都考虑进行分库(横向拆分或者纵向拆分).但分库有个无 ...

  8. Apache RocketMQ 正式开源分布式事务消息

    摘要: 近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事 ...

  9. 消息中间件学习总结(15)——Apache RocketMQ 正式开源分布式事务消息

    近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事务消息, ...

最新文章

  1. 小学一年级计算机社团计划,一年级社团活动计划.doc
  2. python基础语法手册format-Python format 格式化函数
  3. 【C++】vs无法更新DoDataExchange方法问题解决
  4. 抄袭事件果然是机器人程序所为
  5. vscode 执行npm命令_生产力终极指南:用了两年,如今才算真正会用VS Code
  6. java 循环笔记_Java笔记之嵌套循环1
  7. 各大排序算法的Objective-C实现以及图形化演示比较
  8. IK Analyzer 和 lucene结合使用
  9. android 管理fragment,在 Fragment 之间传递数据
  10. 动态表单 mysql_动态表单实现思路
  11. 软件测试有效性指标,软件测试用例评审有效性的44个衡量标准[1]
  12. java论坛 基于SSM框架的游戏论坛 java游戏贴吧 java游戏论坛 java论坛 ssm论坛 ssm贴吧 可以改为各种论坛,分类可在后台自己控制,图片可任意换
  13. 全球云服务商排名情况及国内云主机市场占有率份额排名对比
  14. 介绍一款通过软件设置调节显示器亮度的工具:护眼宝
  15. python画图函数
  16. cnpm和npm使用,遇到的问题及解决方法
  17. layUI自定义列表每页条数
  18. 微众银行大数据爽约? 回应:这是一种误解
  19. 第 04 课 用户管理
  20. QNX APS自适应分区调度

热门文章

  1. 《Adobe Flash Professional CC经典教程》——1.15 复习
  2. 《HTML5与CSS3实战指南》——第2章 HTML5样式的标记2.1 The HTML5 Herald简介
  3. kali实战-被动信息收集
  4. 项目分布式部署那些事(1):ONS消息队列、基于Redis的Session共享,开源共享
  5. 辛星一起了解下后续PHP性能功能
  6. chrome插件:提取页面数据
  7. MongoDB 副本集的相关概念【转】
  8. 四种launchMode启动方式
  9. oracle 审计(二)
  10. 关于this和base