什么是RocketMQ

阿里消息队列 RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,同时是收费的产品。

应用场景

削峰填谷

诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列 RocketMQ 版可提供削峰填谷的服务来解决该问题。

异步解耦

交易系统作为淘宝/天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列 RocketMQ 版可实现异步通信和应用解耦,确保主站业务的连续性。

顺序收发

细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出(First In First Out,缩写 FIFO)原理类似,消息队列 RocketMQ 版提供的顺序消息即保证消息 FIFO。

分布式事务一致性

交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列 RocketMQ 版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。

大数据分析

数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列 RocketMQ 版与流式计算引擎相结合,可以很方便的实现将业务数据进行实时分析。

分布式缓存同步

天猫双 11 大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因为带宽瓶颈限制商品变更的访问流量,通过消息队列 RocketMQ 版构建分布式缓存,实时通知商品数据的变化。

1、配置pom.xml

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency><!--lombok-->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency><!--阿里RocketMQ-->
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.0.Final</version>
</dependency>

2、配置application.properties

server.port=8888
#rocketmq配置
#鉴权用AccessKeyId在阿里云服务器管理控制台创建
rocketmq.accessKey=accessKey
#鉴权用AccessKeySecret在阿里云服务器管理控制台创建
rocketmq.secretKey=secretKey
#tcp长连接,设置TCP接入域名,进入控制台的实例管理页面,在页面上方选择实例后,在实例信息中的“获取接入点信息”区域查看
rocketmq.namesrvAddr=http://MQ_INST_15namesrvAddr7I.cn-hangzhou.mq-internal.aliyuncs.com:8080
#mq主题,,您在控制台创建的topic
rocketmq.topic=topic
#mq组名,您在控制台创建的 Group ID
rocketmq.groupId=groupId

以上参数均可在阿里控制台中找到

3、配置类

