SpringBoot整合Redisson实现延迟队列

  • 技术选型
  • 引入 Redisson 依赖
  • 配置项
  • 编写工具类
  • 延迟队列执行器
  • 业务消费类枚举
  • 加载消费队列
  • 消费者类
  • 测试类
  • 测试结果

技术选型

关于延迟队列的概念还是其他技术选择请参考这个文章点我。由于系统中使用了Redisson我这里就用他实现一下。
说明:当时参考的不知道是哪位大佬的文章,没有保存住,在这里略表歉意。

好了开撸

引入 Redisson 依赖

<!--redisson--><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.16.0</version></dependency>

配置项

# 单节点配置
singleServerConfig:# 连接空闲超时,单位:毫秒idleConnectionTimeout: 10000# 连接超时,单位:毫秒connectTimeout: 10000# 命令等待超时,单位:毫秒timeout: 3000# 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。# 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。retryAttempts: 3# 命令重试发送时间间隔,单位:毫秒retryInterval: 1500#  # 重新连接时间间隔,单位:毫秒#  reconnectionTimeout: 3000#  # 执行失败最大次数#  failedAttempts: 3# 密码password: ********# 单个连接最大订阅数量subscriptionsPerConnection: 5# 客户端名称clientName: null#  # 节点地址address: redis://127.0.0.1:6379# 发布和订阅连接的最小空闲连接数subscriptionConnectionMinimumIdleSize: 1# 发布和订阅连接池大小subscriptionConnectionPoolSize: 50# 最小空闲连接数connectionMinimumIdleSize: 32# 连接池大小connectionPoolSize: 64# 数据库编号database: 0# DNS监测时间间隔,单位:毫秒dnsMonitoringInterval: 5000
# 线程池数量,默认值: 当前处理核数量 * 2
threads: 0
# Netty线程池数量,默认值: 当前处理核数量 * 2
nettyThreads: 0
# 编码
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 传输模式
transportMode : "NIO"

文件位置

加载数据

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;import java.io.IOException;/*** @author ***** @date 2021/7/14*/
@Configuration
public class RedissonConfig {@Bean(destroyMethod = "shutdown")public RedissonClient redissonClient() throws IOException {Config config = Config.fromYAML(new ClassPathResource("redisson/redisson-single.yml").getInputStream());return Redisson.create(config);}
}

