使用RabbitMQ实现消息的延迟消费

文章目录

  • 前言
  • 一、RabbitMQ是什么?
    • 1.RabbitMQ简介
    • 2.RabbitMQ的优点
    • 3.常用组件
    • 4.RabbitMQ的结构图
    • 5.交换机的类型
      • **A-直连交换机**:
      • **B-扇形交换机**:
      • **C-主题交换机**:
      • **D-头交换机**:
  • 二、定时推送思路实现
    • 1.Time To Live(TTL)
    • 2.Dead Letter Exchanges(DLX)
    • 3.具体实现
  • 总结

前言

最近做的项目涉及到后台消息的推送和app端接受消息的功能。具体的要求是:后台向app用户推送的消息,app用户能在app里面的消息栏目查看到消息详情,后台推送的时候能选择定时推送和立即推送两种方案。由于公司的基础架构以及公司文化原因,我们选择了RabbitMQ来实现推送服务和消息盒子之间的通讯。

注意:由于知识有限,本篇文章只对RabbitMQ做基础的介绍,使用场景,以及代码实现,不会对它的高级用法和策略进行介绍。


一、RabbitMQ是什么?

1.RabbitMQ简介

RabbitMQ是有erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列。
常见的消息队列有:RabbitMQ、Kafka 和 ActiveMQ

2.RabbitMQ的优点

RabbitMQ最初起源于金融系统,用于不同模块之间的消息通讯。
优点:

  • 可靠性:可持久化,消息传输和发布确认。
  • 灵活性:通过交换机将消息路由到对应的队列。
  • 集群:多台mq可组成集群,对外提供整体服务
  • 支持多语言:支持多种语言
  • 可界面操作:提供简易的用户操作界面
    等等。

3.常用组件

1.生产者(Producer):消息的制造者
2.消费者(Consumer):消息的消费者
3.消息(Message):消息对象,包括业务参数和mq的参数
4.队列(Queue):缓存或暂存储消息的容器
5.连接(Connection):应用服务和mq进行交互的tcp连接
6.信道(Channel):AMQP命令都是通过信道来完成的,它是一个虚拟的通道,来复用ctp连接。
7.交换机(Exchange):负责从生产者接收消息,根据路由规则发送到指定的队列里面。
8.路由键(Routing Key):交换机根据路由键将消息发往指定的队列。
9.虚拟主机(Virtual Host):就像是一个RabbitMQ 服务。

4.RabbitMQ的结构图

一张图介绍RabbitMQ组成以及各个组件之间的关系(参考网上画的)。

5.交换机的类型

交换机是用来发送消息到指定的队列里面的,它使用哪种路由算法是由交换机类型和绑定的规则所决定的。

A-直连交换机

直连交换机是根据交换机和队列之间绑定的路由键,来将消息发往指定的队列里面。如果交换机与多个队列绑定,则在发送携带路由键的消息时,只发给此路由键的队列,每个队列都是相同副本(比较适合一对一)。

例如:我用直连交换机test-direct-exchange根据路由键test-direct发送一条消息,然后去队列里面看消息,如图所示

分割线------------------------------------------------------------------------------分割线

B-扇形交换机

扇形交换机是将消息发往与它绑定的队列,而不去理会绑定的路由键是否一致。如果交换机与多个队列绑定,每个队列都是相同副本,起到广播的作用。

例如:我用扇形交换机test-fanout-exchange根据路由键test-fanout发送一条消息,然后去队列里面看消息,如图所示

分割线------------------------------------------------------------------------------分割线
但是三个队列都收到了消息,可见扇形交换机会忽略其路由键

C-主题交换机

主题交换机是通过消息的路由键跟交换机和队列之间的绑定路由键进行匹配,将消息发给匹配上的队列,跟直连交换机的一对多相似,但是他的路由键可以支持模糊匹配。

例如:我用主题交换机test-topic-exchange根据路由键test.topic2发送一条消息,然后去队列里面看消息,如图所示

分割线------------------------------------------------------------------------------分割线
根据消息路由键和绑定的路由键进行模糊匹配,推送消息。

D-头交换机

头交换机是主题交换机有点相似,主题交换机是基于路由键,而头交换机是基于消息的headers数据,所以在发送消息给头交换机时指定Routing key是不起作用的。头交换机在绑定队列时需要指定参数Arguments,发送消息时需要指定headers和Arguments相匹配,消息才能被推到相应的队列。

例如:我用头交换机test-headers-exchange根据路由键test-headers1发送一条消息,然后去队列里面看消息,如图所示

分割线------------------------------------------------------------------------------分割线
如果前两个队列能收到消息,证明路由键不生效。

二、定时推送思路实现

定时推送,顾名思义就是要让消息的发送时间给延迟,要想发送延迟消息,就要用到延迟队列。RabbitMQ没有提供延迟队列,其有自带的死信队列和消息存活时间。
对于此需求有两种解决方案:一、使用插件,延迟队列来实现。二、使用消息存活时间和死信队列来实现。我是基于第二种方法来实现的。

