使用场景,不限于下面

  1. 用户下订单结束,如果用户未及时支付,后续需要取消订单,可以怎么做?定时任务可以做,但是不能接近实时或者消耗数据库性能太大。
  • 【数据库定时任务方案】:定时任务可以做到订单的状态的改变,但是数据库定时任务是定时执行的,而我们订单状态改变的时间确是随时可以发生的,所以定时任务不能做到准实时。
  • 【消息中间件方案】:我们可以将消息投递到中间件里面,通常设置一个延迟时间,精确到某个时刻进行消费,那么可以实现准实时。
  1. 有一个推送,具体是这样的:我们需要在某个时刻发送给某个用户消息,而这个时刻是某个时间过多久之后的时刻,怎么做?
  • 利用消息中间件的延迟消息特点,将消息放入后固定时间后消费,可以完美解决。但是注意每个中间件延迟时间是否有限制。

RabbitMQ延迟消息实现方式一【基于rabbitmq_delayed_message_exchange插件方式】

经过多次测试,插件的方式不能使用RabbitMQ的事务消息,所以推荐第二种方式。

软件下载地址

https://www.rabbitmq.com/download.html

安装过程略!网上有很多教程。

插件下载地址

https://www.rabbitmq.com/community-plugins [千万记住,一定选好版本号。如果没有选对版本,在使用延迟消息的时候,会遇到各种各样的奇葩问题,而且网上还找不到解决方案]

插件安装

启用插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange [启动插件成功后,重启RabbitMQ,让其生效]

SpringBoot集成RabbitMQ

依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml配置

#配置rabbitMq 服务器
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxx
spring.rabbitmq.publisher-confirms=false
spring.rabbitmq.publisher-returns=false
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 重试机制(默认开启并一直重试),最大尝试五次,每次间隔3秒
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.initial-interval=3000
spring.rabbitmq.virtual=

延迟消息配置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 延迟消息* @author niky*/
@Configuration
public class DelayMsgConfig {public static final String DELAY_EXCHANGE = "delay_exchange";public static final String DELAY_KEY = "delay_key";public static final String DELAY_QUEUE = "delay_queue";@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>(1);args.put("x-delayed-type", "direct");return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);}@Beanpublic Queue delayQueue() {return new Queue(DELAY_QUEUE, true);}@Beanpublic Binding cfgDelayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_KEY).noargs();}/*** 配置启用rabbitmq事务*/@Beanpublic RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
}

