RocketMQ-Spring

原文地址:https://github.com/apache/rocketmq-spring/blob/master/README_zh_CN.md

帮助开发者在Spring Boot中快速集成RocketMQ。支持Spring Message规范,方便开发者从其它MQ快速切换到RocketMQ。

如何贡献和帮助社区

我们永远欢迎开发者的帮助来使这个项目更加完善,无论是小的文档还是大的功能新特性,请参考RocketMQ的主站了解细节

前提条件

  • JDK 1.8 and above
  • Maven 3.0 and above

功能特性:

  • 同步发送
  • 同步顺序发送
  • 异步发送
  • 异步顺序发送
  • 顺序消费
  • 并发消费(广播/集群)
  • one-way方式发送
  • 事务方式发送
  • 消息轨迹
  • ACL
  • pull消费

Quick Start

下面列出来了一些关键点,完整的示例请参考: rocketmq-spring-boot-samples

注意:当前的RELEASE.VERSION=2.0.1

<!--在pom.xml中添加依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${RELEASE.VERSION}</version> </dependency>

发送消息

## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group

注意:

请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口

@SpringBootApplication public class ProducerApplication implements CommandLineRunner{ @Resource private RocketMQTemplate rocketMQTemplate; public static void main(String[] args){ SpringApplication.run(ProducerApplication.class, args); } public void run(String... args) throws Exception { rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!"); rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build()); rocketMQTemplate.convertAndSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00"))); // rocketMQTemplate.destroy(); // notes: once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate } @Data @AllArgsConstructor public class OrderPaidEvent implements Serializable{ private String orderId; private BigDecimal paidMoney; } }

在发送客户端发送事务性消息并且实现回查Listener