1.Time To Live(TTL)

RabbitMQ可以根据队列或消息不同颗粒度来设置消息的生存时间,如果同时设置,则会根据最早的到期时间为准,将消息变为dead letter(死信)。
显而易见,针对消息设置生存时间更加灵活,我也是选用的这种方式。

2.Dead Letter Exchanges(DLX)

RabbitMQ可以在队列里配置死信交换机然后通过路由键绑定到另一个队列上,如果该队列里面的消息dead letter,就会通过交换机发往另一个队列里面,然后就可以消费了。

队列里面出现dead letter的情况有以下几种:

  • 消息或队列TTL过期
  • 队列长度达到最大
  • 消息被消费者拒绝消费

3.具体实现

实现消息延迟发送的具体思路是:首先创建一个交换机来当做死信交换机,再创建一个队列与这个死信交换机进行绑定就称作死信队列。其次创建一个交换机来当做正常交换机,在创建一个队列与这个正常交换机进行绑定,同时将死信交换机和死信路由键配置到这个正常队列里面。这样,当一条带有存活时间的消息通过正常交换机发送过来时,首先进入正常队列里面,然后到了存活时间,就会通过死信交换机根据路由键发送到死信队列里面,然后消费者消费死信队列里的消息,就达到了延迟消费的目的。

java代码实现。
1.首先创建maven项目,导入pom文件

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>

2.配置配置文件(如果是yml,就用对应的书写规则)

spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

3.配置mq的相关组件

package com.wps.cn.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author wangps* @date 2022年11月22日 14:44*/@Configuration
public class QueueConfig {public static final String NORMAL_QUEUE_NAME = "normal_queue_name";public static final String NORMAL_EXCHANGE_NAME = "normal_exchange_name";public static final String NORMAL_ROUTING_KEY = "normal_routing_key";public static final String DLX_QUEUE_NAME = "dlx_queue_name";public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";public static final String DLX_ROUTING_KEY = "dlx_routing_key";/*** 死信队列* @return*/@BeanQueue dlxQueue() {return new Queue(DLX_QUEUE_NAME, true);}/*** 死信交换机* @return*/@BeanDirectExchange dlxExchange() {return new DirectExchange(DLX_EXCHANGE_NAME);}/*** 绑定死信队列和死信交换机* @return*/@BeanBinding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}/*** 普通消息队列* @return*/@BeanQueue normalQueue() {Map<String, Object> args = new HashMap<>();//设置消息过期时间,此方法是在队列的颗粒度设置,比较局限,所以在消息上设置过期时间
//        args.put("x-message-ttl", 1000*5);//设置死信交换机args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);//设置死信 routing_keyargs.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);return new Queue(NORMAL_QUEUE_NAME, true, false, false, args);}/*** 普通交换机* @return*/@BeanDirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE_NAME);}/*** 绑定普通队列和与之对应的交换机* @return*/@BeanBinding nomalBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(NORMAL_ROUTING_KEY);}
}

4.创建消费者

package com.wps.cn.consumer;import com.rabbitmq.client.Channel;
import com.wps.cn.config.QueueConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;import java.util.Map;/*** @author wangps* @date 2022年11月22日 14:55*/
@Component
public class DlxConsumer {private static final Logger logger = LoggerFactory.getLogger(DlxConsumer.class);@RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)public void process(String order, Message message, @Headers Map<String, Object> headers, Channel channel) {logger.info("订单号消息",  order);System.out.println("执行结束...."+message);}
}

5.创建controller当做生产者

package com.wps.cn.controller;import com.wps.cn.config.QueueConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;/*** @author wangps* @date 2022年11月22日 15:50*/
@RestController
@RequestMapping("/producer")
public class TestProducer {private static final Logger logger =  LoggerFactory.getLogger(TestProducer.class);@Autowiredprivate AmqpTemplate rabbitTemplate;@GetMapping("/sendMessage")public Object submit(){String orderId = UUID.randomUUID().toString();logger.info("提交订单消息========",orderId);rabbitTemplate.convertAndSend(QueueConfig.NORMAL_EXCHANGE_NAME,QueueConfig.NORMAL_ROUTING_KEY,orderId,message -> {message.getMessageProperties().setExpiration(1000*5+"");return message;});return "{'orderId':'"+orderId+"'}";}}

6.创建启动类

package com.wps.cn;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class DxlRabbitmqTestApplication {public static void main(String[] args) {SpringApplication.run(DxlRabbitmqTestApplication.class, args);}}

7.运行启动类,然后在页面访问,模拟推送消息
http://localhost:8080/producer/sendMessage
观察日志可以看出,消息发出后,在5s后消费者收到消息,从而达到延迟消费的情况。