生产者发送消息

 @Overridepublic void sendDelayMsg(DelayMsgPushStatus msg) {//这里的消息可以是任意对象,无需额外配置,直接传即可logger.info("===============延时队列生产消息====================");logger.info("发送时间:{},发送内容:{}", LocalDateTime.now(), JSON.toJSONString(msg));this.rabbitTemplate.convertAndSend(DelayMsgConfig.DELAY_EXCHANGE,DelayMsgConfig.DELAY_KEY,msg.getMsg(),message -> { //注意这里时间可以使long,而且是设置headermessage.getMessageProperties().setHeader("x-delay", msg.getDelayTime());return message;});String time = LocalDateTime.ofEpochSecond((System.currentTimeMillis() + msg.getDelayTime()) / 1000, 0, ZoneOffset.ofHours(8)).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));logger.info("延迟时间:{}ms,到推时间:{}", msg.getDelayTime(), time);logger.info("===============延时队列生产消息====================\n");}

消费者consumer

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.frame.dyzh.core.common.dictmap.DelayMsgPushStatus;
import com.frame.dyzh.core.common.util.ValidateUtil;
import com.frame.dyzh.core.constant.SystemConstant;
import com.frame.dyzh.module.doctor.entity.TMSubscribe;
import com.frame.dyzh.module.doctor.mapper.TMSubscribeMapper;
import com.frame.dyzh.module.patient.dto.PushMixedDto;
import com.frame.dyzh.module.patient.util.PushUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** @author niky*/
@Configuration
public class DelayMsgReceiver {@Resourceprivate TMSubscribeMapper subscribeMapper;private Logger logger = LoggerFactory.getLogger(this.getClass().getName());@RabbitListener(queues = DelayMsgConfig.DELAY_QUEUE)public void delayMsgReceiver(Message msg) {String body = new String(msg.getBody());logger.info("===============接收队列接收消息====================");logger.info("接收时间:{},接受内容:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), new String(msg.getBody()));DelayMsgPushStatus delayMsgPushStatus = null;try {delayMsgPushStatus = JSONObject.toJavaObject(JSONObject.parseObject(body), DelayMsgPushStatus.class);if (ValidateUtil.isEmpty(delayMsgPushStatus)) {logger.info("无法识别队列消息DelayMsgPushStatus NULL");return;}} catch (Exception e) {logger.info("无法识别队列消息:" + e.getMessage());}logger.info("===============接收队列接收消息====================\n");if (ValidateUtil.isEmpty(delayMsgPushStatus)) {return;}// 延迟推送获业务String msgType = delayMsgPushStatus.getType();PushMixedDto pushMixedDto = delayMsgPushStatus.getPushMixedDto();if (ValidateUtil.isNotEmpty(msgType) && ValidateUtil.isNotEmpty(pushMixedDto)) {// 预约消息if (msgType.equals(SystemConstant.QUEUE_MSG_TYPE_SUBSCRIBE) || msgType.equals(SystemConstant.QUEUE_MSG_TYPE_REPEAT_SUBSCRIBE)) {PushUtil.push(pushMixedDto);}// 用户15分钟内没有评价,系统自动判定问诊结束else if (msgType.equals(SystemConstant.TENCENT_MSGTYPE_AUTO_USER_PATIENT_OVER)) {PushUtil.push(pushMixedDto);// @todo 判断用户是否已经评价TMSubscribe subscribe = (TMSubscribe)pushMixedDto.getMerchatExtra().get(0);TMSubscribe subDbCheck = subscribeMapper.selectById(subscribe.getSubscribeId());if (ValidateUtil.isNotEmpty(subDbCheck) && ValidateUtil.isEmpty(subDbCheck.getSatisfaction())) {// todo 系统自动给你判断结束操作}}else {logger.info("不支持的业务消息类型");}}}
}

RabbitMQ延迟消息实现方式二【基于死信队列方式】

RabbitMQ配置

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
@Slf4j
public class DelayMsgConfig {/*** 延迟队列 TTL 名称*/private static final String TTL_DELAY_QUEUE = "ttl_delay_queue";/*** DLX,dead letter发送到的 exchange* 延时消息就是发送到该交换机的*/public static final String  TTL_DELAY_EXCHANGE = "ttl_delay_exchange";/*** routing key 名称* 具体消息发送在该 routingKey 的*/public static final String TTL_DELAY_ROUTING_KEY = "ttl_delay_key";public static final String DELAY_FORWARD_QUEUE_NAME = "delay_forward_queue";public static final String DELAY_FORWARD_EXCHANGE_NAME = "delay_forward_exchange";public static final String DELAY_FORWARD_ROUTING_KEY = "delay_forward_route_key";@Beanpublic Queue delayTTLQueue() {Map<String, Object> params = new HashMap<>();// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称params.put("x-dead-letter-exchange", DELAY_FORWARD_EXCHANGE_NAME);// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称params.put("x-dead-letter-routing-key", DELAY_FORWARD_ROUTING_KEY);return new Queue(TTL_DELAY_QUEUE, true, false, false, params);}@Beanpublic DirectExchange delayTTLExchange() {return new DirectExchange(TTL_DELAY_EXCHANGE);}@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayTTLQueue()).to(delayTTLExchange()).with(TTL_DELAY_ROUTING_KEY);}@Beanpublic Queue forwardQueue() {return new Queue(DELAY_FORWARD_QUEUE_NAME, true);}@Beanpublic DirectExchange forwardDirectExchange() {return new DirectExchange(DELAY_FORWARD_EXCHANGE_NAME);}@Beanpublic Binding forwardBinding() {return BindingBuilder.bind(forwardQueue()).to(forwardDirectExchange()).with(DELAY_FORWARD_ROUTING_KEY);}
}

生产者发送消息,使用了事务消息。redis实现了幂等性控制,生产环境推荐搭建集群模式防止单机宕机

@Override
@Transactional(rollbackFor = Exception.class)
public void sendDelayMsg(DelayMsgPushStatus msg) {DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");ObjectMapper objectMapper = new Jackson2ObjectMapperBuilder().findModulesViaServiceLoader(true).serializerByType(LocalDateTime.class, new LocalDateTimeSerializer(formatter)).deserializerByType(LocalDateTime.class, new LocalDateTimeDeserializer(formatter)).build();String value;try {value = objectMapper.writeValueAsString(msg);} catch (JsonProcessingException e) {throw new BizException("JSON转换异常【sendDelayMsg】");}//这里的消息可以是任意对象,无需额外配置,直接传即可logger.info("===============延时队列生产消息====================");logger.info("发送时间:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));logger.info("发送内容:\n{}", JSON.toJSONString(msg));rabbitTemplate.setChannelTransacted(true); this.rabbitTemplate.convertAndSend(DelayMsgConfig.TTL_DELAY_EXCHANGE,DelayMsgConfig.TTL_DELAY_ROUTING_KEY,value,message -> {//注意这里时间可以使long,而且是设置header (使用插件方式)// message.getMessageProperties().setHeader("x-delay", msg.getDelayTime());message.getMessageProperties().setExpiration(msg.getDelayTime() + "");return message;});String time = LocalDateTime.ofEpochSecond((System.currentTimeMillis() + msg.getDelayTime()) / 1000, 0, ZoneOffset.ofHours(8)).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));logger.info("延迟时间:{}ms", msg.getDelayTime());logger.info("到推时间:{}", time);logger.info("===============延时队列生产消息====================\n");
}

消费者监听配置

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.frame.dyzh.config.DelayMsgConfig;
import com.frame.dyzh.core.common.basicentity.TempFormMap;
import com.frame.dyzh.core.common.dictmap.DelayMsgPushStatus;
import com.frame.dyzh.core.common.enums.DelayPushTypeEnum;
import com.frame.dyzh.core.common.exception.BizException;
import com.frame.dyzh.core.common.util.ValidateUtil;
import com.frame.dyzh.core.common.util.weichant.WebChatPushEntity;
import com.frame.dyzh.core.task.AsynPushTask;
import com.frame.dyzh.module.doctor.entity.TMSubscribe;
import com.frame.dyzh.module.doctor.mapper.TMSubscribeMapper;
import com.frame.dyzh.module.order.service.ITUOrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** @author niky*/
@Configuration
public class DelayMsgListener {@Resourceprivate TMSubscribeMapper subscribeMapper;@Resourceprivate AsynPushTask asynPushTask;@Autowiredprivate ITUOrderService orderService;private Logger logger = LoggerFactory.getLogger(this.getClass().getName());@RabbitListener(queues = {DelayMsgConfig.DELAY_FORWARD_QUEUE_NAME})public void delayMsgReceiver(Message msg) {String body = new String(msg.getBody());logger.info("=============== 延迟队列【" + DelayMsgConfig.DELAY_FORWARD_QUEUE_NAME + "】接收消息 ====================");logger.info("接收时间:{}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));logger.info("接受内容:\n{}",  new String(msg.getBody()));DelayMsgPushStatus delayMsgPushStatus = null;try {delayMsgPushStatus = JSONObject.toJavaObject(JSONObject.parseObject(body), DelayMsgPushStatus.class);if (ValidateUtil.isEmpty(delayMsgPushStatus)) {logger.info("无法识别队列消息DelayMsgPushStatus=NULL");return;}} catch (Exception e) {logger.info("无法识别队列消息:" + e.getMessage());}logger.info("=============== 延迟队列【" + DelayMsgConfig.DELAY_FORWARD_QUEUE_NAME + "】接收消息 ====================\n");if (ValidateUtil.isEmpty(delayMsgPushStatus)) {return;}DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");ObjectMapper objectMapper = new Jackson2ObjectMapperBuilder().findModulesViaServiceLoader(true).serializerByType(LocalDateTime.class, new LocalDateTimeSerializer(formatter)).deserializerByType(LocalDateTime.class, new LocalDateTimeDeserializer(formatter)).build();// 延迟推送业务String msgType = delayMsgPushStatus.getType();if (ValidateUtil.isNotEmpty(msgType)) {// 1.用户15分钟内没有评价,系统自动判定咨询结束if (msgType.equals(DelayPushTypeEnum.system_auto_patient_over.getType())) {TMSubscribe subscribe;try {subscribe = objectMapper.readValue(delayMsgPushStatus.getExtra() + "" , TMSubscribe.class);} catch (Exception e) {logger.error(e.getMessage());throw new BizException("类型转换异常,请管理员注意~!");}TMSubscribe subReal = subscribeMapper.selectById(subscribe.getSubscribeId());// 用户是否评价if (ValidateUtil.isNotEmpty(subReal) && ValidateUtil.isEmpty(subReal.getSatisfaction())) {// 默认五星好评subReal.setSatisfaction(10);subReal.setStat(1); // 完成咨询subscribeMapper.updateById(subReal);// 更新订单状态TempFormMap map = new TempFormMap();map.put("orderId", subReal.getOrderId());orderService.updateMerchartOrderStatus(map);}} else {logger.info("不支持的业务消息类型");}// 微信推送失败发送} else if (msgType.equals(DelayPushTypeEnum.weichat_push_fail.getType())) {WebChatPushEntity entity = null;try {entity = objectMapper.readValue(delayMsgPushStatus.getExtra() + "", WebChatPushEntity.class);} catch (IOException e) {e.printStackTrace();}// 用户是否评价if (ValidateUtil.isNotEmpty(entity)) {asynPushTask.sendWebChatTemplateMsg(entity);}}}
}

RabbitMQ延迟消息场景分析以及实现两种方式(SpringBoot)相关推荐

  1. python selenium自动化断言_python+selenium自动化登录测试,设计不同场景进行登录,两种方式断言,截图保存...

    # coding : utf-8 # date :2019/1/7 # 根据不同场景做自动化登录测试 # 正确账号密码.正确账户错误密码.等其他场景 from selenium import webd ...

  2. linux中安shell怎么传入参数,【linux】linux 下 shell命令 执行结果赋值给变量【两种方式】...

    方法1:[通用方法] 使用Tab键上面的反引号 例子如下: find命令 模糊查询在/apps/swapping目录下 查找 文件名中包含swapping并且以.jar结尾的文件 使用反引号 引住命令 ...

  3. RabbitMQ(六)——Spring boot中消费消息的两种方式

    前言 上一篇博客中,我们只是简单总结了Spring boot中整合RabbitMQ的操作,针对消息消费的两种方式只是简单给了一个实例,这篇博客,我们进一步总结关于Spring boot消息消费的相关功 ...

  4. android asynctask源码分析,Android通过Handler与AsyncTask两种方式动态更新ListView(附源码)...

    本文实例讲述了Android通过Handler与AsyncTask两种方式动态更新ListView的方法.分享给大家供大家参考,具体如下: 有时候我们需要修改已经生成的列表,添加或者修改数据,noti ...

  5. javax消息队列_java实现消息队列的两种方式(小结)

    实现消息队列的两种方式 Apache ActiveMQ官方实例发送消息 下载解压后拿到java代码实例 然后倒入IDE 如下: 请认真阅读readme.md文件,大致意思就是把项目打成两个jar包,然 ...

  6. RocketMq 消费消息的两种方式 pull 和 push

    在RocketMQ中一般有两种获取消息的方式,一个是拉(pull,消费者主动去broker拉取),一个是推(push,主动推送给消费者) 区别是: push方式里,consumer把轮询过程封装了,并 ...

  7. RocketMq : 消费消息的两种方式 pull 和 push

    文章目录 1.概述 两种方式的优缺点对比: push pull 1.概述 原创在:https://blog.csdn.net/zhangcongyi420/article/details/905483 ...

  8. Android 应用开发 之通过AsyncTask与ThreadPool(线程池)两种方式异步加载大量数据的分析与对比

    Android 应用开发 之通过AsyncTask与ThreadPool(线程池)两种方式异步加载大量数据的分析与对比 标签: AndroidAsyncTaskThreadPool异步加载view 2 ...

  9. SDN的两种方式分析

    ONF成立两年之际,SDN开始得到业界的广泛认同,尤其是从IT真正走入了CT,而CT的网络类型五花八门,这使得SDN的应用场景急剧增长,大有无所不能之势. 业界目前所推的SDN无非两种 方式一是原生的 ...

最新文章

  1. 利用Phtoshop去掉图片中的线性渐变背景
  2. 老BOJ 13 K-based Numbers
  3. 佳能发售曝光对应的「IXY DIGITAL 3000 IS」
  4. php访问c#接口,介绍C#中的接口
  5. 百家号怎么加网站链接进行引流,方法让你轻松掌握
  6. vue移动端实现word在线预览
  7. 《Axure RP 8 实战手册》pdf
  8. 刷题——必备十二大网站
  9. 1097: 计算平均成绩(函数专题)
  10. 模拟退huo算法的特点_模拟退火算法(有完整实例源代码)
  11. 4核处理器_最便宜的16核洋垃圾怎么样?建议别买
  12. 揭秘;抖音美妆账号如何做?如何玩转抖音美妆类运营:国仁楠哥
  13. 用金碟软件二次开发ERP是否有前途?
  14. FastDFS-6.06安装(Centos 7)
  15. 痴情人肠断 无情最逍遥
  16. PostgreSQL之日期时间小结
  17. 图像处理之Bolb分析(一)
  18. php屏蔽地区_屏蔽指定省份访问PHP代码
  19. python 绘制 频谱图
  20. 删除/关闭Word文稿中的插件

热门文章

  1. 淘宝企业的定价是怎么来的呢淘宝企业店铺转让价格有哪些因素?
  2. 全端工程师必备技能汇总
  3. 六年级数学期中考试只考了88分, 但试卷被老师写下:Good! very good!! very very good!!!
  4. input输入的执行顺序【document,onkeydown】
  5. 武汉大学计算机学院周浩,武汉大学电子信息学院导师介绍:周浩
  6. 周浩:互联网凭什么做金融?
  7. 图像边缘检测——一阶微分算子 Roberts、Sobel、Prewitt、Kirsch、Robinson
  8. 微信文本消息html,微信公众帐号开发教程第8篇-文本消息中使用网页超链接
  9. 国产手机迈入关键时期,弯道超车指日可待?
  10. photoshop cs3 打开文件很慢的解决办法