为什么要用 MQ?

MQ (消息队列) 是一种先进先出的数据结构,其应用场景主要包括以下三个方面:

  • 应用解耦。以电商为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出现故障或者因为升级等原因暂时不可用都会造成下单操作异常,影响用户使用体验。使用 MQ 后,比如物流系统发生故障,需要几分钟才能修复,在这段时间,物流系统要处理的数据被缓存到 MQ 中,用户的下单操作正常完成,当物流系统回复后,补充处理存在 MQ 中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障;
  • 流量削峰。将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提高系统的稳定性和用户体验;
  • 数据分发。数据的生产者不需要关心谁来消费数据,只需要将数据发送到 MQ ,数据消费者直接在 MQ 中获取数据即可。

MQ 的优点?缺点?

优点:应用解耦、流量削峰、数据分发。

缺点:

  • 系统可用性降低。系统引入的外部依赖越多,系统稳定性越差,一旦 MQ 宕机,就会对业务造成影响;如何保证 MQ 的高可用?
  • 系统复杂性提高。如何保证消息没有被重复消费?怎么处理消息丢失情况?怎么保证消息传递的顺序性?
  • 一致性问题。A 系统处理完业务,通过 MQ 给 B、C、D 三个系统发消息数据,如果 B、C 系统处理成功,D 系统处理失败,就会造成一致性问题。如何保证消息数据处理的一致性?

各类 MQ 产品的比较:

常见的 MQ 产品包括 ActiveMQ、RabbitMQ、RocketMQ、Kafka 。

ActiveMQ RabbitMQ RocketMQ Kafka
开发语言 java erlang java scala
单机吞吐量 万级 万级 10万级 10万级
时效性 ms级 us级 ms级 ms级以内
可用性 高 (主从架构) 高 (主从架构) 非常高 (分布式架构) 非常高 (分布式架构)
功能特性 成熟的产品,在很多公司得到应用,有较多的文档,各种协议支持较好 基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,管理界面较丰富 MQ 功能比较完备,扩展性佳 只支持主要的 MQ 功能,像一些消息查询、消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广

RocketMQ 是阿里巴巴在 2016 年开源的 MQ 中间件,使用 Java 语言开发,在阿里内部,RocketMQ 承接了例如 “双11” 等高并发场景的消息流转,能够处理万亿级别的消息。

RocketMQ 由几部分组成以及每个组件的作用?

RocketMQ 各角色:

角色 说明
Producer 消息的生产者,类似 “发件人” 与 NameServer 集群中的其中一个节点 (随机选择) 建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,定时向 Master 发送心跳,Producer 完全无状态,可集群部署。
Consumer 消息的消费者,类似 “收件人” 与 NameServer 集群中的其中一个节点 (随机选择) 建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,定时向 Master、Slave 发送心跳,Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
Broker 暂存和传输消息,核心角色,类似 “邮局” Master 主要处理写操作,Slave 主要处理读操作,Master 和 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
NameServer 管理 Broker,核心角色,类似 “邮局的管理机构” NameServer 是无状态节点,节点之间无任何信息同步,因为 Broker 启动后,会给每一个 NameServer 节点上报信息。
Topic 区分消息的种类 一个生产者可以发送消息给一个或者多个 Topic;一个消费者可以订阅一个或者多个 Topic 消息。
Message Queue 相当于是 Topic 的分区,用于并行生产和消费数据

集群工作流程:

  1. 启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心;
  2. Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包,心跳包中包含当前 Broker 信息(IP + 端口等)以及存储所有 Topic 信息,注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系;
  3. 收发消息前,先创建 Topic,创建 Topic 时需指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic;
  4. Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发送消息;
  5. Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

1.RocketMQ的使用

使用 RocketMQ 需要添加 Maven 依赖:

<!-- rocketmq -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.5.2</version>
</dependency>

1.基本用法

1、生产者生产消息

发送同步消息:

这种可靠性同步地发送方式使用的比较广泛,比如重要的消息通知、短信通知。

// 实例化消息生产者
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876;localhost:9877");
producer.start();
// 创建消息, 指定Topic、Tag和消息体
Message msg = new Message("TopicTest", "Tag1", "hello rocketmq".getBytes("UTF-8"));
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);// 发送状态
SendStatus sendStatus = sendResult.getSendStatus();
// 消息ID
String msgId = sendResult.getMsgId();
// 消息接收队列ID
int queueId = sendResult.getMessageQueue().getQueueId();// 如果不再发送消息, 关闭消息生产者
// producer.shutdown();

发送异步消息:

通常用在对响应时间敏感的业务场景,即生产者不能容忍长时间地等待 Broker 的响应。

只需要调用 DefaultMQProducer 的异步实现即可:

// 发送消息到一个Broker
producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {}@Overridepublic void onException(Throwable e) {}
});

发送单向消息:

这种方式主要用在生产者不关心发送结果的场景,例如发送日志。

只需要调用 DefaultMQProducer 的 sendOneway() 方法即可:

// 发送单向消息, 没有任何返回结果
producer.sendOneway(msg);

2、消费者消费消息

负载均衡模式(默认模式):

