目录

1、RocketMQ整体介绍

2、核心概念模型

3、RocketMQ-源码包下载与结构说明

4、RocketMQ-环境搭建(搭建一个实例)

4.1、Hosts添加信息

4.2、上传解压

4.3、创建存储路径

4.4、编辑RocketMQ配置文件

4.5、修改日志配置文件

4.6、修改启动脚本参数(默认分配的内存很大,放小一点)

4.7、启动 NameServer

4.8、启动 BrokerServer

4.9、查看是否启动成功

4.10、数据清理

5、RocketMQ控制台部署与使用(messageId查询报错解决方法)

5.1、RocketMQ-Console下载地址

5.2、修改配置文件

5.3、当messageId查询报错的时候解决方案

5.4、修改后的RocketMQ-Console源码百度网盘地址

5.4、打包后,启动。

二、RocketMQ-急速入门

1、生产者模型使用

2、消费者模型使用

3、四种集群环境

3.1、图例​编辑

3.2、四种模式概览

4、主从集群环境构建(双主双从)

三、RocketMQ-生产者核心

1、配置参数解析

2、主从同步机制

3、同步/异步消息发送解析

4、延迟消息(Level可修改)

5、消息的返回状态

6、自定义消息发送规则(如何把消息发送到指定的队列)

四、RocketMQ-消费者核心

1、配置参数详解

2、集群与广播模式

2.1、集群模式

2.2、广播模式

3、消息存储核心-OffSet

4、PullConsumer

五、分布式事务消息讲解

5.1、分布式事务流程

5.2、场景模拟

5.3、实现分布式事务

5.3.1、建表

5.3.2、编写生产者

5.3.3、新建本地事务监听器

5.3.4、编写消费者

5.3.5、测试

六、SpringBoot整合RocketMQ

6.1、引入依赖

6.2、编写配置文件

6.3、编写生产者

6.4、编写消费者

6.5、测试


一、RocketMQ-初探门径

1、RocketMQ整体介绍

RocketMQ是一款分布式、队列模型的消息中间件。支持集群模型、负载均衡、水平扩展能力,亿级别的消息堆积能力,采用零拷贝的原理、顺序写盘、随机读。丰富的API使用,代码优秀,底层通信框架使用Netty NIO框架,NameServer代替Zookeper。强调集群无单点,可扩展,任意一点高可用,水平可扩展。消息失败重试机制,消息可查询。开源社区活跃、成熟度高。(经过双十一的考验)

官网地址:GitHub - apache/rocketmq: Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications. - GitHub - apache/rocketmq: Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.https://github.com/apache/rocketmq

2、核心概念模型

Producer:消息生产者,负责产生消息,一般有业务喜用负责产生消息。

Consumer:消息消费者,负责消费消息,一般都是后台系统负责异步消费。

Push Consumer:Consumer的一种,需要向Consumer对象注册监听。

Pull Consumer:Consumer的一种,需要主动请求Broker拉取消息。

Producer Group:生产者集合,一般用于发送一类消息。

Consumer Group:消费者集合,一般用于接受一类消息进行消费。

Broker:MQ消息服务。(中转角色,用于消息存储与生产消费转发)

3、RocketMQ-源码包下载与结构说明

下载地址:Releases · apache/rocketmq · GitHub

RocketMQ的源码包结构

rocketmq-broker 主要的业务逻辑,消息收发,主从同步,pagecache

rocketmq-client 客户端接口,比如生产者和消费者。

rocketmq- example 示例,比如生产者和消费者。

rocketmq- common 公用数据结构等等。

rocketmq- distribution 编译模块,编译输出等。

rocketmq- filter 进行Broker过滤的不感兴趣的消息传输,减小带宽压力。

rocketmq- logappender、rocketmq-logging日志相关。

rocketmq-namesrv Namesrv服务,用于服务协调。

rocketmq-openmessaging 对外提供服务。

rockermq-remoting 远程调用接口,封装Netty底层通信。

rocketmq-srvutil 提供一些公用的工具方法,比如解析命令行参数。

rocketmq-srvutil 消息存储。

rocketmq-test 测试模块

rocketmq-tools 管理工具。

4、RocketMQ-环境搭建(搭建一个实例)

4.1、Hosts添加信息

vi /etc/hosts