package com.ifilldream.rocketmq_lean.mq;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.Properties;/*** RocketMQ配置* @author RickSun && iFillDream* @date 2020/01/10 15:58* @Copyright "轻梦致新"即"iFillDream"微信公众号所有*/
@Configuration
public class RocketMQConfig {@Value("${rocketmq.accessKey}")public String accessKey;public static String ACCESS_KEY;@Value("${rocketmq.secretKey}")public String secretKey;public static String SECRET_KEY;@Value("${rocketmq.namesrvAddr}")public String namesrvAddr;public static String NAMESRV_ADDR;@Value("${rocketmq.groupId}")public String groupId;public static String GROUP_ID;@Value("${rocketmq.topic}")public String topic;public static String TOPIC;/*** 配置RocketMq参数* @return Properties*/public Properties getProperties() {Properties properties = new Properties();//您在控制台创建的GroupIDproperties.put(PropertyKeyConst.GROUP_ID, groupId);// 鉴权用AccessKeyId在阿里云服务器管理控制台创建properties.setProperty(PropertyKeyConst.AccessKey, accessKey);// 鉴权用AccessKeySecret在阿里云服务器管理控制台创建properties.setProperty(PropertyKeyConst.SecretKey, secretKey);//延时时间properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");// 顺序消息消费失败进行重试前的等待时间单位(毫秒)properties.put(PropertyKeyConst.SuspendTimeMillis, "100");// 消息消费失败时的最大重试次数properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");// 设置TCP接入域名,进入控制台的实例管理页面,在页面上方选择实例后,在实例信息中的“获取接入点信息”区域查看properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);return properties;}/*** 初始化静态常量*/@PostConstructpublic void init(){ACCESS_KEY = this.accessKey;SECRET_KEY = this.secretKey;NAMESRV_ADDR = this.namesrvAddr;GROUP_ID = this.groupId;TOPIC = this.topic;}
}

4、RocketMQ工具

package com.ifilldream.rocketmq_lean.mq;
import com.aliyun.openservices.ons.api.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Properties;/*** RocketMQ工具* @author RickSun && iFillDream* @date 2020/01/10 16:07* @Copyright "轻梦致新"即"iFillDream"微信公众号所有*/
@Component
@Slf4j
public class MQUtil {@Autowiredprivate RocketMQConfig rocketMQConfig;/*** 发送普通消息* @param content 内容* @param tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列*/public void sendMessage(String content,String tag){Message message = new Message();message.setBody(content.getBytes());message.setTopic(RocketMQConfig.TOPIC);message.setTag(tag);this.sendCustomerMessage(message);}/*** 发送定时任务* @param content   内容* @param tag   标签* @param delayTime 定时时间*/public void sendDelayMessage(String content,String tag,long delayTime){Message message = new Message();message.setBody(content.getBytes());message.setTopic(RocketMQConfig.TOPIC);message.setTag(tag);/*** 单位毫秒(ms)* 在指定时间戳(当前时间之后)进行投递* 例如 2016-03-07 16:21:00 投递* 如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者*/message.setStartDeliverTime(System.currentTimeMillis() delayTime);this.sendCustomerMessage(message);}/*** 发送消息* @param message*/private void sendCustomerMessage(Message message) {Properties properties=rocketMQConfig.getProperties();Producer producer = ONSFactory.createProducer(properties);//在发送消息前,必须调用start方法来启动Producer,只需调用一次即可producer.start();try {SendResult sendResult = producer.send(message);// 同步发送消息,只要不抛异常就是成功if (sendResult != null) {log.info("消息发送成功:messageID:" sendResult.getMessageId());}} catch (Exception e) {// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理e.printStackTrace();}//在应用退出前,销毁Producer对象producer.shutdown();}
}

5、标签

package com.ifilldream.rocketmq_lean.mq;/*** RocketMQ Tag业务标签* @author RickSun && iFillDream* @date 2020/01/10 16:32* @Copyright "轻梦致新"即"iFillDream"微信公众号所有*/
public class MqTag {/*** 根据业务老创建标签*///测试1public final static String ROCKETMQTEST1 = "ROCKETMQ_TEST1";//测试2public final static String ROCKETMQTEST2 = "ROCKETMQ_TEST2";
}

6、消费者

package com.ifilldream.rocketmq_lean.mq;
import com.aliyun.openservices.ons.api.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Properties;/*** RocketMQ消费者* @author RickSun && iFillDream* @date 2020/01/10 16:29* @Copyright "轻梦致新"即"iFillDream"微信公众号所有*/
@Component
@Slf4j
public class RocketMQConsumer {@Autowiredprivate RocketMQConfig rocketMQConfig;/*** 订阅消息,处理业务*/public void normalSubscribe() {Properties properties = rocketMQConfig.getProperties();Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe(RocketMQConfig.TOPIC, "", new MessageListener() {@Overridepublic Action consume(Message message, ConsumeContext context) {try {//接收到的消息内容String msg = new String(message.getBody(), "UTF-8");String tag = message.getTag();switch (tag) {case MqTag.ROCKETMQTEST1:log.info("收到消息messageID:"   message.getMsgID()   " msg:"   msg);//TODO do somethingbreak;case  MqTag.ROCKETMQTEST2:log.info("收到消息messageID:"   message.getMsgID()   " msg:"   msg);//TODO do somethingbreak;}return Action.CommitMessage;} catch (Exception e) {log.info("消费失败:messageID:"   message.getMsgID());e.printStackTrace();return Action.ReconsumeLater;}}});consumer.start();}
}

7、消费者启动监听

package com.ifilldream.rocketmq_lean.mq;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;/*** RocketMQ启动监听* @author RickSun && iFillDream* @date 2020/01/10 16:07* @Copyright "轻梦致新"即"iFillDream"微信公众号所有*/
@Component
public class RocketConsumerListener implements CommandLineRunner {@Autowiredprivate RocketMQConsumer rocketMQConsumer;@Overridepublic void run(String... args) {System.out.println("========rocketMQ消费者启动==========");rocketMQConsumer.normalSubscribe();}
}

8、接口

package com.ifilldream.rocketmq_lean.controller;
import com.ifilldream.rocketmq_lean.mq.MQUtil;
import com.ifilldream.rocketmq_lean.mq.MqTag;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;/*** @ClassName RocketController* @Author RickSun && iFillDream* @Date 2020/1/10 15:18* @Version 1.0*/
@RestController
@RequestMapping("/ifilldream/rocketmq")
public class RocketController {@Resourceprivate MQUtil mqUtil;@GetMapping("/test")public String test(String content) {return content;}@GetMapping("/test1")public String test1(String content) {mqUtil.sendMessage(content, MqTag.ROCKETMQTEST1);mqUtil.sendDelayMessage("测试", MqTag.ROCKETMQTEST1, 1000L);return "success";}@GetMapping("/test2")public String test2(String content) {mqUtil.sendMessage(content, MqTag.ROCKETMQTEST2);mqUtil.sendDelayMessage("测试", MqTag.ROCKETMQTEST2,3000L);return "success";}}

此时代码完毕,在Linux服务器上运行项目Jar包,浏览器中输入:xx.xx.xx.xx:8888/ifilldream/rocketmq/test1?content=nihao即可看到效果;xx.xx.xx.xx为服务器的IP或域名,运行效果如下:以上代码亲测可用,更多详情请关注阿里官方文档https://help.aliyun.com/product/29530.html?spm=a2c4g.11186623.6.540.1a4b7e805ygc75

统一首发平台为微信公众号"轻梦致新",搜索关注公众号,第一时间阅读最新内容。

SpringBoot整合阿里RocketMQ相关推荐

  1. SpringBoot整合阿里云OSS文件上传、下载、查看、删除

    SpringBoot整合阿里云OSS文件上传.下载.查看.删除 该项目源码地址:https://github.com/ggb2312/springboot-integration-examples ( ...

  2. SpringBoot整合阿里Druid数据源及Spring-Data-Jpa

    SpringBoot整合阿里Druid数据源及Spring-Data-Jpa https://mp.weixin.qq.com/s?__biz=MzU0MDEwMjgwNA==&mid=224 ...

  3. springboot整合使用rocketMq

    前文,我们讲述了rocketMq的基本使用,接下来聊聊如何使用springboot整合使用rocketMq; 1)新建maven工程,工程结构目录如图: constants包下存放着常量信息,这里保存 ...

  4. SpringBoot整合——阿里云对象存储(OSS)

    SpringBoot整合--阿里云对象存储 1 OSS介绍 在开发应用的过程中,我们经常会有用户需要实名认证之后才能访问的需求. 用户认证需要上传证件图片.首页轮播也需要上传图片,因此我们要做文件服务 ...

  5. SpringBoot整合阿里云视频点播

    文章目录 SpringBoot整合阿里云视频点播 1.准备工作 2.服务端SDK的使用 2.1 导入依赖 2.2 初始化类 2.3 创建读取公共常量的工具类 2.4 获取视频播放地址 2.5 获取视频 ...

  6. SpringBoot整合阿里云短信服务详细过程(保证初学者也能实现)

    前言 网上关于实操性的文章普遍大部分都记录不全,要么只记录重点部分,对于刚学习的小伙伴来说看起来是比较困难的 所以,基于这一点. 该文章会详细介绍使用SpringBoot整合阿里云短信服务的每一步过程 ...

  7. SpringBoot整合阿里云OSS

    文章目录 SpringBoot整合阿里云OSS 1.准备工作 1.1 开通"对象存储OSS"服务 1.2 创建Bucket 1.3 创建RAM子用户 2.SpringBoot整合阿 ...

  8. springboot整合阿里云oss上传的方法示例

    这篇文章主要介绍了springboot整合阿里云oss上传的方法示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 OSS申请和 ...

  9. SpringBoot整合阿里云OSS上传文件

    一.需求分析 文件上传是一个非常常见的功能,就是通过IO流将文件写到另外一个地方,这个地方可以是项目下的某个文件夹里,或者是本地电脑某个盘下面,还可以是云服务OSS里面,这里就是我要讲到的OSS,我写 ...

最新文章

  1. dataTable 从服务器获取数据源的两种表现形式
  2. 黑盒测试--因果图法
  3. session存储在redis/memcache/mysql
  4. 计算机系统安全风险管理,信息系统安全风险及其控制措施.doc
  5. Linux蜂鸣器实验(使用上一节子系统思想,摈弃了自己配置寄存器的繁琐操作)
  6. 身为程序员的唐僧说:只要我不死,就能取到真经!
  7. FPGA实现VGA显示(三)——————单个字符显示
  8. Docker phpMyAdmin 连接访问宿主机本地mysql
  9. 一篇文章带你登顶 MacBook高效工作环境配置
  10. 我所了解的GB2312、Unicode、GBK、UTF-8、BIG5等编码
  11. vue 中动态添加组件方式
  12. 友好的可视化工具——trelliscope
  13. EndNote设置自动导入文献
  14. 模拟题【枚举计数】咒语
  15. 一个定语修饰两个并列的名词。
  16. 联通沃云 服务器 FTP安装设置
  17. t型三电平matlab仿真,T型三电平逆变器在不间断电源中的实现
  18. 使用再生龙制作linux系统镜像及还原
  19. wxpython制作桌面悬浮球
  20. 恒温恒湿实验室(房)建设、设计SICOLAB

热门文章

  1. python中类的构造方法的名称是_智慧职教: Python中类的构造方法的名称是( )。
  2. Kotlin自定义一个简单实用的标题栏
  3. 搜狐视频协议分析之Tracker交互协议分析 [20161113]
  4. Microsoft Edge使用感受
  5. 关于拉格朗日对偶问题中对偶性的理解
  6. JLU吉林大学21级软件数据结构上机实验(1)
  7. WindowsVista系统盘揭密:Users文件夹
  8. PageHelper实现分页查询
  9. 安装了MathType但Word中公式打不开
  10. 【Zotero高效知识管理】(2)Zotero的安装、百度云存储配置及常用插件安装