SpringCloud-Alibaba之RocketMQ
SpringCloud-Alibaba之RocketMQ
RocketMQ概述
RocketMQ是一款由阿里研发的高性能高可靠性的分布式消息队列,使用Java语言开发,该项目已经贡献给了Apache基金会,成为Apache的顶级开源项目。
在早期,阿里内部使用ActiveMQ作为其消息传递中间件,随着业务的快速增长,基于ActiveMQ的消息队列集群在处理大规模业务吞吐量时会遇到IO等性能瓶颈,阿里研发团队曾努力优化ActiveMQ,但无奈效果不理想,紧接着他们将目光投向Kafka,不幸的是Kafka并不能满足他们的要求, 特别是在低延迟和高可靠性方面。在这种情况下,阿里研发团队最终决定自己研发一个消息队列引擎来处理更广泛的使用场景,包括从传统的发布/订阅到大批量高实时、消息零丢失的交易系统。并且将该方案向社区开放,希望可以服务更多的企业。
RocketMQ官方网址:http://rocketmq.apache.org/
目前已经有超过很多加家企业其业务中使用RocketMQ,下图是部分使用到RocketMQ的大厂:
消息队列的优点
简单来说,消息队列就是一种“先进先出”的数据结构,框架在此基础上实现数据传输的高性能、消息的高可靠性和系统高可用性,所以在如今微服务大行其道的背景下,分布式消息队列可以提供异步、解耦和消峰等功能,已经成为大型互联网服务架构里不可或缺的中间件。
异步:在大型电商系统,通常会将各个核心功能拆分成独立的服务模块,用户的下单操作需要多个服务模块共同完成,如果此时订单服务要一直等到其他服务的操作完成,那么整个下单操作耗时将很长,用户体验会很差。所以通常的做法就是订单服务将数据生成后发送到消息队列之后立刻返回下单成功给用户,其他依赖服务从消息队列里面获取数据进行延后处理,这种延后的异步功能能够提升系统的性能和吞吐量,同时不至于让用户长时间等待。
解耦:很多大型企业里面通常会有很多的子系统,子系统之间当需要进行消息通讯时,可以通过在子系统之间架设消息队列,这样每个子系统只需要关心自己订阅的数据,子系统之间达到完全解耦,甚至各个子系统实现的语言、技术、架构等等都可以不一样。
消峰:每年的双十一,淘宝系统都会面临瞬间大规模流量的冲击,如果没有缓冲机制,是不可能支撑得住的。通过利用消息队列,把大量的用户请求暂存起来,接着服务依次从队列里面将请求取出处理,虽然用户的请求耗时变长了,但相比较于系统崩溃导致的请求失败来说,这实际上已经是提高了用户体验,并且最重要的是保证了系统的稳定性。
RocketMQ基本结构
Client端
- Producer Group 一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致
- Consumer Group 一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致
Server端
- Broker 消息中转角色,负责存储消息,转发消息,这里就是RocketMQ Server
- Topic 消息的主题,用于定义并在服务端配置,消费者可以按照主题进行订阅,也就是消息分类,通常一个系统一个Topic
Message 在生产者、消费者、服务器之间传递的消息,一个message必须属于一个Topic 消息是要传递的信息。邮件中必须包含一个主题,该主题可以解释为要发送给您的信的地址。消息还可能具有可选标签和额外的键值对。例如,您可以为消息设置业务密钥,然后在代理服务器上查找消息以在开发过程中诊断问题。
Namesrver 一个无状态的名称服务,可以集群部署,每一个broker启动的时候都会向名称服务器注册,主要是接收broker的注册,接收客户端的路由请求并返回路由信息
Offset 偏移量,消费者拉取消息时需要知道上一次消费到了什么位置, 这一次从哪里开始
Partition 分区,Topic物理上的分组,一个Topic可以分为多个分区,每个分区是一一个有序的队列。 分区中的每条消息都会给分配一个有序的ID,也就是偏移量,保证了顺序,消费的正确性
Tag 用于对消息进行过滤,理解为message的标记,同一业务不同目的的message可以用相同的topic但是 可以用不同的tag来区分
简单来说就是用来进一步区分某个 Topic 下的消息分类。 Topic 与 Tag 最佳实践
key 消息的KEY字段是为了唯- -表示消息的,方便查问题,不是说必须设置,只是说设置为了方便开发和运维定位问题。 比如:这个KEY可以是订单ID等
我们实际上只需要把 Topic 和 xx_Group 和 Tag 弄明白了就行
基于 Docker 安装 RocketMQ(单机版)
必须安装有docker环境 包括 dockerfile环境 还必须有git 服务器 可用空闲硬盘内存5G以上 内存1.5G以上
如果没有安装git
yum install git
下载安装配置
mkdir -p /usr/local/src/rocketmq
cd /usr/local/src/rocketmq
git clone git://github.com/foxiswho/docker-rocketmq.git
移动到brokercnf文件夹下
cd /usr/local/src/rocketmq/docker-rocketmq/rmq/rmq/brokerconf/
修改配置文件
vi broker.conf
30行左右位置 ,修改
namesrvAddr=192.168.81.128:9876 为宿主机IP (不然在项目里推送消息就永远找不到服务连接失败)
如果是集群的话分号分割 namesrvAddr=192.168.81.128:9876;192.168.81.129:9876;192.168.81.130:9876
brokerIP1=192.168.25.142为宿主机IP (如果不设置为当前安装RocketMQ的ip那么启动就会失败 或者访问不到页面)
注意一定要把 #去掉
移动到docker-compose.yml
文件位置
cd /usr/local/src/rocketmq/docker-rocketmq/rmq
修改docker-compose.yml配置
vi docker-compose.yml
每个版本的配置都不同但是默认就行 主要就是改全部的JVM 不然虚拟机内存不够使用会导致启动不了(测试使用,生产跳过)
-Xms128M -Xmx128M -Xmn128m
集群的话第一个节点不用动配置文件其他子节点只需要 rmqnamesrv 和 rmqbroker , rmqconsole可以从配置文件中删除就行了
运行容器
先修改 启动文件
vi /usr/local/src/rocketmq/docker-rocketmq/rmq/start.sh
#!/usr/bin/env bash# 创建目录
mkdir -p ./rmqs/logs
mkdir -p ./rmqs/store
mkdir -p ./rmq/logs
mkdir -p ./rmq/store# 设置目录权限
chmod -R 777 ./rmqs/logs
chmod -R 777 ./rmqs/store
chmod -R 777 ./rmq/logs
chmod -R 777 ./rmq/storedocker-compose -f ./docker-compose.yml up -d# 显示 rocketmq 容器
docker ps |grep rocketmq
然后启动就行了
/usr/local/src/rocketmq/docker-rocketmq/rmq/start.sh
查看容器状态
docker-compose -f ./docker-compose.yml ps
打开运维页面
http://192.168.81.128:8180
出现以上页面代表ok了
选择中文
添加配置
添加Maven
<!-- rocketmq --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>
application.yml
生产者
rocketmq:# rocketmq地址nameServer: 192.168.81.128:9876producer:# 生产者 必须填写 groupgroup: test-group
消费者
rocketmq:# rocketmq地址nameServer: 192.168.81.128:9876
如果生产者和消费者都在一个服务里的话那么,使用生产者的配置就行了
端口号自行根据情况添加
server:port: 13000
其他配置根据自己需求添加…
MQ模板工具
@Resourceprivate RocketMQTemplate rocketMQTemplate;
将MQ常规的操作都给封装好了直接调用就行
消息生产者
发送普通消息(最常用)
将消息直接发送到MQ中 ,不管是否被消费 ,只负责单纯的发送消息
public void sync() {rocketMQTemplate.convertAndSend("topic-name", "send convertAndSend message !");}
底层还是使用 send发送的 就是发送普通消息
同步发送消息(sync)
发送消息采用同步模式,这种方式只有在消息完全发送完成之后才返回结果,此方式存在需要同步等待发送结果的时间代价。
这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次
发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。
public void sync() {SendResult sendResult = rocketMQTemplate.syncSend("topic-name", "send sync message !");//消费者消费完毕后会返回结果, 否则当前线程一直卡死 System.out.printf("syncSend1 to topic %s sendResult=%s %n", stringTopic, sendResult);}
异步发送消息(async)
发送消息采用异步发送模式,消息发送后立刻返回,当消息完全完成发送后,会调用回调函数sendCallback来告知发送者本次发送是成功或者失败。异步模式通常用于响应时间敏感业务场景,即承受不了同步发送消息时等待返回的耗时代价。
同同步发送一样,异步模式也在内部实现了重试机制,默认次数为2次,发送的结果同样存在同一个消息可能被多次发送给给broker,需要应用的开发者自己在消费端处理幂等性问题。
public void async() {rocketMQTemplate.asyncSend("topic-name", "send async message!", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("send successful");}@Overridepublic void onException(Throwable throwable) {log.info("send fail; {}", throwable.getMessage());}});}
直接发送消息(one-way)
采用one-way
发送模式发送消息的时候,发送端发送完消息后会立即返回,不会等待来自broker的ack来告知本次消息发送是否完全完成发送(不关心发送结果)。这种方式吞吐量很大,但是存在消息丢失的风险,所以其适用于不重要的消息发送,比如日志收集。
public void oneWay() {rocketMQTemplate.sendOneWay("topic-name", "send one-way message");}
总结(同步,异步,直接)
在实际使用场景中,利用何种发送方式,可以总结如下:
- 当发送的消息不重要时,采用
one-way
方式,以提高吞吐量; - 当发送的消息很重要是,且对响应时间不敏感的时候采用
sync
方式; - 当发送的消息很重要,且对响应时间非常敏感的时候采用
async
方式;
发送延迟消息(Delayed)
rocketMQTemplate.sendDelayed("test-topic-1", "I'm delayed message", MessageDelayLevel.TIME_1M);
发送顺序消息
rocketMQTemplate.syncSendOrderly("test-topic-4", "I'm order message", "1234");
消费者必须开启接受顺序消费才行
封装后的发送消息工具类
package com.rocketmq.service.impl;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Slf4j
@Componentpublic class MsgSenderTemplateUtils {@Resourceprivate RocketMQTemplate rocketMQTemplate;/*** 发送普通消息 ** @param data 消息信息* @param topic 主题*/public void sendMessage(String topic, Object data) {rocketMQTemplate.convertAndSend(topic, data);}/*** 发送普通消息 ** @param data 消息信息* @param topic 主题* @param tags 主题的标签*/public void sendMessage(String topic, String tags, Object data) {rocketMQTemplate.convertAndSend(String.format("%s:%s", topic, tags), data);}/*** 发送消息(支持分布式事务)** @param data 消息信息* @param topic 主题*/public void sendMessageInTransaction(String topic, Object data) {MessageBuilder<Object> messageBuilder = MessageBuilder.withPayload(data);messageBuilder.setHeader("msg", JSON.toJSONString(data));rocketMQTemplate.sendMessageInTransaction(topic, messageBuilder.build(), null);}/*** 发送消息(支持分布式事务)** @param data 消息信息* @param topic 主题* @param tags 主题的标签*/public void sendMessageInTransaction(String topic, String tags, Object data) {try {MessageBuilder<Object> messageBuilder = MessageBuilder.withPayload(data);messageBuilder.setHeader("msg", JSON.toJSONString(data));System.out.println(rocketMQTemplate);rocketMQTemplate.sendMessageInTransaction(String.format("%s:%s", topic, tags), messageBuilder.build(), null);} catch (MessagingException e) {e.printStackTrace();}}
}
演示代码
controller
package com.rocketmq.controller;import com.rocketmq.service.impl.MsgSenderTemplateUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;@RestController
public class TestController {@Resourceprivate MsgSenderTemplateUtils msgSender;// http://localhost:13000/test/*** 普通消息投递*/@GetMapping("/test")public String test() {Map<String, Object> orderInfo = new HashMap<>();orderInfo.put("orderId", UUID.randomUUID().toString());orderInfo.put("price", 10000);orderInfo.put("description", "我是注册订单,请尽快处理");msgSender.sendMessage("topicTest", orderInfo);
// msgSender.sendMessage("topicTest", "order", orderInfo);return "投递消息 => " + orderInfo + " => 成功";}// http://localhost:13000/test1/*** 普通消息投递 带tags*/@GetMapping("/test1")public String test1() {Map<String, Object> orderInfo = new HashMap<>();orderInfo.put("orderId", UUID.randomUUID().toString());orderInfo.put("price", 1000011);orderInfo.put("description", "我是注册订单,请尽快处理");msgSender.sendMessage("topicTest", "order", orderInfo);return "投递消息 => " + orderInfo + " => 成功2225412521512``";}// http://localhost:13000/test2/*** 分布式事务消息投递*/@GetMapping("/test2")public String test2() {Map<String, Object> orderInfo = new HashMap<>();orderInfo.put("orderId", UUID.randomUUID().toString());orderInfo.put("price", 10000);orderInfo.put("description", "我是注册订单,请尽快处理");msgSender.sendMessageInTransaction("topicTest", "order", orderInfo);return "投递消息 => " + orderInfo + " => 成功";}
}
注意: RocketMq 分布式事务还不能使用因为还没有实现事务监听类… 分布式事务,这个是干啥的, 保证消息绝对提交到MQ中,下面会讲如何使用
消息消费者
配置介绍
消费者只需要在类上加@RocketMQMessageListener(xxx)就行了
常用的注解参数
consumerGroup 消费者分组
topic 主题
selectorType 消息选择器类型
默认值 SelectorType.TAG 根据TAG选择
仅支持表达式格式如:“tag1 || tag2 || tag3”,如果表达式为null或者“*”标识订阅所有消息selectorExpression 选择器表达式
默认值 ”*“
consumeMode 消费者模式
ConsumeMode.CONCURRENTLY (异步多线程消费 没有顺序 速度最快) (默认)
ConsumeMode.ORDERLY (顺序消费 )
messageModel 广播消费模式与集群消费模式
MessageModel.CLUSTERING(集群消费模式 就相当于是负载均衡) (默认)
MessageModel.BROADCASTING (广播模式)
集群模式
当 Consumer 使用集群模式时,每条消息只会被 Consumer 集群内的任意一个 Consumer 实例消费一次。
比如: 当一个 Consumer 集群内有 3 个 Consumer 实例时,一条消息到达时,只会被Consumer 1、Consumer 2、Consumer 3中的一个消费。
@Slf4j
@Component //必须注入spring容器
@RocketMQMessageListener(messageModel= MessageModel.CLUSTERING, //(集群消费模式 默认配置)consumeMode= ConsumeMode.CONCURRENTLY , //(异步多线程消费 默认配置)topic = "topicTest",selectorType = SelectorType.TAG,consumerGroup = "test-consumer-group", selectorExpression = "order") //tag
public class Consumer1 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {String message = new String(messageExt.getBody());log.info("==============================消费者1");log.info("收到消息,topic:{}, tag:{}, msgId:{}, body:{}", messageExt.getTopic(), messageExt.getTags(),messageExt.getMsgId(), message);Map<Integer, Object> orderInfo = JSON.parseObject(messageExt.getBody(), Map.class);log.info("订单信息 orderInfo = {} ", orderInfo.toString());}}
注意如果想测试那么就需要在启动一个服务把端口号变了就行,因为同一个项目只支持注册一个topic 和不同consumerGroup)
广播模式
当 Consumer 使用广播模式时,每条消息都会被 Consumer 集群内所有的 Consumer 实例消费一次。
比如:当一个 Consumer 集群内有 3 个 Consumer 实例时,一条消息到达时,会被Consumer 1、Consumer 2、Consumer 3 三个都消费一次。
消费者1:
@Slf4j
@Component //必须注入spring容器
@RocketMQMessageListener(messageModel= MessageModel.CLUSTERING,consumeMode= ConsumeMode.CONCURRENTLY ,topic = "topicTest",selectorType = SelectorType.TAG,consumerGroup = "test-consumer-group1", selectorExpression = "order") //tag
消费者2:
@Slf4j
@Component //必须注入spring容器
@RocketMQMessageListener(messageModel= MessageModel.CLUSTERING,consumeMode= ConsumeMode.CONCURRENTLY ,topic = "topicTest", //topic:消息的发送者使用同一个topicselectorType = SelectorType.TAG,consumerGroup = "test-consumer-group2", //group:不用和生产者group相同selectorExpression = "order") //tag
一定要注意保证 consumerGroup 组的名称要不同否则会导致消息发送混乱
消息重复消费和幂等性问题
问题的原因
现在稍微大点的项目都会有用到消息队列中间件,因为消息队列有异步解耦、流量削峰、数据分发等许多好处。但是在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复。如果消息重复则会影响到我们正常的业务处理,这时就要对消息做幂等处理。最近,我所在项目中,就出现了消息重复被消费的问题,造成了对用户的过渡营销引起了投诉,于是我们项目立即召开会议讨论方案解决此问题。这里涉及到消息重复和幂等,下面先说下这两概念,然后再说下我们用到的解决方案。
消息重复的场景
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对生产者的确认应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当消费者给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复
包括但不限于网络抖动、Broker 重启以及消费者应用重启。当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
解决方式
当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。
例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为 100 元。如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费 100 元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。
具体流程如下:
- 消费者收到任务后先在redis 中查询此任务的 id 是否存在
- 如果存在那么就代表有人已经消费过这个任务了,那么就直接return 就行了
- 如果没有存在那么就先将任务的ID(必须保证ID是唯一可以采用雪花id) 存储在Redis的Set集合中(Set集合可以保证没有重复)
- 然后在处理业务逻辑…
RocketMq 生产者事务
保证消息绝对提交到MQ中了 不会因为中途断电情…况导致生产者提交了消失而MQ没有收到
原理就是 生产者先发送半个消息到MQ,这个消息消费者是看不到的,然后一段时间(10ms~60ms)左右
消费者向MQ进行回查,判断半个消息是否被MQ接收到了,如果MQ接收到了,
那么生产者在向MQ发送消息提交(COMMIT_MESSAGE),表示次消息消费者可以进行消费了
如果MQ没有收到半个消息,那么生产者就返回一个事务状态(ROLLBACK_MESSAGE)将取消发送半个消息,
注意,这里还涉及到另外一个问题。如果是返回未知状态(UNKNOW),RocketMQ Broker
服务器会以1分钟的间隔时间不断回查,直至达到事务回查最大检测数,如果超过这个数字还未查询到事务状态,则回滚此消息。
@Slf4j
@RocketMQTransactionListener
public class RocketMqTransaction implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {// MessageHeaders headers = message.getHeaders();
// Map<String, Object> msg = JSON.parseObject(headers.get("msg").toString(),Map.class);log.info("executeLocalTransaction - UNKNOWN ");return RocketMQLocalTransactionState.UNKNOWN; //开启消息回查}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {MessageHeaders headers = message.getHeaders();if (Objects.isNull((headers.get("msg")))){log.info("checkLocalTransaction - UNKNOWN ");return RocketMQLocalTransactionState.UNKNOWN; //消息没有被MQ接收到 发送一个未知状态 继续回查}Map<String, Object> msg = JSON.parseObject(headers.get("msg").toString(),Map.class);if (msg.isEmpty()) {log.info("checkLocalTransaction - UNKNOWN ");return RocketMQLocalTransactionState.UNKNOWN; //消息没有被MQ接收到 发送一个未知状态 继续回查}log.info("checkLocalTransaction - COMMIT ");return RocketMQLocalTransactionState.COMMIT; //确认消息以被MQ接受到了}
}
虽然我们保证了生产者消息绝对提交成功,那么生产者怎么办??? 我们可以使用消费者 ACK机制,来保证消费者绝对消费成功
RocketMq 消费者ACK
1、什么是消息确认ACK。
答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确定-ACK。
2、ACK的消息确认机制。
答:ACK机制是消费者从RocketMq 收到消息并处理完成后,反馈给RocketMq ,RocketMq 收到反馈后才将此消息从队列中删除。
如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RocketMq 会认为这个消息没有正常消费,会将消息重新放入队列中。
如果在集群的情况下,RocketMq 会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务。
消息永远不会从RocketMq 中删除,只有当消费者正确发送ACK反馈,RocketMq 确认收到后,消息才会从RabbitMQ服务器的数据中删除。
消息的ACK确认机制默认是打开的。
3、ACK机制的开发注意事项。
答:如果忘记了ACK,那么后果很严重。当Consumer退出时候,Message会一直重新分发。然后RabbitMQ会占用越来越多的内容,由于RocketMq 会长时间运行,因此这个"内存泄漏"是致命的。
在RocketMq 中ACK分为有序和无序
有序ACK
@Slf4j
@Component //必须注入spring容器
public class ConsumerAck {// 顺序消费的回调public ConsumerAck() throws MQClientException {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-ack-group");// 设置NameServer的地址consumer.setNamesrvAddr("192.168.81.128:9876:9876");// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe("topicTest", "*");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {try {for (MessageExt messageExt : msgs) {String message = new String(messageExt.getBody());log.info("ACk收到消息,topic:{}, tag:{}, msgId:{}, body:{}", messageExt.getTopic(), messageExt.getTags(),messageExt.getMsgId(), message);Map<Integer, Object> orderInfo = JSON.parseObject(messageExt.getBody(), Map.class);log.info("ACk订单信息 orderInfo = {} ", orderInfo.toString());}// int i=1/0; 模拟出错 实现ACK机制// 标记该消息已经被成功消费return ConsumeOrderlyStatus.SUCCESS;} catch (Exception e) {e.printStackTrace();// 标记该消息失败,从新打回到mq中 //无限循环直到成功 而不是16次后进入死队列return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}});// 启动消费者实例consumer.start();System.out.printf("Consumer Started.%n");}}
无序ACK
@Slf4j
@Component //必须注入spring容器
public class ConsumerAck {public ConsumerAck() throws MQClientException {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-ack-group");// 设置NameServer的地址consumer.setNamesrvAddr("192.168.81.128:9876:9876");// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe("topicTest", "*");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){try {for (MessageExt messageExt : msgs) {String message = new String(messageExt.getBody());log.info("ACk收到消息,topic:{}, tag:{}, msgId:{}, body:{}", messageExt.getTopic(), messageExt.getTags(),messageExt.getMsgId(), message);Map<Integer, Object> orderInfo = JSON.parseObject(messageExt.getBody(), Map.class);log.info("ACk订单信息 orderInfo = {} ", orderInfo.toString());}int i=1/0; 模拟出错 实现ACK机制// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {e.printStackTrace();// 标记该消息失败,从新打回到mq中而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}});// 启动消费者实例consumer.start();System.out.printf("Consumer Started.%n");}}
分布式事务解决方案
- 订单系统向
MQ
发送一条预备扣减库存消息,MQ
保存预备消息并返回成功ACK
- 接收到预备消息执行成功
ACK
,订单系统执行本地下单操作,为防止消息发送成功而本地事务失败,订单系统会实现MQ
的回调接口,其内不断的检查本地事务是否执行成功,如果失败则rollback
回滚预备消息;成功则对消息进行最终commit
提交。 - 库存系统消费扣减库存消息,执行本地事务,如果扣减失败,消息会重新投,一旦超出重试次数,则本地表持久化失败消息,并启动定时任务做补偿。
基于消息中间件的两阶段提交方案,通常用在高并发场景下使用,牺牲数据的强一致性换取性能的大幅提升,不过实现这种方式的成本和复杂度是比较高的,还要看实际业务情况。
出现的问题
创建broker容器出现闪退情况
内存不够(最低2G),硬盘大小不够(最低留4G空闲硬盘)
生产者推送消息失败情况
[CL: 0.91 CQ: 0.91 INDEX: 0.91]出现类似这个问题表示当前分区内磁盘占用率以及高达百分之91了 表示内存不足了
可用通过 下命令查询linux磁盘占用率
df -h
没办法只能加硬盘内存了
消费者接收不到消息情况
关闭防火墙 或者把 9876 ,10911 ,10909 11011 11009 这5个端口开发
firewall-cmd --zone=public --add-port=9876/tcp --permanent firewall-cmd --zone=public --add-port=10911/tcp --permanent firewall-cmd --zone=public --add-port=10909/tcp --permanent firewall-cmd --zone=public --add-port=11011/tcp --permanent firewall-cmd --zone=public --add-port=11009/tcp --permanent
其他问题
/usr/local/src/rocketmq/docker-rocketmq/rmq/rmq/logs/rocketmqlogs
找到 broker.log 看日志到底啥情况
如果出现下面这个问题
ERROR: for rmqnamesrv Cannot start service rmqnamesrv: driver failed programming external connectivity on endpoint rmqnamesrv (a3508287c83b6a778d19f97e9720dbf37c60e7f6a55fa2902d5468b530cdd119): (iptables failed: iptables --wait -t nat -A DOCKER -p tcp -d 0/0 --dport 9876 -j DNAT --to-destination 172.18.0.2:9876 ! -i br-c0bbc3d62576: iptables: No chain/target/match by that name.(exit status 1))
ERROR: Encountered errors while bringing up the project.
解决办法
重启docker
systemctl restart docker
点赞 -收藏加 -关注 便于以后复习和收到最新内容 有其他问题在评论区讨论-或者私信我-收到会在第一时间回复 感谢,配合,希望我的努力对你有帮助^_^
SpringCloud-Alibaba之RocketMQ相关推荐
- SpringCloud Alibaba微服务实战(一) - 基础环境搭建
说在前面 Spring Cloud Alibaba 致力于提供微服务开发的一站式解决方案.此项目包含开发分布式应用微服务的必需组件,方便开发者通过 Spring Cloud 编程模型轻松使用这些组件来 ...
- SpringCloud—07—高级之SpringCloud Alibaba上
文章目录 提前预知 17.SpringCloud Alibaba入门简介 17.1.为什么会出现SpringCloud alibaba 18.Spring Cloud Alibaba Nacos服务注 ...
- SpringCloud Alibaba 从入门到精通(精选)
SpringCloud Alibaba 从入门到精通 一. 课程介绍 1.1 课程导学 1.2 项目环境搭建 二. SpringBoot基础 2.1 本章概述 2.2 Spring Boot是什么?能 ...
- SpringCloud从入门到精通教程/SpringCloud Alibaba从入门到精通教程
对于SpringCloud,很多小伙伴问到了我的研究学习资料来源,除官方文档外,特例完整整理一下自己的平时参考学习其他资料,以及分享实战项目源码和代码资源,供大家参考学习 主要教程:SpringClo ...
- 微服务实战系列之SpringCloud Alibaba学习(四)
微服务实战系列之SpringCloud Alibaba: 微服务实战系列之SpringCloud Alibaba学习(一) 微服务实战系列之SpringCloud Alibaba学习(二) 微服务实战 ...
- 最新微服务框架SpringCloud Alibaba介绍,搭建
微服务和SpringCloud Alibaba详细介绍(一),手把手搭建微服务框架 PS:本博客是本人参照B站博主:JAVA阿伟如是说 的视频讲解手敲整理的笔记 跟着一起手动搭建的框架 供大家一起学习 ...
- 【springcloud alibaba】 一条龙服务实现微服务案例
第一章 微服务介绍 1.1 系统架构演变 1.1.1 SpringCloud Spring Cloud是一系列框架的集合.它利用Spring Boot的开发便利性巧妙地简化了分布式系统基础设施的开发, ...
- 十、SpringCloud + Alibaba 全家桶详解(目前公司最新技术)
点击下载完整脑图https://kejizhentan.lanzouj.com/ixYBO068xjhe 一. SpringBoot2.X版和SpringCloud H版 1. SpringBoot和 ...
- 关于SpringCloud Alibaba,动力节点的这份笔记教程真香
什么是Spring Cloud Alibaba? 首先我们需要了解一下Spring Cloud,然后再来了解Spring Cloud Alibaba: Spring Cloud 源自官方描述: Spr ...
- SpringCloud Alibaba 学习圣经,10万字实现 SpringCloud 自由
40岁老架构师尼恩的掏心窝: 现在拿到offer超级难,甚至连面试电话,一个都搞不到. 尼恩的技术社群中(50+),很多小伙伴凭借 "左手云原生+右手大数据 +SpringCloud Ali ...
最新文章
- 参加第十六届全国大学生智能车竞赛广东省报名队伍
- 教你搞定Android自定义ViewGroup
- android中几种定位方式详解
- 互联网广告系统综述四定向
- Java List 分页
- 各科老师的语言风格一览,太真实了哈哈哈哈哈哈
- ffmpeg结构体SpecifierOpt说明文档
- java棋盘覆盖分治法_【单选题】实现棋盘覆盖算法利用的算法是( )
A. 分治法 B. 动态规划法 C. 贪心法 D. 回溯法...
- matlab新建脚本java报错,Matlab(四)脚本的使用
- java day20【字节流、字符流】
- (转)HDOJ 4006 The kth great number(优先队列)
- 何为领导力 —— 《Working Backwards》书评
- CICD实战——使用Jenkins实现自动化部署和环境隔离
- html5 牧场游戏,手机QQ首批五款HTML5游戏名单 农场偷菜复活
- opencv中step、step1、size、elemSize以及elemSize1区别
- 服务器未响应wan口连接失败,路由器WAN口连接失败如何解决 路由器WAN口连接失败解决方法【详解】...
- android短信删除,Android删除短信的方法
- 计算机网络应用赛甘肃省,关于举办第三届“甘肃省大学生创新杯计算机运用能力竞赛”预赛的.doc...
- 机器学习中的ground-truth
- 京东校招 最优打字策略