在hosts文件底部编写(双主双从的话,两台机器配置文件一致,主节点、从节点)

电脑ip rocketmq-nameserver1
电脑ip rocketmq-master1

4.2、上传解压

在/usr/local 文件夹下创建文件夹apache-rocketmq,并将rockermq文件放入该文件夹。

mkdir /usr/local/apache-rocketmq

建立软连接

ln -s apache-rocketmq rocketmq

4.3、创建存储路径

mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index

4.4、编辑RocketMQ配置文件

vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master #- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#多网卡情况下需要配置brokerIP,否则broker启动后IP异常。两主两丛配置两个,其他的依次类推
brokerIP1=电脑IP
brokerIP2=电脑IP

4.5、修改日志配置文件

mkdir -p /usr/local/rocketmq/logs
cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

4.6、修改启动脚本参数(默认分配的内存很大,放小一点)

# vim /usr/local/rocketmq/bin/runbroker.sh

修改JAVA_OPT启动参数:

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"
# vim /usr/local/rocketmq/bin/runserver.sh

修改JAVA_OPT启动参数

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"

4.7、启动 NameServer

# cd /usr/local/rocketmq/bin
# nohup sh mqnamesrv &

4.8、启动 BrokerServer

cd /usr/local/rocketmq/bin
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties  >/dev/null 2>&1 &

4.9、查看是否启动成功

# jps
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

4.10、数据清理

# cd /usr/local/rocketmq/bin
# sh mqshutdown broker
# sh mqshutdown namesrv
# --等待停止
# rm -rf /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store/commitlog
# mkdir /usr/local/rocketmq/store/consumequeue
# mkdir /usr/local/rocketmq/store/index
# --按照上面步骤重启 NameServer 与 BrokerServer

5、RocketMQ控制台部署与使用(messageId查询报错解决方法)

5.1、RocketMQ-Console下载地址

GitHub - SummerUnfair/rocketmq-externals: Mirror of Apache RocketMQ (Incubating)Mirror of Apache RocketMQ (Incubating). Contribute to SummerUnfair/rocketmq-externals development by creating an account on GitHub.https://github.com/SummerUnfair/rocketmq-externals

5.2、修改配置文件

修改配置文件中的rocketmq.config.namesrvAddr属性,该为自己RocketMQ的地址。其他的按需修改

5.3、当messageId查询报错的时候解决方案

修改源码MessageView类,修改fromMessageExt方法

5.4、修改后的RocketMQ-Console源码百度网盘地址

public static MessageView fromMessageExt(MessageExt messageExt) {MessageView messageView = new MessageView();BeanUtils.copyProperties(messageExt, messageView);if (messageExt.getBody() != null) {messageView.setMessageBody(new String(messageExt.getBody(), Charsets.UTF_8));}if (messageExt instanceof MessageClientExt){MessageClientExt ext= (MessageClientExt) messageExt;messageView.setMsgId(ext.getOffsetMsgId());}return messageView;}

5.4、打包后,启动。

访问页面:http://localhost:8080

二、RocketMQ-急速入门

1、生产者模型使用

public class Producer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {//创建DefaultMQProducer对象生产消息DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");//指定RocketMQ Service 地址producer.setNamesrvAddr(Const.NAMESRV_ADDR);//启动RocketMQproducer.start();//循环五次发送消息for (int i = 0; i < 5; i++) {/*发送消息参数   topic 消息主题参数   tags  消息标签参数   keys  用户自定义的key,唯一标识参数   body  消息体*/Message testMessage = new Message("test_quick_topic","TagA","Key" + i,("Hello RocketMQ" + i).getBytes());SendResult sendResult = producer.send(testMessage);System.out.println("消息发出:" + sendResult);}producer.shutdown();}
}

2、消费者模型使用