// 实例化消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876;localhost:9877");
// 订阅Topic、Tag
consumer.subscribe("TopicTest", "Tag1"); // 多个Tag可以用||隔开, 例如"Tag1||Tag2", 消费所有Tag使用"*"
// 设置负载均衡模式, 默认MessageModel.CLUSTERING
// consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置回调函数, 处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {// 消息String message = new String(msg.getBody());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
consumer.start();

消费者广播模式:

只需要消费者实例调用 setMessageModel() 方法设置广播模式即可:

// 设置广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

2.顺序消息

顺序消息是指可以按照消息的发送顺序来消费,RocketMQ 可以严格保证消息有序,分为分区有序或者全局有序。

顺序消息的原理?

在默认的情况下消息发送会采取 Round Robin 轮询方式把消息发送到不同的 queue (分区队列);而消费消息的时候从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序的。

但是如果控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序,当发送和消费参与 queue 只有一个,则是全局有序;如果多个 queue 参与,则是分区有序,即相对每个 queue,消息都是有序的。

比如以订单的顺序流程为例,创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个订单号获取到的肯定是同一个队列。

1、生产者生产消息

对订单 id 取模选择队列:

// 订单流程
String[] msgs = new String[] {"创建消息", "付款消息", "推送消息", "完成消息"};
for (int i = 0; i < msgs.length; i++) {// 创建消息, 指定Topic、Tag、key和消息体Message msg = new Message("OrderTopic", "Order", "i" + i, msgs[i].getBytes("UTF-8"));// 发送消息到一个Broker, 参数二: 消息队列的选择器, 参数三: 选择的业务标识SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {long orderId = (long) arg;long index = orderId % mqs.size(); // 取模return mqs.get((int) index);}}, orderId); // 动态传入订单id
}

2、消费者消费消息

只是设置 Consumer 回调函数不同:

// 订阅Topic、Tag
consumer.subscribe("OrderTopic", "*");
// 设置回调函数, 处理消息
consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {// 消息String message = new String(msg.getBody());}return ConsumeOrderlyStatus.SUCCESS;}
});

3.延时消息

比如电商场景,提交了一个订单就可以发送一个延时消息,1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

1、生产者生产消息

只需要为 Message 对象设置延时等级即可:

// 创建消息, 指定Topic、Tag和消息体
Message msg = new Message("TopicTest", "Tag1", "hello rocketmq".getBytes("UTF-8"));
// 设置延时等级3, 这个消息将在10s后发送
msg.setDelayTimeLevel(3);
// 发送消息到一个Broker
producer.send(msg);

使用限制:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

现在 RocketMQ 不支持任意时间的延时,需要设置几个固定的延时等级,从 1s 到 2h 分别对应着等级 1 到 8。

2、消费者消费消息

只是设置 Consumer 回调函数不同:

// 订阅Topic、Tag
consumer.subscribe("TopicTest", "*");
// 设置回调函数, 处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {// 消息String message = new String(msg.getBody());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});

4.批量消息

批量发送消息能显著提高传递小消息的性能,限制是这些批量消息应该有相同的 Topic,相同的 waitStoreMsgOK,而且不能是延时消息,此外,这一批消息的总大小不应超过 4MB。

1、生产者生产消息

如果每次只发送不超过 4MB 的消息,则很容易使用批处理:

List<Message> messages = new ArrayList<>();
messages.add(new Message("TopicTest", "Tag1", "OrderID001", "hello 0".getBytes("UTF-8")));
messages.add(new Message("TopicTest", "Tag1", "OrderID002", "hello 1".getBytes("UTF-8")));
messages.add(new Message("TopicTest", "Tag1", "OrderID003", "hello 2".getBytes("UTF-8")));
// 发送消息到一个Broker
producer.send(messages);

如果消息的总长度可能大于 4MB 时,最好把消息进行分割:

public class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Overridepublic boolean hasNext() {return currIndex < messages.size();}@Overridepublic List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);int tmpSize = message.getTopic().length() + message.getBody().length;Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; // 增加日志的开销20字节if (tmpSize > SIZE_LIMIT) { // 单个消息超过了最大的限制// 假如下一个子列表没有元素, 则添加这个子列表然后退出循环, 否则只是退出循环if (nextIndex - currIndex == 0) {nextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) { break; }else { totalSize += tmpSize; }}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}
// 把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {try {List<Message> listItem = splitter.next();producer.send(listItem);} catch (Exception e) {}
}

2、消费者消费消息跟基本用法一致。

5.过滤消息

过滤消息有两种方式:Tag 和 SQL 表达式。

在大多数情况下,Tag 是一个简单而有用的设计,其可以来选择想要的消息,例如:

// 实例化消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 订阅Topic、Tag
consumer.subscribe("TopicTest", "Tag1 || Tag2 || Tag3");

消费者将接收包含 Tag1、Tag2、Tag3 的消息,但是限制是一个消息只能有一个 Tag,这对于复杂的场景可能不起作用,在这种情况下,可以使用 SQL 表达式筛选消息。SQL 特性可以通过发送消息时的属性来进行计算。在 RocketMQ 定义的语法下,可以实现一些简单的逻辑。

