RocketMQ如何解决分布式事务
本文来说下RocketMQ如何解决分布式事务
文章目录
- 基本实现思路
- RocketMQ的事务消息状态
- 代码实例
- maven导入
- yaml文件配置
- 核心代码
- 本文小结
基本实现思路
核心思想:事务消息总共分为3个阶段:发送Prepared消息、执行本地事务、发送确认消息。这三个阶段是前后关联的,只有发送Prepared消息成功,才会执行本地事务,本地事务返回的状态是提交,那么就会发送最终的确认消息。如果在结束消息事务时,本地事务状态失败,那么Broker回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是Prepared状态则会向生产者发起一个检查本地事务的请求。
基本流程
- 生产者向我们的Broker(MQ服务器端)发送我们派单消息设置为半消息,该消息不可以被消费者消费。
- 再执行我们的本地的事务,将本地执行事务结果提交或者回滚告诉Broker
- Broker获取本地事务的结果,如果是已提交的话,将该半消息设置为允许被消费者消费,如果本地事务执行失败的情况下,将该半消息直接从Broker中移除
- 如果我们的本地事务没有将结果及时通知给我们的Broker,这时候我们Broker会主动定时(默认60s)查询本地事务结果
- 本地事务结果实际上就是一个回调方法,根据自己业务场景封装本地事务结果
- 事务回查的时间次数等配置在broker里
RocketMQ的事务消息状态
RocketMQ的事务消息分为3种状态,分别是提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
代码实例
maven导入
maven导入
<!-- rocketmq -->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version>
</dependency><!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.6</version>
</dependency>
yaml文件配置
yaml文件配置
#rocketmq配置信息
rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: 192.168.3.207:9876#生产者配置producer:#组名group: my-producer-group#超时时间设置长一点,要不然会报错sendDefaultImpl call timeoutsend-message-timeout: 50000
核心代码
实体
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEntity {private Long id;private Integer age;private String name;
}
生产者消息发送,这里为了便于测试,写在了controller里面
@Slf4j
@RestController
@RequestMapping("/api")
@Api(tags = "RocketMq事务消息")
public class OrderController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/**** 使用RocketMQTemplate发送事务消息和普通消息略有不同的是,* 需要指一个事务生产者组,当然如果传入null,则会使用默认值* rocketmq_transaction_default_global_name,* 发生消息的地址和普通消息一样都Topic:Tag,* 另外一点不同的是除了发生的Message之外,* 还可以发送其他的额外参数,不过这些参数* 只会在执行本地事务的时候会用到。** @return*/@GetMapping("/sendOrder")@ApiOperation(value = "发送消息")public String sendOrder() {//数据封装String orderId = (new Random().nextInt(1000)) +"";OrderEntity entity = new OrderEntity();entity.setId(Long.valueOf(orderId));entity.setAge(12);entity.setName("张三");//使用Gson将实体转化成字符串GsonBuilder gsonBuilder = new GsonBuilder();gsonBuilder.setPrettyPrinting();Gson gson = gsonBuilder.create();String mm = gson.toJson(entity);//消息封装MessageBuilder<String> stringMessageBuilder = MessageBuilder.withPayload(mm);stringMessageBuilder.setHeader("msg",mm);Message msg = stringMessageBuilder.build();log.info(">>>> send tx message start,tx_group={},destination={},payload={} <<<<","my-producer-group","order_topic",entity);// 发送半消息,sendMessageInTransaction方法,以前的版本是4个参数,最新的版本中只有3个参数了TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction("order_topic",msg,entity.getId().toString());// 发送半消息的返回结果String sendStatus = sendResult.getSendStatus().name();String localTXState = sendResult.getLocalTransactionState().name();log.info(">>>> send status={},localTransactionState={} <<<<",sendStatus,localTXState);return "success";}
}
生产者监听程序
/**** 除了消费者之外,我们还需要创建事务消息生产者端的消息监听器,注意是生产者,* 不是消费者,我们需要实现的是RocketMQLocalTransactionListener接口。* 重写执行本地事务的方法和检查本地事务方法**/
@Slf4j
@RocketMQTransactionListener
public class OrderTXMsgListener implements RocketMQLocalTransactionListener {private static final Gson GSON = new Gson();/**** executeLocalTransaction方法是用来执行本地事务的,* 将本地事务执行的状态告知Broker服务器的,并不能用来传消息,* 如果想传数据库的主键id,可以提前生成主键id,而不要数据库自动生成。* @param msg* @param arg* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);// 执行本地事务RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;try {String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);log.info(orderEntity.toString());String id = (String) arg;log.info(id);} catch (Exception e) {log.error(">>>> exception message={} <<<<",e.getMessage());result = RocketMQLocalTransactionState.UNKNOWN;}return result;}/**** checkLocalTransaction方* 法是用来回查本地事务状态的* @param msg* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {log.info(">>>> TX message listener check local transaction, message={} <<<<",msg.getPayload());// 检查本地事务RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;try {String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);log.info(orderEntity.toString());} catch (Exception e) {// 异常就回滚log.error(">>>> exception message={} <<<<",e.getMessage());result = RocketMQLocalTransactionState.ROLLBACK;}return result;}
}
消费者消费
/**** 和一般的消* 息消费类似*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "tx_consumer",topic = "order_topic")
public class OrderListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("===============消费者进行消息的消费==============");log.info(">>>> message={} <<<<",message);}
}
程序测试
在执行本地事务方法中正常情况下返回的值是COMMIT,即提交事务,这种情况下消费者会直接消费消息,而略过检查本地事务的方法。
本文小结
本文编写了一个实例来实现RocketMQ如何解决分布式事务,在解决分布式事务中还是比较常见的一种解决方案。
RocketMQ如何解决分布式事务相关推荐
- 搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务
搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务 初步认识RocketMQ的核心模块 rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocke ...
- 分布式事务 - 如何解决分布式事务问题?
分布式事物 - 如何解决分布式事务问题? 面试题 分布式事务了解吗?你们是如何解决分布式事务问题的? 面试官心理分析 只要聊到你做了分布式系统,必问分布式事务,你对分布式事务一无所知的话,确实会很坑, ...
- rabbitmq 查询版本_基于rabbitmq解决分布式事务
分布式事务要解决的问题是保证二个数据库数据的一致性,本地事务ACID属于刚性事务,基于CAP理论,分布式事务的核心要点柔性事务,最终一致性. 基于rabbitmq解决分布式事务要点如下 生产者采用发送 ...
- 基于消息中间件解决分布式事务的开源框架Myth
基于消息中间件的解决分布式事务框架:https://github.com/yu199195/myth 1.rpc框架支持 : dubbo,motan,springcloud. 2.消息中间件支持 : ...
- 解决分布式事务,Seata真香
目录 背景介绍 什么是分布式事务 什么叫做逆向补偿呢 互联网最流行的分布式事务组件seata 总结 背景 大家好,今天给大家分享一个在 2022 年出去面试 Java 几乎必问的一个技术,那就是 se ...
- 微服务如何解决分布式事务
在系统变的复杂后,分布式.微服务等架构技术,就要考虑到应用在系统中了.尤其数据量大了后,就需要对数据库进行拆分. 如:注册的用户数据,量大了后,就需要考虑分库分表 一旦数据库进行了分拆,那就出现很多头 ...
- 基于数据库的事务消息解决分布式事务方案
转载请注明出处:http://www.cnblogs.com/lizo/p/8516502.html 概述 当单库已不能支撑当前业务的时候,我们往往都考虑进行分库(横向拆分或者纵向拆分).但分库有个无 ...
- Apache RocketMQ 正式开源分布式事务消息
摘要: 近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事 ...
- 消息中间件学习总结(15)——Apache RocketMQ 正式开源分布式事务消息
近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事务消息, ...
最新文章
- 小学一年级计算机社团计划,一年级社团活动计划.doc
- python基础语法手册format-Python format 格式化函数
- 【C++】vs无法更新DoDataExchange方法问题解决
- 抄袭事件果然是机器人程序所为
- vscode 执行npm命令_生产力终极指南:用了两年,如今才算真正会用VS Code
- java 循环笔记_Java笔记之嵌套循环1
- 各大排序算法的Objective-C实现以及图形化演示比较
- IK Analyzer 和 lucene结合使用
- android 管理fragment,在 Fragment 之间传递数据
- 动态表单 mysql_动态表单实现思路
- 软件测试有效性指标,软件测试用例评审有效性的44个衡量标准[1]
- java论坛 基于SSM框架的游戏论坛 java游戏贴吧 java游戏论坛 java论坛 ssm论坛 ssm贴吧 可以改为各种论坛,分类可在后台自己控制,图片可任意换
- 全球云服务商排名情况及国内云主机市场占有率份额排名对比
- 介绍一款通过软件设置调节显示器亮度的工具:护眼宝
- python画图函数
- cnpm和npm使用,遇到的问题及解决方法
- layUI自定义列表每页条数
- 微众银行大数据爽约? 回应:这是一种误解
- 第 04 课 用户管理
- QNX APS自适应分区调度