public class Consumer {public static void main(String[] args) throws MQClientException {//创建DefaultMQPushConsumer对象消费消息DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");//指定RocketMQ Service 地址consumer.setNamesrvAddr(Const.NAMESRV_ADDR);//设置消费方式,从最后一条消息消费,枚举可根据业务自行选择consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);/*注册订阅topic 消息主题subExpression 参数支持通配符,值为*的话,消息主题(test_quick_topic)下的都会消费,也可以为具体的tags*/consumer.subscribe("test_quick_topic", "TagA");//    监听topic,消费消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt messageExt = list.get(0);try {String topic = messageExt.getTopic();String tags = messageExt.getTags();String keys = messageExt.getKeys();if (keys.equals("Key1")){System.out.println("消息消费失败");int a=1/0;}String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);System.out.println("topic:" + topic + ",tags:" + tags + ",keys:" + keys + ",msgBody:" + msgBody);} catch (Exception e) {e.printStackTrace();int reconsumeTimes = messageExt.getReconsumeTimes();System.out.println("reconsumeTimes :"+reconsumeTimes);if (reconsumeTimes==3) {//记录消费失败操作日志System.out.println("记录消费失败操作日志");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("consumer start ...");}
}

3、四种集群环境

3.1、图例

3.2、四种模式概览

3.2.1、单点模式

3.2.2、主从模式

3.2.3、双主模式

3.2.4、双主双从模式,多主多从模式。

4、主从集群环境构建(双主双从)

链接: https://pan.baidu.com/s/1qhBgCENdEaBEDvtQtSFXhA?pwd=ibpi 提取码: ibpi 
--来自百度网盘超级会员v4的分享

三、RocketMQ-生产者核心

1、配置参数解析

1.1、productGroup:组名。

1.2、createTopicKey:Topic Key,很少指定。

1.3、defaultTopicQueueNums:默认Topic队列数(默认为4)。

1.4、sendMsgTimeout:发送消息的超时时间(单位:ms)。

1.5、compressMsgBodyOverHowmuch:默认压缩字节4096。

1.6、retryTimesWhenSendFailed:当发送消息超时或者失败的时候,重发机制配置(有同步、异步重发,可配置)。

1.7、retryAnotherBrokerWhenNotStoreOK:没有存储成功,选择别的broker存储(默认false)。

1.8、maxMessageSize:消息体最大限制,默认为128K。

2、主从同步机制

2.1、Master - Slave 主从同步

2.2、同步信息:数据内容(实时同步,socket实现)+元数据信息(定时任务实现同步,Netty)

2.3、元数据同步:Broker角色识别,为Slave则启动同步任务

2.4、消息同步:HAService、HAconnection、WaitNotfiyObject。

3、同步/异步消息发送解析

3.1、消息同步发送:producer.send(msg)

3.2、同步发送消息核心实现:DefaultMQProducerImpl

3.3、消息异步发送:producer.send(Message msg,SendCallback sendCallback)

3.4、异步发送消息核心实现:DefaultMQProducerImpl

//异步发送消息producer.send(testMessage, new SendCallback() {//消息发送成功会掉@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("msgId:" + sendResult.getMsgId() + ",status:" + sendResult.getSendStatus() + "消息发送成功");}//消息发送失败会掉@Overridepublic void onException(Throwable throwable) {System.out.println("消息发送失败");}});

4、延迟消息(Level可修改)

延迟消息:消息发到Broker后,要特定的时间才会被Consumer消费。

只支持固定精度的定时消息

MessageStoreConfig配置类&ScheduleMessageService任务类

setDelayTimeLevel方法设置