总结

以上就是今天所讲的RabbitMQ实现延迟消费,本文主要使用了RabbitMQ的TTL和DLX来实现的。虽然说也是看了网上前辈们的总结,但是也有自己的实操经验,也是为了记录下来能够自己加深印象。
如果有不对的地方还请赐教,希望此篇文章能够对你有所帮助。

RabbitMQ实现消息的延迟推送或延迟发送相关推荐

  1. RabbitMQ(九):RabbitMQ 延迟队列,消息延迟推送(Spring boot 版)

    应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持 ...

  2. RabbitMQ 延迟队列,消息延迟推送

    应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持 ...

  3. rabbitmq 消息延迟推送--插件模式

    到此 灵熙云工作室 - 实践出真理 查看全文内容 应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在签收商品后,物流系统会在七天后延时发送一个消息给 ...

  4. xmpp关于后台挂起的消息接收,后台消息推送,本地发送通知

    想问下,在xmpp即时通讯的项目中,我程序如果挂起了,后台有消息过来,我这边的推送不过来,所以我的通知就会收不到消息,当我重新唤醒应用的时候,他才会接收到通知,消息就会推送过来,我在plist哪里设置 ...

  5. Python Web实时消息后台服务器推送技术---GoEasy

    越来越多的项目需要用到实时消息的推送与接收,怎样实现最方便呢?我这里推荐大家使用GoEasy,它是一款第三方推送服务平台,使用它的API可以轻松搞定实时推送! 浏览器兼容性:GoEasy推送 支持we ...

  6. C# Web实时消息后台服务器推送技术---GoEasy

    越来越多的项目需要用到实时消息的推送与接收,怎样实现最方便呢?我这里推荐大家使用GoEasy, 它是一款第三方推送服务平台,使用它的API可以轻松搞定实时推送! 浏览器兼容性:GoEasy推送 支持w ...

  7. ASP.NET Web实时消息后台服务器推送技术---GoEasy

    越来越多的项目需要用到实时消息的推送与接收,怎样用ASP.NET实现最方便呢?我这里推荐大家使用GoEasy, 它是一款第三方推送服务平台,使用它的API可以轻松搞定实时推送! 浏览器兼容性:GoEa ...

  8. C# Web实时消息后台服务器推送技术-GoEasy

    越来越多的项目需要用到实时消息的推送与接收,怎样用C#实现最方便呢?我这里推荐大家使用GoEasy, 它是一款第三方推送服务平台,使用它的API可以轻松搞定实时推送! 浏览器兼容性:GoEasy推送 ...

  9. Ruby Web实时消息后台服务器推送技术---GoEasy

    越来越多的项目需要用到实时消息的推送与接收,怎样实现最方便呢?我这里推荐大家使用GoEasy,它是一款第三方推送服务平台,使用它的API可以轻松搞定实时推送! 浏览器兼容性:GoEasy推送 支持we ...

最新文章

  1. HarmonyOS ScrollView 不滑动的问题
  2. HDU 4857 Couple doubi(找循环节)
  3. 简单说说你对Java内存模型的原子性的了解吧?
  4. matlab crf 工具包,python调用CRF++工具包
  5. animate css3 应用的借鉴,一个同事写的JS
  6. 数据结构实验之栈三:后缀式求值
  7. Vivado FIFO IP核接口信号介绍
  8. 学习笔记大型《构建高性能web站点》
  9. python把字符串转成字典
  10. java byte 图片浏览器直接显示_在imge控件中直接显示图片(图片是byte[]格式)
  11. AJAX, JSON.js,Newtonsoft.Json.dll,nunit.framework.dll 源代码
  12. vue 描述框[el-descriptions] 与之对应 div
  13. 工厂模式以及应用简单解释
  14. 微信登录收不到回调的解决方法
  15. springboot 解决java.lang.ArrayStoreException
  16. html+input+js双击,JS双击变input框批量修改内容
  17. 关于bit-banding的解释和相关作用(转载)
  18. 一文掌握常见常用Java集合框架
  19. 数据分析进阶必看干货!销售额下滑详细分析案例
  20. 移动软件开发-设计app首页

热门文章

  1. viper4android 3D,中国设计之窗--The Viper 3D播放器界面设计图文综合
  2. c语言调用system32下的dll文件,复制一个dll文件进system32,说:需要权限什么之类的...
  3. Skype 的在线状态 代码
  4. Python一键下载视频脚本分享
  5. fullpage的基本使用方法
  6. 科学计算机怎么刷机,华为p1怎么刷机【图文教程】
  7. php7.2.10+mysql+Ptcms源码调试
  8. PMP备考之路 - PMBOK第一章(引论)
  9. 毕业设计一深度学习之MRI数据集预处理(合并,裁剪以及重命名等操作)
  10. 程序员被问:你的期望薪资是多少?这样回答再涨5K!