1 RokcetMQ是什么?

Apache RocketMQ是一个采用Java语言开发的分布式的消息系统,由阿里巴巴团队开发,与2016年底贡献给Apache,成为了Apache的一个顶级项目。
在阿里内部,RocketMQ 很好地服务了 集 团大大小小上千个应用,在每年的双十一当天,更有不可思议的万亿级消息通过 RocketMQ 流转(在 2017 年的双十一当天,整个阿里巴巴集团通过 RocketMQ 流转的线上消息达到了万亿级,峰值 TPS 达到 5600 万),在阿里大中台策略上发挥着举足轻重的作用 。

2 RocketMQ解决分布式事务

1、A系统发送一个prepared消息到MQ,如果这个prepared消息发送失败那么就直接取消操作别执行了。

2、如果这个消息发送成功了、就接着执行本地事务(executeLocalTransaction),如果成功就告诉MQ发送确认消息,如果失败,就告诉MQ发送回滚消息。

3、如果发送了确认消息、那么B系统会接收到确认消息,然后执行本地事务。

4、上面的第2步, 由于网络原因发送确认or回滚消息失败,但是broker有轮询机制,根据唯一id查询本地事务状态,MQ会自动定时轮询所有prepared消息回调你的接口(checkLocalTransaction),问你,这个消息是不是本地事务处理失败了,所有没有发送确认的消息,是继续重试还是回滚?一般来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个就是避免可能本地事务执行成功了,而确认消息却发送失败了。

PS:此方案是不支持事务发起服务进行回滚的(订单支付成功了,即使短信发送没有执行成功,订单系统也不会回滚),但是大部分互联网应用都不会要求事务发起方进行回滚,如果一定要事务发起方进行回滚应该采用2PC、3PC、TCC等强一致性方案来实现分布式事务,比如LCN。

以一个例子说明,为什么大部分场景不用回滚:

我们去北京庆丰包子铺吃炒肝,先去营业员那里付款(Action1),拿到小票(Ticket),然后去取餐窗口排队拿炒肝(Action2)。思考2个问题:第一,为什么不在付款的同时,给顾客炒肝?如果这样的话,会增加处理时间,使得后面的顾客等待时间变长,相当于降低了接待顾客的能力(降低了系统的QPS)。第二,付了款,拿到的是Ticket,顾客为什么会接受?从心理上说,顾客相信Ticket会兑现炒肝。事实上也是如此,就算在最后炒肝没了,或者断电断水(系统出现异常),顾客依然可以通过Ticket进行退款操作,这样都不会有什么损失!(虽然这么说,但是实际上包子铺最大化了它的利益,如果炒肝真的没了,浪费了顾客的时间,不过顾客顶多发发牢骚,最后接受)

3 RocketMQ执行流程

1. 发送方向 MQ 服务端发送消息。
2. MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
3. 发送方开始执行本地事务逻辑。
4. 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
5. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后MQ Server 将对该消息发起消息回查。
6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

4 案例实践

订单支付-支付成功后发送短信-分布式事务

这里通过一个实例来讲一下RocketMQ实现分布式事务具体编码。

场景: 下单场景,订单服务生成订单,当订单支付成功之后,修改订单状态已支付,并且通过短信通知用户支付成功。

数据库设计:

CREATE TABLE `t_order` (`id` varchar(255) NOT NULL,`name` varchar(255) DEFAULT NULL,`state` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

订单服务service的主要方法:

@Service
public class OrderServiceImpl implements OrderService {@Autowired OrderMapper orderMapper;@Overridepublic void updatePayStatusByOrderId(String orderId) {//判断订单是否已支付if(!checkOrderPaySuccess(orderId)){ExceptionCast.cast(CommonCode.ALREADY_PAID);}//修改订单状态Order order = orderMapper.selectById(orderId);order.setState(1);orderMapper.updateById(order);}@Overridepublic boolean checkOrderPaySuccess(String orderId) {Order order = orderMapper.selectById(orderId);if (order.getState() == 0) {return true;}return false;}
}

业务流程

1.在订单表创建一个状态是未支付的订单

在数据库中直接创建一个未支付的订单

//0表示未支付
INSERT INTO `t_order` VALUES ('1', '订单', '0');

2.用户支付完成,通过MQ通知短信服务发送短信

OrderController:pay 发送订单支付成功的MQ事务消息,这里注意体会,并不是直接调OrderService::updatePayStatusByOrderId 然后发送普通的MQ消息。而是先发送事务消息到MQ,然后MQ回调订单服务的TransactionListener::executeLocalTransaction,在这里完成订单状态的更新,保证发送事务消息和更新订单状态的一致性。

@RestController
@RequestMapping("/order")
public class OrderController implements OrderControllerApi {@AutowiredTransactionProducer transactionProducer;@PostMappingpublic void pay(Order order) throws IOException, MQClientException {transactionProducer.sendOrderPaySucessEvent(order);}
}

3.订单服务端的事务消息监听器

@Component
public class TransactionProducer implements InitializingBean {@AutowiredOrderService orderService;private TransactionMQProducer producer;private static Map<String, LocalTransactionState> STATE_MAP = new HashMap<>();@Overridepublic void afterPropertiesSet() throws Exception {producer = new TransactionMQProducer("transaction_producer");producer.setNamesrvAddr("192.168.25.114:9876");// 设置事务监听器ThreadFactory threadFactory =new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build();ThreadPoolExecutor executor =new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory);producer.setExecutorService(executor);producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {ObjectMapper objectMapper = new ObjectMapper();LocalTransactionState state = LocalTransactionState.UNKNOW;try {Order order = objectMapper.readValue(msg.getBody(), Order.class);// 修改订单状态orderService.updatePayStatusByOrderId(order.getId());// System.out.println(1/0);state = LocalTransactionState.COMMIT_MESSAGE;} catch (UnsupportedEncodingException e) {e.printStackTrace();state = LocalTransactionState.ROLLBACK_MESSAGE;} catch (IOException e) {e.printStackTrace();state = LocalTransactionState.ROLLBACK_MESSAGE;}return state;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("状态回查 ---> "+ msg.getTransactionId()+ " "+ STATE_MAP.get(msg.getTransactionId()));return STATE_MAP.get(msg.getTransactionId());}});producer.start();}@Transactionalpublic void sendOrderPaySucessEvent(Order order) throws IOException, MQClientException {ObjectMapper objectMapper = new ObjectMapper();// 构造发送的事务 消息Message message =new Message("pay_topic",objectMapper.writeValueAsString(order).getBytes(RemotingHelper.DEFAULT_CHARSET));TransactionSendResult result = producer.sendMessageInTransaction(message, null);System.out.println("发送事务消息:" + result.toString());}
}

4. 短信通知服务

消费者监听队列,如果队列中有支付成功的消息就发送短信。

public class TransactionConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = newDefaultMQPushConsumer("HAOKE_CONSUMER");consumer.setNamesrvAddr("192.168.25.114:9876");// 订阅topic,接收此Topic下的所有消息consumer.subscribe("pay_topic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {//System.out.println(new String(msg.getBody(), "UTF-8"));ObjectMapper objectMapper = new ObjectMapper();Order order = objectMapper.readValue(msg.getBody(), Order.class);System.out.println("您已成功支付,订单号为"+order.getId());} catch (IOException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

5. 测试

测试结果:返回commit状态时,消费者能够接收到消息,返回rollback状态时,消费者接受不到消息。

支付成功:

支付失败:

修改支付的service使其出现bug。

  @Overridepublic void updatePayStatusByOrderId(String orderId) {//判断订单是否已支付if(!checkOrderPaySuccess(orderId)){ExceptionCast.cast(CommonCode.ALREADY_PAID);}//修改订单状态Order order = orderMapper.selectById(orderId);order.setState(1);System.out.println(1/0);orderMapper.updateById(order);}

不会发送消息,此时我们可以根据消息队列中的消息(存放的订单信息)。将钱退还给客户。

分布式事务(阿里巴巴RocketMQ实现分布式事务)相关推荐

  1. rocketmq整合mysql事务_分布式事务(4)---RocketMQ实现分布式事务项目

    摘要: ,但是returnLocalTransactionState.COMMIT_MESSAG的时候*服务挂了,那么最终Brock还未收到消息的二次确定,还是个半消息,所以当重新启动的时候还是回调这 ...

  2. 我说分布式事务之消息最终一致性事务(二):RocketMQ的实现

    来源:https://0x9.me/A76YN 号外:最近整理了一下以前编写的一系列Spring Boot内容,整了个<Spring Boot基础教程>的PDF,关注我,回复:001,快来 ...

  3. 【RocketMQ】【分布式事务】使用RocketMQ实现分布式事务

    参考地址:https://blog.csdn.net/zyw23zyw23/article/details/79070044 视频地址:https://v.youku.com/v_show/id_XO ...

  4. RocketMQ的分布式事务解决方案

    前言 在系统变的复杂后,分布式.微服务等架构技术,就要考虑到应用在系统中了.尤其数据量大了后,就需要对数据库进行拆分. 如:注册的用户数据,量大了后,就需要考虑分库分表 一旦数据库进行了分拆,那就出现 ...

  5. 分布式事务:RocketMQ实现分布式事务原理

    之前讲过有关分布式事务2PC.3PC.TCC的理论知识,博客地址: 1.分布式事务(1)---2PC和3PC原理 2.分布式事务(2)---TCC原理 这篇讲有关RocketMQ实现分布式事务的理论知 ...

  6. 基于rocketMq实现分布式事务解决方案

    前言 在处理分布式事务的问题上,除了前几篇谈到的可以使用seata,Hmily保证事务的最终一致性之外,使用消息队列也可以达到同样的效果 使用消息中间件解决分布式事务的问题,是在分布式事务框架还没有真 ...

  7. SpringCloud Alibaba 2021微服务实战三十二 集成RocketMQ实现分布式事务

    目录 基于RocketMQ分布式事务 - 完整示例 2.解决方案 2.1.本地消息表方案 2.2.RocketMQ事务消息方案 一.事务消息 二.订单服务 1.事务日志表 2.TransactionM ...

  8. 我说分布式事务之消息最终一致性事务(一):原理及实现

    来源:https://0x9.me/YgaUc 在之前的文章中,我们已经学习总结了分布式事务的两种解决方案. 我说分布式事务之TCC 我说分布式事务之最大努力通知型事务 本文我们将学习到另一种常见的柔 ...

  9. oledb 访问接口sqlncli10返回了消息 没有活动事务_这样理解分布式事务你是不是就会懂了?...

    分布式事务主要解决分布式一致性的问题.说到底就是数据的分布式操作导致仅依靠本地事务无法保证原的性.与单机版的事务不同的是,单机是把多个命令打包成一个统一处理,分布式事务是将多个机器上执行的命令打包成一 ...

最新文章

  1. svm rbf人脸识别 yale_实操课——机器学习之人脸识别
  2. my-large.cnf
  3. Commit failed with error: pathspec 'src/main/java/com/leo/demo/juctest/test.java' did not match any
  4. 将NetConf转成NormalizedNode对象及NormalizedNode对象转回NetConf(序列化与反序列化)
  5. Linux redhat下安装jdk-6u45-linux-x64.bin
  6. 硬件知识:串口通讯的起始、数据、停止位是怎么分配的?
  7. Matlab与C/C++混合编程调用OpenCV
  8. kadane算法_使用KADANE的算法求最大子阵列和
  9. python菜鸟入门_python菜鸟入门知识
  10. 【DevOps】SVN分支操作快速入门
  11. 杭州电子科技大学计算机考研复试分数线,2019杭州电子科技大学考研复试分数线通知...
  12. windows2016服务器优化,Windows server 2016系统基本优化设置
  13. 【老生谈算法】matlab实现图像放大算法——图像放大算法
  14. 如何突破编程学习的瓶颈期
  15. canvas学习(html5)画画
  16. 电弧光保护系统 就选汉光HKHB-608智能电弧光保护系统
  17. #GPA计算(python)
  18. 【C语言】你会用吗?
  19. esp32之点亮一盏灯
  20. 【内推】阿里集团2018届毕业生招聘

热门文章

  1. java感谢地说说_羡慕别人幸福的伤感句子:心存感激地生活吧。我们来自偶然,...
  2. centos安装pngquant及python使用pngquant
  3. 微信小程序云开发实现退款功能
  4. 《爱上Android》作者说!
  5. vue项目打包后本地运行方法
  6. conduct a job interview
  7. mac安装 ffmpeg 没有ffplay
  8. 搭建tftp server服务
  9. 内聚和常见内聚种类及解释
  10. PDF页面参差不齐统一页面大小--Adobe印刷制作功能