1、SQL 基本语法:

RocketMQ 只定义了一些基本语法来支持这个特性,支持扩展。

  • 数值比较,比如 >、>=、<、<=、BETWEEN、=;
  • 字符比较,比如 =、<>、IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND、OR、NOT。

常量支持的额类型为:

  • 数值,比如 123、3.1415;
  • 字符,比如 ‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量;
  • 布尔值,TRUE 或 FALSE。

只有使用 push 模式的消费者才能使用 SQL92 标准的 SQL 语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

2、生产者生产消息

发送消息时,可以通过 putUserProperty 来设置消息的属性:

Message msg = new Message("TopicTest", "Tag1", "hello rocketmq".getBytes("UTF-8"));
msg.putUserProperty("a", "2");

3、消费者消费消息

用 MessageSelector.bySql 来使用 SQL 筛选消息:

// 订阅Topic、Tag
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));

6.事务消息

事务消息是为了解决分布式事务的。

1、生产者生产消息

将消息的发送分成了 2 个阶段:Prepare 阶段和确认阶段。


2、消费者消费消息


常见问题:

1、RocketMQ 消息怎么保证可靠性以及高可用性?

2、RocketMQ 消息种类以及怎么保证消息有序?

Spring Boot 整合消息中间件 RocketMQ相关推荐

  1. Spring Boot 消息队列 RocketMQ 入门

    转载自  芋道 Spring Boot 消息队列 RocketMQ 入门 摘要: 原创出处 http://www.iocoder.cn/Spring-Boot/RocketMQ/ 「芋道源码」欢迎转载 ...

  2. 芋道 Spring Boot 消息队列 RocketMQ 入门

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

  3. Spring Boot: Spring Boot 整合 RabbitMQ

    前言 RabbitMQ 是一个消息队列,说到消息队列,大家可能多多少少有听过,它主要的功能是用来实现应用服务的异步与解耦,同时也能起到削峰填谷.消息分发的作用. 消息队列在比较主要的一个作用是用来做应 ...

  4. RabbitMQ使用及与spring boot整合

    1.MQ 消息队列(Message Queue,简称MQ)--应用程序和应用程序之间的通信方法 应用:不同进程Process/线程Thread之间通信 比较流行的中间件: ActiveMQ Rabbi ...

  5. Spring Boot整合Redis以及Redis的原理

    Redis的原理及知识 Redis简介 redis是一个key-value.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合). ...

  6. spring boot整合spring security笔记

    最近自己做了一个小项目,正在进行springboot和spring Security的整合,有一丢丢的感悟,在这里分享一下: 首先,spring boot整合spring security最好是使用T ...

  7. Spring Boot 教程(三): Spring Boot 整合Mybatis

    教程简介 本项目内容为Spring Boot教程样例.目的是通过学习本系列教程,读者可以从0到1掌握spring boot的知识,并且可以运用到项目中.如您觉得该项目对您有用,欢迎点击收藏和点赞按钮, ...

  8. 五、spring boot整合mybatis-plus

    spring boot整合mybatis-plus 简介 mybatis 增强工具包,简化 CRUD 操作. 文档 http://mp.baomidou.com http://mybatis.plus ...

  9. spring boot 整合mybatis 无法输出sql的问题

    使用spring boot整合mybatis,测试功能的时候,遇到到了sql问题,想要从日志上看哪里错了,但是怎么都无法输出执行的sql,我使用的是log4j2,百度了一下,很多博客都说,加上下面的日 ...

最新文章

  1. 分布式监控系统开发【day38】:监控数据如何画图(九)
  2. 梁兴珍 java_数据结构与算法_Java语言
  3. (部分转载,部分原创)java大数类(2)
  4. CentOS7环境下搭建ElasticSearch
  5. Scrapy 简介及初探
  6. SQLite字符串拼接
  7. Alarm:IT界朋友请珍惜你的身体[转贴]
  8. java.net.URLEncoder 、URLDecoder 编码与解码
  9. Ucinet软件使用
  10. Redis入门官方文档
  11. 学习总结以及对接下来的规划
  12. linux skype的安装
  13. 京东上线“友家铺子”,社区团购进入洗牌阶段
  14. cerna(测rna浓度260280比值大于2)
  15. Ubuntu下安装小企鹅fcitx输入法
  16. Keras中verbose的作用
  17. Doxygen + Graphviz 安装(windows 10系统)
  18. protobuf 3.5 java使用介绍(二)
  19. vue 使用 :class 根据不同状态值设置状态文字颜色不同
  20. Java中的BigDecimal类你了解多少?

热门文章

  1. 深夜美食与微信公众号开发
  2. SDE、PM、DS等相关职位的面试题
  3. “第六届中国大数据应用论坛”隆重举行
  4. 单车车路:中国的车路双智能到底有多牛?丨曼孚科技
  5. 数据库事务(个人理解)
  6. 前端知识 | React Native Animated动画浅谈
  7. PHP显示的时间与服务器上时间不同
  8. arch linux 安装xfce_Arch Linux桌面环境(Xfce4)安装教程
  9. Nexus 3 清理docker镜像
  10. KeyStore简介和使用