 Message testMessage = new Message("test_quick_topic","TagA","Key" + i,("Hello RocketMQ" + i).getBytes());if (i==1) {testMessage.setDelayTimeLevel(2);}//同步发送消息SendResult sendResult = producer.send(testMessage);System.out.println("消息发出:" + sendResult);

5、消息的返回状态

5.1、SEND_OK:消息发送成功。

5.2、FLUSH_DISK_TIMEOUT:消息发送成功,但服务器刷盘的时候超时了

5.3、FLUSH_SLAVE_TIMEOUT:消息发送成功,从节点同步过程中超时了。

5.4、SLAVE_NOT_AVAILABLE:从节点不可用。

6、自定义消息发送规则(如何把消息发送到指定的队列)

producer.send(Msg,selector,Obj);

//指定队列发送消息(默认是的四个队列)SendResult send = producer.send(testMessage, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {Integer queueNumber = (Integer) o;return list.get(queueNumber);}}, 2);System.out.println(send);

四、RocketMQ-消费者核心

1、配置参数详解

1.1、consumeFromWhere:指定启动后broker开始消费位置

1.2、allocateMessageQueueStrategy:消息分配的策略(集群模式下使用)

1.3、subscription:标识订阅,第一个参数为订阅的主题,第二个参数是实现消息Tags过滤表达式(可以写具体的tags、*(不过滤)、tagsA||tagsB(包含过滤tagsA和tagsB))

1.4、offsetStore:存储实际的偏移量,有本地还远程的方式

1.5、consumeThreadMin/consumeThreadMax:消费者线程池的自动调整配置

1.6、consumeConcurrentlyMaxSpan/pullThresholdForQueue:用来做流控的,第一个方法表示单个队列并行消费的最大跨度,第二个方法表示单个队列最大的消费个数是多少。

1.7、pullinterval/pullBatchSize:用作消息拉取,第一个方法表示消息拉取的时间间隔,第二个方法表示一次拉取的数据是多少

1.8、consumeMessageBatchMaxSize:默认为1,表示一次消息最多可以拉取多少条数据

2、集群与广播模式

2.1、集群模式

clustering模式默认为集群模式,GroupName用于把多个Consumer组织到一起,相同的GroupName的consumer只消费所订阅消息的一部分。目的是达到天然的负载均衡机制。

2.2、广播模式

Broadcasting模式(广播模式),同一个ConsumerGroup里的Consumer都会消费订阅Topic全部信息,也就是一条消息会被每一个Consumer消费,使用setMessageModel方法设置模式

3、消息存储核心-OffSet

Offset是消息消费进度的核心,Offset指某个topic下的一条消息在某个messageQueue里的位置。通过Offset可以进行定位到这条消息,Offset的存储实现分为远程文件类型和本地文件类型两种。集群模式(RemoteBrokerOffsetStore)采用远程文件存储offset,本质上因为多消费模式,每个Consumer消费所订阅主题的一部分。这种情况需要broker控制offset的值,使用RemoteBrokerOffsetStore。广播模式(LocalFileOffsetStore)由于每个Consumer都会收到消息且消费,各个Consumer之间没有任何干扰,独立线程消费,所以使用LocalFileOffsetStore,也就是把Offset存储到本地。

4、PullConsumer

消息拉取方式:DefaultMQPullConsumer,Pull方式主要做了三件事,获取Message Queue并遍历,维护OffsetStore,根据不同的消息状态做不同的处理。

五、分布式事务消息讲解

5.1、分布式事务流程