编写工具类

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.concurrent.TimeUnit;/*** @author ****** @date 2021/7/14* redis延迟队列工具*/
@Slf4j
@Component
public class RedisDelayQueueUtil {@Autowiredprivate RedissonClient redissonClient;/*** 添加延迟队列* @param value 队列值* @param delay 延迟时间* @param timeUnit 时间单位* @param queueCode 队列键* @param <T>*/public <T> void addDelayQueue(T value, long delay, TimeUnit timeUnit, String queueCode){try {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);delayedQueue.offer(value, delay, timeUnit);log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒");} catch (Exception e) {log.error("(添加延时队列失败) {}", e.getMessage());throw new RuntimeException("(添加延时队列失败)");}}/*** 获取延迟队列* @param queueCode* @param <T>* @return* @throws InterruptedException*/public <T> T getDelayQueue(String queueCode) throws InterruptedException {//Blocking Deque (阻塞双端队列)  没有消息时,会阻塞住当前线程,直到有新的消息到来RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque(queueCode);T value  = (T) blockingDeque.take();return value;}}

关于Redisson的几种分布式队列可以参考这片文章点我

延迟队列执行器

/*** @author **** @date 2021/7/14* 延迟队列执行器  消费类需要实现此接口*/
public interface RedisDelayQueueHandle<T> {/*** 执行方法* @param t 执行类*/void execute(T t);
}

业务消费类枚举

主要将你实际的消费类放到枚举中,系统启动的时候遍历枚举类将你的消费类加载到队列中

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;/*** @author ******** @date 2021/7/14*/
@Getter
@NoArgsConstructor
@AllArgsConstructor
public enum  RedisDelayQueueEnum {CONSUME_ORDER_PAY_TIMEOUT("CONSUME_ORDER_PAY_TIMEOUT","订单支付超时,自动取消订单", "consumeOrderPayTimeout"),CONSUME_JOINT_TIMOUT("CONSUME_JOINT_TIMOUT", "拼团超时,取消拼团", "consumeJointTimout");/*** 延迟队列 Redis Key*/private String code;/*** 中文描述*/private String name;/*** 延迟队列具体业务实现的 Bean* 可通过 Spring 的上下文获取*/private String beanId;
}

加载消费队列

注意: 这里有一点要注意一下。因为redisson在获取延迟队列时是调用getBlockingDeque,而Blocking Deque是阻塞双端队列,当该队列没有消息时会阻塞住当前线程,直到另一个线程将一个元素插入空队列,或者从完整队列中轮询第一个元素才会继续下步操作。为了防止各个消费队列相互阻塞影响,就每个消费类(消费队列)都单起一个线程去获取数据,这样大家各干各的,互不影响。

import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;/****@author ****@date  2021/7/14*系统启动时加载消费队列*/
@Slf4j
@Component
public class RedisDelayQueueRunner implements CommandLineRunner {@Autowiredprivate RedisDelayQueueUtil redisDelayQueueUtil;@Overridepublic void run(String... args) {RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();//每个消费者启动一个固定线程,以防止某个消费者在没有消息消费时调用getBlockingDeque(阻塞双端队列)一直被阻塞进而导致其他的消费者有消息消费也被阻塞住.for (RedisDelayQueueEnum queueEnum : queueEnums) {new Thread(() -> {while (true){try {Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());if (value != null) {RedisDelayQueueHandle redisDelayQueueHandle = SpringUtil.getBean(queueEnum.getBeanId());redisDelayQueueHandle.execute(value);}System.out.println();}catch (Exception e) {log.error("(Redis延迟队列异常中断) {}", e);}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}).start();}log.info("(Redis延迟队列启动成功)");}
}

消费者类

import com.civ.edu.service.order.ApiOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Map;/*** @author ***** @date 2021/7/14* 订单支付超时处理类*/
@Component
@Slf4j
public class ConsumeOrderPayTimeout implements RedisDelayQueueHandle<Map> {@AutowiredApiOrderService apiOrderService;@Overridepublic void execute(Map map) {log.info("(收到订单支付超时延迟消息) {}", map);//你的业务逻辑代码}
}

测试类

    @AutowiredRedisDelayQueueUtil redisDelayQueueUtil;@GetMapping("orderId")public Result orderId(){Map<String, Object> param = new HashMap<>(2);param .put("orderId", "1415626985311772674");param .put("remark", "订单支付超时,自动取消订单");// 添加订单支付超时,自动取消订单延迟队列。为了测试效果,延迟10秒钟redisDelayQueueUtil.addDelayQueue(map1, 10, TimeUnit.SECONDS, RedisDelayQueueEnum.CONSUME_ORDER_PAY_TIMEOUT.getCode());return Result.success();}

测试结果

2021-07-16 00:04:15.614 [http-nio-8081-exec-3] INFO  RedisDelayQueueUtil-(添加延时队列成功) 队列键:CONSUME_ORDER_PAY_TIMEOUT,队列值:{orderId=1415626985311772674, remark=订单支付超时,自动取消订单},延迟时间:10秒
2021-07-16 00:04:25.665 [Thread-36] INFO  ConsumeOrderPayTimeout-(收到订单支付超时延迟消息) {orderId=1415626985311772674, remark=订单支付超时,自动取消订单}

SpringBoot整合Redisson实现延迟队列相关推荐

  1. springboot整合redisson实现多种分布式锁

    Redisson概述 Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid).它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式 ...

  2. springboot整合redisson(一)搭建Redisson环境