@SpringBootApplication public class ProducerApplication implements CommandLineRunner{ @Resource private RocketMQTemplate rocketMQTemplate; public static void main(String[] args){ SpringApplication.run(ProducerApplication.class, args); } public void run(String... args) throws Exception { try { // Build a SpringMessage for sending in transaction Message msg = MessageBuilder.withPayload(..)... // In sendMessageInTransaction(), the first parameter transaction name ("test") // must be same with the @RocketMQTransactionListener's member field 'transName' rocketMQTemplate.sendMessageInTransaction("test", "test-topic" msg, null); } catch (MQClientException e) { e.printStackTrace(System.out); } } // Define transaction listener with the annotation @RocketMQTransactionListener @RocketMQTransactionListener(transName="test") class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return bollback, commit or unknown return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, commit or unknown return RocketMQLocalTransactionState.COMMIT; } } }

更多发送相关配置

rocketmq.producer.send-message-timeout=300000 rocketmq.producer.compress-message-body-threshold=4096 rocketmq.producer.max-message-size=4194304 rocketmq.producer.retry-times-when-send-async-failed=0 rocketmq.producer.retry-next-server=true rocketmq.producer.retry-times-when-send-failed=2

接收消息

## application.properties rocketmq.name-server=127.0.0.1:9876

注意:

请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口

@SpringBootApplication public class ConsumerApplication{ public static void main(String[] args){ SpringApplication.run(ConsumerApplication.class, args); } @Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer1 implements RocketMQListener<String>{ public void onMessage(String message) { log.info("received message: {}", message); } } @Slf4j @Service @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2") public class MyConsumer2 implements RocketMQListener<OrderPaidEvent>{ public void onMessage(OrderPaidEvent orderPaidEvent) { log.info("received orderPaidEvent: {}", orderPaidEvent); } } }

更多消费相关配置

see: RocketMQMessageListener

消息轨迹

Producer 端要想使用消息轨迹,需要多配置两个配置项:

## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group rocketmq.producer.enable-msg-trace=true rocketmq.producer.customized-trace-topic=my-trace-topic

Consumer 端消息轨迹的功能需要在 @RocketMQMessageListener 中进行配置对应的属性:

@Service @RocketMQMessageListener( topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1", enableMsgTrace = true, customizedTraceTopic = "my-trace-topic" ) public class MyConsumer implements RocketMQListener<String> { ... }

注意:

默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且 trace-topic 为 RMQ_SYS_TRACE_TOPIC Consumer 端的消息轨迹 trace-topic 可以在配置文件中配置 rocketmq.consumer.customized-trace-topic 配置项,不需要为在每个 @RocketMQTransactionListener 配置

ACL

Producer 端要想使用 ACL 功能,需要多配置两个配置项:

## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=my-group rocketmq.producer.access-key=AK rocketmq.producer.secret-key=SK

事务消息的发送需要在 @RocketMQTransactionListener 注解里配置上 AK/SK:

@RocketMQTransactionListener( txProducerGroup = "test, accessKey = "AK", secretKey = "SK" ) class TransactionListenerImpl implements RocketMQLocalTransactionListener { ... }

注意:

可以不用为每个 @RocketMQTransactionListener 注解配置 AK/SK,在配置文件中配置 rocketmq.producer.access-key 和 rocketmq.producer.secret-key 配置项,这两个配置项的值就是默认值

Consumer 端 ACL 功能需要在 @RocketMQMessageListener 中进行配置

@Service @RocketMQMessageListener( topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1", accessKey = "AK", secretKey = "SK" ) public class MyConsumer implements RocketMQListener<String> { ... }

注意:

可以不用为每个 @RocketMQMessageListener 注解配置 AK/SK,在配置文件中配置 rocketmq.consumer.access-key 和 rocketmq.consumer.secret-key 配置项,这两个配置项的值就是默认值

FAQ

  1. 生产环境有多个nameserver该如何连接?

rocketmq.name-server支持配置多个nameserver地址,采用;分隔即可。例如:172.19.0.1:9876;172.19.0.2:9876

  1. rocketMQTemplate在什么时候被销毁?

开发者在项目中使用rocketMQTemplate发送消息时,不需要手动执行rocketMQTemplate.destroy()方法, rocketMQTemplate会在spring容器销毁时自动销毁。

  1. 启动报错:Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please

RocketMQ在设计时就不希望一个消费者同时处理多个类型的消息,因此同一个consumerGroup下的consumer职责应该是一样的,不要干不同的事情(即消费多个topic)。建议consumerGroup与topic一一对应。

  1. 发送的消息内容体是如何被序列化与反序列化的?

RocketMQ的消息体都是以byte[]方式存储。当业务系统的消息内容体如果是java.lang.String类型时,统一按照utf-8编码转成byte[];如果业务系统的消息内容为非java.lang.String类型,则采用jackson-databind序列化成JSON格式的字符串之后,再统一按照utf-8编码转成byte[]。

  1. 如何指定topic的tags?

RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。 在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称。

注意:

tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。

  1. 发送消息时如何设置消息的key?

可以通过重载的xxxSend(String destination, Message<?> msg, ...)方法来发送消息,指定msg的headers来完成。示例:

Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build(); rocketMQTemplate.send("topic-test", message);

同理还可以根据上面的方式来设置消息的FLAG、WAIT_STORE_MSG_OK以及一些用户自定义的其它头信息。

注意:

在将Spring的Message转化为RocketMQ的Message时,为防止header信息与RocketMQ的系统属性冲突,在所有header的名称前面都统一添加了前缀USERS_。因此在消费时如果想获取自定义的消息头信息,请遍历头信息中以USERS_开头的key即可。

  1. 消费消息时,除了获取消息payload外,还想获取RocketMQ消息的其它系统属性,需要怎么做?

消费者在实现RocketMQListener接口时,只需要起泛型为MessageExt即可,这样在onMessage方法将接收到RocketMQ原生的MessageExt消息。

@Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer2 implements RocketMQListener<MessageExt>{ public void onMessage(MessageExt messageExt) { log.info("received messageExt: {}", messageExt); } }

  1. 如何指定消费者从哪开始消费消息,或开始消费的位置?

消费者默认开始消费的位置请参考:RocketMQ FAQ。 若想自定义消费者开始的消费位置,只需在消费者类添加一个RocketMQPushConsumerLifecycleListener接口的实现即可。 示例如下:

@Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(String message) { log.info("received message: {}", message); } @Override public void prepareStart(final DefaultMQPushConsumer consumer) { // set consumer consume message from now consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis())); } }

同理,任何关于DefaultMQPushConsumer的更多其它其它配置,都可以采用上述方式来完成。

  1. 如何发送事务消息(即半消息支持分布式事务)? 在客户端,首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,实现确认和回查方法;然后再使用资源模板RocketMQTemplate, 调用方法sendMessageInTransaction()来进行消息的发布。 注意:这个方法通过指定发送者组名与具体的声明了txProducerGroup的TransactionListener进行关联,您也可以不指定这个值,从而使用默认的事务发送者组。

Spring boot 集成rocketMQ 官方文档相关推荐

  1. Spring Boot 2.0官方文档之 Actuator

    https://blog.csdn.net/alinyua/article/details/80009435 前言:本文翻译自Spring Boot 2.0.1.RELEASE官方文档,该Spring ...

  2. spring boot 2.0 官方文档 (一)

    原文来自:https://docs.spring.io/spring-boot/docs/2.0.0.RELEASE/reference/htmlsingle/ 技术与英文都是渣渣的翻译学习:如有错误 ...

  3. 如何下载 spring 官方文档 pdf

    spring 系列框架,官方文档都是html 格式的, 并未提供pdf 的下载入口.在html 路径后直接添加/pdf 即可进入pdf 下载页面 1. spring 官网 进入spring官网: ht ...

  4. Spring官方文档下载

    Spring官方文档下载 最近准备弄弄 spring-batch,需要完整的资料,但是大部分都是零散的,于是准备下个官方文档: spring 官方文档下载 官方文档路径:https://docs.sp ...

  5. 使用cephadm部署单节点ceph集群,后期可扩容(基于官方文档,靠谱,读起来舒服)

    目录 ceph各种部署工具比较(来自官方文档的翻译,靠谱!) 材料准备 cephadm使用条件 服务器有外网访问能力 服务器没有外网访问能力 安装cephadm cephadm的功能 两种安装方式 基 ...

  6. Spring Boot 官方文档学习(一)入门及使用

    Spring Boot 官方文档学习(一)入门及使用 个人说明:本文内容都是从为知笔记上复制过来的,样式难免走样,以后再修改吧.另外,本文可以看作官方文档的选择性的翻译(大部分),以及个人使用经验及问 ...

  7. 【Spring Boot官方文档原文理解翻译-持续更新中】

    [Spring Boot官方文档原文理解翻译-持续更新中] 文章目录 [Spring Boot官方文档原文理解翻译-持续更新中] Chapter 4. Getting Started 4.1. Int ...

  8. Spring Boot配置文件规则以及使用方法官方文档查找以及Spring项目的官方文档查找方法...

    比如要使用Spring Boot实现一个功能,最直接的方式是Google,但是往往搜索出来的都比较乱,关键是乱在不同的版本上,比如1.x版本和2.x版本的配置是不一样的.最明显区别是在使用Thymel ...

  9. Spring 4 官方文档学习 Spring与Java EE技术的集成

    本部分覆盖了以下内容: Chapter 28, Remoting and web services using Spring -- 使用Spring进行远程和web服务 Chapter 29, Ent ...

最新文章

  1. java mysql乐观锁_java乐观锁使用
  2. 用java程序实现记单词_java实现背单词程序
  3. Alibaba Sentinel规则持久化-推模式-手把手教程【基于Nacos】
  4. java控制层创建websocket_用Java构建一个简单的WebSocket聊天室
  5. 信息学奥赛一本通(1176:谁考了第k名)——qsort 函数
  6. android中影藏状态栏和标题栏的几种方法
  7. python入门——P39类和对象:拾遗
  8. OpenGL基础49:高度贴图(下)
  9. javascript--Date
  10. MATLAB--卡尔曼滤波
  11. python架构师书籍_阿里巴巴高级架构师:学好python这本书必看,堪称python入门宝典...
  12. 魔抓编程_编程中的魔数是什么?
  13. 图像处理的空间频率(波数)、角波数与空间波长
  14. 福建高中计算机会考知识点,福建省高中信息技术会考《信息技术基础》复习提纲.doc...
  15. 常用元器件封装的命名规范-002
  16. 从 MQL4 迁移到 MQL5
  17. boost库中thread多线程中的thread_specific_ptr
  18. 创业板IPO审核最新要求及案例解析
  19. 艾艾贴常用的数据类型转换
  20. 『毒舌吐槽社区』-很多敏感内容,你懂的!

热门文章

  1. mysql 三元,三元运算符
  2. 加菲猫经典语录 (转自:http://joke.ajiadi.com/)
  3. 中文分词词典构造简述
  4. Matlab和OpenCv的混合编程,工具箱mexopencv
  5. 二、八、十、十六进制互相转换大全
  6. 拼多多的金融野心:申请直销银行,大量招兵买马
  7. 区块链技术:架构及进展
  8. 详解JVM常量池、Class常量池、运行时常量池、字符串常量池(心血总结)
  9. php正则 过滤 特殊符号,PHP过滤★等特殊符号的正则
  10. 关于jupyter notebook无法显示图片的问题:图片不显示只显示Figure size 640x480 with 1 Axes