  1. 生产者发送半消息到 MQ Server,暂时不能投递,不会被消费
  2. 半消息发送成功后,生产者这边执行本地事务
  3. 生产者根据本地事务执行结果,向 MQ Server 发送 commit 或 rollback 消息进行二次确认
  4. 如果 MQ Server 接收到的 commit,则将半消息标记为可投递状态,此时消费者就能进行消费;如果收到的是 rollback,则将半消息直接丢弃,不会进行消费
  5. 如果 MQ Server 未收到二次确认消息,MQ Server 则会定时(默认1分钟)向生产者发送回查消息,检查本地事务状态,然后生产者根据本地事务回查结果再次向 MQ Server 发送 commit 或 rollback消息

5.2、场景模拟

场景:假设我们现在有这样的业务:用户充值网费会获得积分,且1元=1积分,用户服务中充值100元,积分服务中要对该用户增加100积分

分析:像这种跨服务、跨库的操作,我们要保证这两个操作要么一起成功、要么一起失败,采用RocketMQ的方案就是:RocketMQ事务消息+本地事务+监听消费,来达到最终一致性

5.3、实现分布式事务

5.3.1、建表

用户表:

CREATE TABLE `t_user` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户表',`name` varchar(16) NOT NULL COMMENT '姓名',`id_card` varchar(32) NOT NULL COMMENT '身份证号',`balance` int(11) NOT NULL DEFAULT '0' COMMENT '余额',`state` tinyint(1) DEFAULT NULL COMMENT '状态(1在线,0离线)',`vip_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'VIP用户标识(1是,0否)',`create_time` datetime NOT NULL COMMENT '创建时间',`last_login_time` datetime DEFAULT NULL COMMENT '最后一次登录时间',PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4

积分表:

CREATE TABLE `t_credit` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '积分表',`user_id` int(11) NOT NULL COMMENT '用户id',`username` varchar(16) NOT NULL COMMENT '用户姓名',`integration` int(11) NOT NULL DEFAULT '0' COMMENT '积分',PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4

事务日志表:

CREATE TABLE `t_mq_transaction_log` (`transaction_id` varchar(64) NOT NULL COMMENT '事务id',`log` varchar(64) NOT NULL COMMENT '日志',PRIMARY KEY (`transaction_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

5.3.2、编写生产者

@Slf4j
@Component
public class MQTXProducerService {private static final String Topic = "RLT_TEST_TOPIC";private static final String Tag = "charge";private static final String Tx_Charge_Group = "Tx_Charge_Group";@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 先向MQ Server发送半消息* @param userCharge 用户充值信息*/public TransactionSendResult sendHalfMsg(UserCharge userCharge) {// 生成生产事务idString transactionId = UUID.randomUUID().toString().replace("-", "");log.info("【发送半消息】transactionId={}", transactionId);// 发送事务消息(参1:生产者所在事务组,参2:topic+tag,参3:消息体(可以传参),参4:发送参数)TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(Tx_Charge_Group, Topic + ":" + Tag,MessageBuilder.withPayload(userCharge).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).build(),userCharge);log.info("【发送半消息】sendResult={}", JSON.toJSONString(sendResult));return sendResult;}
}
  1. 这里我用的UUID生成事务id,就是上面的事务表的id
  2. 方法参数userCharge,额外加的,可理解为dto,就两个字段:userId、chargeAmount,代表用户id和充值金额
  3. 这里注意:发送半消息方法里有两个参数,参3和参4,看过上篇整合教程的应该知道,这个参3是给消费者的,而这个参4是给本地事务的,我这里是模拟写的是一样的,实际业务可能会不同

5.3.3、新建本地事务监听器

@Slf4j
@RocketMQTransactionListener(txProducerGroup = "Tx_Charge_Group") // 这里的txProducerGroup的值要与发送半消息时保持一致
public class MQTXLocalService implements RocketMQLocalTransactionListener {@Autowiredprivate UserService userService;@Autowiredprivate MQTransactionLogMapper mqTransactionLogMapper;/*** 用于执行本地事务的方法*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object obj) {// 获取消息体里参数MessageHeaders messageHeaders = message.getHeaders();String transactionId = (String) messageHeaders.get(RocketMQHeaders.TRANSACTION_ID);log.info("【执行本地事务】消息体参数:transactionId={}", transactionId);// 执行带有事务注解的本地方法:增加用户余额+保存mq日志try {UserCharge userCharge = (UserCharge) obj;userService.addBalance(userCharge, transactionId);return RocketMQLocalTransactionState.COMMIT; // 正常:向MQ Server发送commit消息} catch (Exception e) {log.error("【执行本地事务】发生异常,消息将被回滚", e);return RocketMQLocalTransactionState.ROLLBACK; // 异常:向MQ Server发送rollback消息}}/*** 用于回查本地事务执行结果的方法*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {MessageHeaders headers = message.getHeaders();String transactionId = headers.get(RocketMQHeaders.TRANSACTION_ID, String.class);log.info("【回查本地事务】transactionId={}", transactionId);// 根据事务id查询事务日志表MQTransactionLog mqTransactionLog = mqTransactionLogMapper.selectByPrimaryKey(transactionId);if (null == mqTransactionLog) { // 没查到表明本地事务执行失败,通知回滚return RocketMQLocalTransactionState.ROLLBACK;}return RocketMQLocalTransactionState.COMMIT; // 查到表明本地事务执行成功,提交}
}
@Service
public class UserService {@Autowiredprivate UserMapper userMapper;@Autowiredprivate MQTransactionLogMapper mqTransactionLogMapper;/*** 用户增加余额+事务日志*/@Transactional(rollbackFor = Exception.class)public void addBalance(UserCharge userCharge, String transactionId) {// 1. 增加余额userMapper.addBalance(userCharge.getUserId(), userCharge.getChargeAmount());// 2. 写入mq事务日志saveMQTransactionLog(transactionId, userCharge);}@Transactional(rollbackFor = Exception.class)public void saveMQTransactionLog(String transactionId, UserCharge userCharge) {MQTransactionLog transactionLog = new MQTransactionLog();transactionLog.setTransactionId(transactionId);transactionLog.setLog(JSON.toJSONString(userCharge));mqTransactionLogMapper.insertSelective(transactionLog);}}
  1. 这里代码是主要关键的地方,本地事务是给用户增加余额后再插入mq事务日志,这两个操作只有成功了,才返回COMMIT,异常失败就返回ROLLBACK
  2. 回查方法不一定会执行,但是得有,回查就是根据我们之前生成穿过来的那个事务id(transactionId)来查询事务日志表,这样的好处是业务牵涉的表再多无所谓,我这个日志表也与你本地事务绑定,我只需查询这一张事务表就够了,能找到就代表本地事务执行成功了

5.3.4、编写消费者

@Slf4j
@Component
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "charge", consumerGroup = "Con_Group_Four") // topic、tag保持一致
public class MQTXConsumerService implements RocketMQListener<UserCharge> {@Autowiredprivate CreditMapper creditMapper;@Overridepublic void onMessage(UserCharge userCharge) {// 一般真实环境这里消费前,得做幂等性判断,防止重复消费// 方法一:如果你的业务中有某个字段是唯一的,有标识性,如订单号,那就可以用此字段来判断// 方法二:新建一张消费记录表t_mq_consumer_log,字段consumer_key是唯一性,能插入则表明该消息还未消费,往下走,否则停止消费// 我个人建议用方法二,根据你的项目业务来定义key,这里我就不做幂等判断了,因为此案例只是模拟,重在分布式事务// 给用户增加积分int i = creditMapper.addNumber(userCharge.getUserId(), userCharge.getChargeAmount());if (1 == i) {log.info("【MQ消费】用户增加积分成功,userCharge={}", JSONObject.toJSONString(userCharge));} else {log.error("【MQ消费】用户充值增加积分消费失败,userCharge={}", JSONObject.toJSONString(userCharge));}}
}
  1. 消费者其实比较简单,和普通消费者差不多,注意属性配置就行了
  2. 这里你可能质疑,前面的发送和本地事务都没啥问题,要么commit要么rollback,但如果这里消费失败怎么办呢?其实这里会产生问题的几率几乎不存在,首先RocketMQ就是高可用的,要真的你系统很庞大很庞大,你可以集群;再者,这里消费成功与否,源码内部已做处理,只要没异常,就会进行消费,而且它也有重试机制;最后,这里消费逻辑你可以扩展,当消费不成功时,你可以把该记录保存下来,定时提醒或人工去处理

5.3.5、测试

@PostMapping("/charge")
public Result<TransactionSendResult> charge(UserCharge userCharge) {TransactionSendResult sendResult = mqtxProducerService.sendHalfMsg(userCharge);return Result.success(sendResult);
}
  1. 看下数据库,发现从余额和积分都加了100,事务日志表也有记录,成功!
  2. 总结:其实理解了事务的实现过程后会发现用RocketMQ解决分布式事务还是挺简单的,毕竟MQ非常友好,而且MQ用处很多,每个项目都可以有。当然现在也有其他热门的分布式事务解决方案,如Seata,按照业务来进行选择

六、SpringBoot整合RocketMQ

6.1、引入依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency>

6.2、编写配置文件

rocketmq:name-server: 192.168.1.224:9876 # 访问地址,集群模式用;分割。producer:group: test # 必须指定groupsend-message-timeout: 3000 # 消息发送超时时长,默认3sretry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

6.3、编写生产者

@Slf4j
@Component
public class MQProducerService {@Value("${rocketmq.producer.send-message-timeout}")private Integer messageTimeOut;// 建议正常规模项目统一用一个TOPICprivate static final String topic = "RLT_TEST_TOPIC";// 直接注入使用,用于发送消息到broker服务器@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 普通发送(这里的参数对象User可以随意定义,可以发送个对象,也可以是字符串等)*/public void send(User user) {rocketMQTemplate.convertAndSend(topic + ":tag1", user);
//        rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(user).build()); // 等价于上面一行}/*** 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)* (msgBody也可以是对象,sendResult为返回的发送结果)*/public SendResult sendMsg(String msgBody) {SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));return sendResult;}/*** 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)* (适合对响应时间敏感的业务场景)*/public void sendAsyncMsg(String msgBody) {rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 处理消息发送成功逻辑}@Overridepublic void onException(Throwable throwable) {// 处理消息发送异常逻辑}});}/*** 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h*/public void sendDelayMsg(String msgBody, int delayLevel) {rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);}/*** 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)*/public void sendOneWayMsg(String msgBody) {rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());}/*** 发送带tag的消息,直接在topic后面加上":tag"*/public SendResult sendTagMsg(String msgBody) {return rocketMQTemplate.syncSend(topic + ":tag2", MessageBuilder.withPayload(msgBody).build());}}

上面写的这几个消息发送方法,你应该注意到了: 第一个方法和最后一个方法的参数 topic 和其它的不一样。其实这是 rocketmq 和 springboot 整合后设置 Tag 的方式(Tag:用于区分过滤同一主题下的不同业务类型的消息,非常实用)在项目里往mq写入消息时,最好每条消息都带上tag,用于消费时根据业务过滤另外,对于延时消息的参数理解是这样:共 18 个等级,值在上面已经注明了,下标从 1 开始,举例:我要发送个延迟为 1 分钟的消息,那么参数 delayLevel 的值为 5

在 rocketmq-spring-boot-starter 中,Tag 的设置方式: 在 topic后面加上 “:tagName”。源码中是以 “:”进行分割的,前面的是 topic,后面的就是 tag,截图如下:

另外,从上面的截图中可以看到“key”的设置方式,发送消息时在header中设置:

MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.KEYS, "key1")

6.4、编写消费者

@Slf4j
@Component
public class MQConsumerService {// topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意// selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1", consumerGroup = "Con_Group_One")public class ConsumerSend implements RocketMQListener<User> {// 监听到消息就会执行此方法@Overridepublic void onMessage(User user) {log.info("监听到消息:user={}", JSON.toJSONString(user));}}// 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存,// 不然生产者发送一条消息,这两个都会去消费,如果类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two")public class ConsumerSend2 implements RocketMQListener<String> {@Overridepublic void onMessage(String str) {log.info("监听到消息:str={}", str);}}// MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2", consumerGroup = "Con_Group_Three")public class Consumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("监听到消息:msg={}", msg);}}}

消费者这里要注意设置的参数要正确,我这里为了方便就写在一个类里
这里消费者会直接监听生产者发送的消息,一旦生产者那边发送消息,对应这里就会消费
另外可以对注解 @RocketMQMessageListener 点进去看看它的属性参数,都是非常熟悉的
还有,实际生产中,应避免自己的消费者代码出现非业务逻辑上的错误,比如消费时某个消费者报类型转换异常,建议多用TAG做区分

6.5、测试

@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {@Autowiredprivate MQProducerService mqProducerService;@GetMapping("/send")public void send() {User user = User.getUser();mqProducerService.send(user);}@GetMapping("/sendTag")public Result<SendResult> sendTag() {SendResult sendResult = mqProducerService.sendTagMsg("带有tag的字符消息");return Result.success(sendResult);}}

6.6、总结

  1. 实际运用中一些配置不要像我上面一样写在代码里,写在配置文件里或统一配置。
  2. 消息发送成功与失败可以根据sendResult判断,消息消费成功与否其实源码内部已做了处理,只要不出现异常,就是消费成功,如果你业务代码逻辑有问题那另说。
  3. 实际生产中还要注意重复消费问题,这里我提供一个方法:在数据库加一个去重表,给表里的一个字段如key添加唯一索引,消费前先入库,正常则往下执行你的业务逻辑,入库失败了表明该消息已消费过,不能往下走了。
  4. 其实rocketmq还有一个很重要的特性:事务,其它mq可是不支持的,利用事务可以做很多事,如跟钱相关的业务、分布式事务,不过事务的实现过程要麻烦点。
  5. 上面就是RocketMQ与Springboot的整合,整合了使用起来还是比较简单的。

Rocket详细教程相关推荐

  1. PixiJS超级详细教程【从入门到入土-上】

    PixiJS 来自GitHub教程 GitHub - Zainking/LearningPixi: ⚡️Pixi教程中文版 PixiJS超级详细教程[从入门到入土-下]地址[https://blog. ...

  2. 手把手从零开始搭建k8s集群超详细教程

    本教程根据B站课程云原生Java架构师的第一课K8s+Docker+KubeSphere+DevOps同步所做笔记教程 k8s集群搭建超详细教程 1. 基本环境搭建 1. 创建私有网络 2. 创建服务 ...

  3. win10系统优化计算机,全面优化win10电脑系统详细教程 | 专业网吧维护

    全面优化win10电脑系统详细教程 以下针对win10系统的电脑全面优化的步骤: 步骤1:禁止开机启动项 1.首先我们先来优化开机速度,拖慢开机速度的首先是开机自启动项,Ctrl + Shift + ...

  4. GPU运行Tensorflow详细教程及错误解决

    GPU运行Tensorflow详细教程及错误解决 前提条件 配置GPU运行 确认是否成功配置 出现的错误及解决方案 前提条件 最重要的一点:CUDA与tensorflow的版本一点要对应,不然用不了! ...

  5. VMware虚拟机安装黑苹果MacOS Mojave系统详细教程

    更多资源请百度搜索:前端资源网 欢迎关注我的博客:www.w3h5.com 最近遇到一个H5页面的 iPhone X 刘海兼容问题.查到一个 XCode 编辑器,可以模拟 iPhone X 环境运行. ...

  6. [分享] 从定制Win7母盘到封装详细教程 By BILL ( 10月23日补充说明 )

    [分享] 从定制Win7母盘到封装详细教程 By BILL ( 10月23日补充说明 ) billcheung 发表于 2011-10-23 00:07:49 https://www.itsk.com ...

  7. win七系统如何卸载MySQL_win7系统卸载SQL2008R2数据库的详细教程

    用过SQL2008R2数据库的朋友都知道,安装起来容易卸起来麻烦,可是在win7 32位旗舰版系统就不知道怎么卸载SQL2008R2数据库了.其实卸载SQL2008R2数据库的方法也很简单,可直接通过 ...

  8. Ubuntu系统安装搜狗输入法详细教程

    Ubuntu16.04系统安装搜狗输入法详细教程 解决Ubuntu 18.04中文输入法的问题,安装搜狗拼音

  9. PHP7Grafika,PHP图片处理库Grafika详细教程(3):图像属性处理

    该文章是接着上篇文章,<PHP极其强大的图片处理库Grafika详细教程(2):图像特效处理模块>,由于grafika功能太多,所以分开写,其他的点击这里 该文章主要写grafika的图像 ...

最新文章

  1. 2018 区块链技术及应用峰会(BTA)·中国全日程新鲜出炉,更多精彩议题看不停
  2. Zookeeper的目录结构
  3. java set 接口_【Java提高十七】Set接口集合详解
  4. GDB调试程序系列 (3)
  5. C语言试题六十三之请编写函数fun:将s所指字符串中ascii值为偶数的字符删除,串中剩余字符形成一个新串放在t所指的数组中。
  6. 你的第一个Django程序
  7. MongoDB compact 命令详解
  8. 古文(诗词文)—— 结构模式与复用
  9. [转载] tensorflow如何微调时如何只训练后两层_XLNet只存在于论文?都替你封装好了还不来用!...
  10. ActiveReports报表设计器
  11. windows 使用自带的cmd终端进行文件MD5校验
  12. Android Palette吸色原理及源码解析
  13. 岚宝科技PM2.5传感器驱动程序
  14. Android material design 之 BottomSheet基础入门
  15. 关于电脑突然蓝屏后,重启idea报错HttpServlet不存在的问题
  16. 安装极狐GitLab(ubuntu)----写给不爱看官方文档的人
  17. Capstone/CS5216 CS5218设计 DP转HDMI转换方案芯片
  18. Mysql 刷题笔记 0104 求出NAME中每组累加/每组总数的比例大于0.6的ID和NAME
  19. 免费PBootCMS采集支持聚合文章采集插件
  20. Altium Designer 18 导线转换45°的快捷键

热门文章

  1. k8s健康检查探针配置
  2. 与领导说话要注意的几个方法
  3. 记录每天学习的新知识:MQTT客户端
  4. 历史上的今天:万维网面世 30 周年;微信公众平台正式上线;计算机先驱诞生日...
  5. Win7+Anaconda安装PyTorch
  6. 正则表达式 年月日时分秒校验
  7. OSX app (Mac app) crash 文件分析与定位
  8. Java、JSP学籍管理系统
  9. java tlab_java虚拟机中容易和JVM栈上分配混淆的TLAB上分配
  10. mysql 反弹shell_Linux下几种反弹Shell方法的总结与理解