    一.创建springboot项目 1.通过idea创建springboot项目 2.通过web网站创建springboot项目 创建完之后的项目结构如下: 二.引入redisson依赖 由于我们是sp ...

  3. SpringBoot 整合:Redis延时队列的简单实现(基于有赞的设计)

    点击关注公众号,Java干货及时送达 来源:blog.csdn.net/qq330983778/article/details/99341671 设计 之前学习Redis的时候发现有赞团队之前分享过一 ...

  4. SpringBoot RabbitMQ 集成 七 延迟队列

    为什么80%的码农都做不了架构师?>>>    何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 延迟 ...

  5. springboot整合redisson实现分布式锁

    一.介绍Redisson Redisson是Redis官方推荐的Java版的Redis客户端(Jedis.letture也是官方推荐的java版本redis客户端程序).它提供的功能非常多,也非常强大 ...

  6. springboot整合redisson实战(二)Redisson分布式锁的使用

    redisson锁说明 Redisson是基于Netty实现的,是更高性能的第三方库.实现了可重入锁(Reentrant Lock).公平锁(Fair Lock.联锁(MultiLock). 红锁(R ...

  7. Springboot 整合 Redisson

    前言 redisson的整合很简单,这里直接上代码和演示"读写锁"测试,其他更多的"并发锁",详情见官方文档(地址见文章末尾),官方写得很详细,这里不赘述了. ...

  8. SpringBoot整合redisson分布式锁

    1.为什么要使用分布式锁 在分布式场景下为了保证数据最终一致性.在单进程的系统中,存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步(lock-synchronized), ...

  9. SpringBoot整合Redisson

    Redisson官方文档: https://github.com/redisson/redisson/wiki 简介:Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格(I ...

最新文章

  1. picACG本地缓存目录_7天用Go动手写/从零实现分布式缓存GeeCache
  2. python|selenium的API说明文档查看的2种方法
  3. 慢动作输出 Linux 命令结果并用彩色显示
  4. rxjs switchMap的实现原理
  5. 人造流星这种生日礼物,你有过吗?现在国外有了
  6. MySQL8的inodb参数设置_MySQL8.0自适应参数innodb_dedicated_server
  7. Spring Data JPA事务管理
  8. 查看linux 系统 当前使用的网卡
  9. mysql常用的一些命令,用于查看数据库、表、字段编码
  10. 黑苹果固态硬盘_普通电脑也能尝试:MacOS黑苹果+雷电3硬盘盒测试
  11. oracle怎么查询临时表空间大小,如何查看oracle临时表空间当前使用了多少空间的大小...
  12. 人工智能--框架表示法
  13. 概率论笔记3.1二维随机变量及其函数分布
  14. iOS 之电影播放器
  15. Android 安装的app
  16. 专利大战中 苹果被迫曝光的8个秘密
  17. 社会管理网格化 源码_天津市红桥区社会治理网格化管理平台获CIO智选政务应用奖...
  18. 【论文笔记】DUDA‑Net: a double U‑shaped dilated attention network for automatic infection area segmentati
  19. java aot,Java三种编译方式: 前端编译 JIT编译 AOT编译
  20. AD pcb中无法选中铜层

热门文章

  1. 怎么把英语音频转换成文字?这些方法你应该要试一下
  2. corejs和presets env和presets es2015和presets stage-3之间的关系
  3. 物联卡中心:电信物联卡稳定吗,电信物联卡资费标准
  4. 小米11pro和小米11ultra哪个好
  5. 基于Sring+bootstrap+MySQL的住房公积金管理系统
  6. 黑马程序员之Web前端全栈 · 阶段一 前端开发基础 (3)
  7. java毕业设计动物在线领养网站Mybatis+系统+数据库+调试部署
  8. 爱奇艺谢丹铭:用AI让创作者提升效率,让消费者简单快乐
  9. 评分addmodule 绝对值评分 8种方法可视化你的单细胞基因集打分gsea 缺氧评分
  10. 第三十三周学习生活总结