springboot:整合redis之消息队列
文章目录
- springboot:整合redis之消息队列
- 一、项目准备
- 二、配置类
- 三、redis中list数据类型
- 定时器监听队列
- 运行即监控队列
- 四、发布/订阅模式
- RedisConfig中添加监听器
- 订阅者
- 消息发布
- 另一种发布方式
- 消息是实体对象,进行转换
- 五、ZSet实现延迟队列
springboot:整合redis之消息队列
一、项目准备
依赖
<!-- RedisTemplate --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- Redis-Jedis --><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency>
application.yaml配置文件
spring:redis:host: 127.0.0.1port: 6379database: 0timeout: 4000jedis:pool:max-wait: -1max-active: -1max-idle: 20min-idle: 10
二、配置类
public class ObjectMapperConfig {public static final ObjectMapper objectMapper;private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";static {JavaTimeModule javaTimeModule = new JavaTimeModule();javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer());javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer());objectMapper = new ObjectMapper()// 转换为格式化的json(控制台打印时,自动格式化规范)//.enable(SerializationFeature.INDENT_OUTPUT)// Include.ALWAYS 是序列化对像所有属性(默认)// Include.NON_NULL 只有不为null的字段才被序列化,属性为NULL 不序列化// Include.NON_EMPTY 如果为null或者 空字符串和空集合都不会被序列化// Include.NON_DEFAULT 属性为默认值不序列化.setSerializationInclusion(JsonInclude.Include.NON_NULL)// 如果是空对象的时候,不抛异常.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)// 反序列化的时候如果多了其他属性,不抛出异常.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)// 取消时间的转化格式,默认是时间戳,可以取消,同时需要设置要表现的时间格式.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false).setDateFormat(new SimpleDateFormat(PATTERN))// 对LocalDateTime序列化跟反序列化.registerModule(javaTimeModule).setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY)// 此项必须配置,否则会报java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXX.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);}static class LocalDateTimeSerializer extends JsonSerializer<LocalDateTime> {@Overridepublic void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider serializers) throws IOException {gen.writeString(value.format(DateTimeFormatter.ofPattern(PATTERN)));}}static class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {@Overridepublic LocalDateTime deserialize(JsonParser p, DeserializationContext deserializationContext) throws IOException {return LocalDateTime.parse(p.getValueAsString(), DateTimeFormatter.ofPattern(PATTERN));}}}
@Configuration
public class RedisConfig {/*** redisTemplate配置*/@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();// 配置连接工厂template.setConnectionFactory(factory);//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)Jackson2JsonRedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();// 使用StringRedisSerializer来序列化和反序列化redis的key,value采用json序列化template.setKeySerializer(stringRedisSerializer);template.setValueSerializer(jacksonSerializer);// 设置hash key 和value序列化模式template.setHashKeySerializer(stringRedisSerializer);template.setHashValueSerializer(jacksonSerializer);template.afterPropertiesSet();return template;}
}
三、redis中list数据类型
在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部和尾部添加新的元素
优势:
- 顺序排序,保证先进先出
- 队列为空时,自动从Redis数据库删除
- 在队列的两头插入或删除元素,效率极高,即使队列中元素达到百万级
- List中可以包含的最大元素数量是4294967295
定时器监听队列
生产者
@Slf4j
@Component
public class MessageProducer {public static final String MESSAGE_KEY = "message:queue";@Autowiredprivate RedisTemplate<String,Object> redisTemplate;public void lPush() {for (int i = 0; i < 10; i++) {new Thread(() -> {Long size = redisTemplate.opsForList().leftPush(MESSAGE_KEY, Thread.currentThread().getName() + ":hello world");log.info(Thread.currentThread().getName() + ":put message size = " + size);}).start();}}
}
消费者:消费消息,定时器以达到监听队列功能
@Slf4j
@Component
@EnableScheduling
public class MessageConsumer {public static final String MESSAGE_KEY = "message:queue";@Autowiredprivate RedisTemplate<String,Object> redisTemplate;@Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000)public void rPop() {String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY);log.info(message);}
}
@RestController
public class RedisController {@Autowiredprivate MessageProducer messageProducer;@GetMapping("/lPush")public void lPush() {messageProducer.lPush();}
}
测试
http://localhost:8080/lPush
可能出现的问题:
1.通过定时器监听List中是否有待处理消息,每执行一次都会发起一次连接,这会造成不必要的浪费。
2.生产速度大于消费速度,队列堆积,消息时效性差,占用内存。
运行即监控队列
修改消息消费者代码。
当队列没有元素时,会阻塞10秒,然后再次监听队列,
需要注意的是,阻塞时间必须小于连接超时时间
@Slf4j
@Component
@EnableScheduling
public class MessageConsumer {public static final String MESSAGE_KEY = "message:queue";@Autowiredprivate RedisTemplate<String,Object> redisTemplate;//@Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000)public void rPop() {String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY);log.info(message);}@PostConstructpublic void brPop() {new Thread(() -> {while (true) {String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY, 10, TimeUnit.SECONDS);log.info(message);}}).start();}
}
阻塞时间不能为负,直接报错超时为负
阻塞时间为零,此时阻塞时间等于超时时间,最后报错连接超时
阻塞时间大于超时时间,报错连接超时
测试:
消息不可重复消费,因为消息从队列POP之后就被移除了,即不支持多个消费者消费同一批数据
消息丢失,消费期间发生异常,消息未能正常消费
四、发布/订阅模式
消息可以重复消费,多个消费者订阅同一频道即可
一个消费者根据匹配规则订阅多个频道
消费者只能消费订阅之后发布的消息,这意味着,消费者下线再上线这期间发布的消息将会丢失
数据不具有持久化。同样Redis宕机也会数据丢失
消息发布后,是推送到一个缓冲区(内存),消费者从缓冲区拉取消息,当消息堆积,缓冲区溢出,消费者就会被迫下线,同时释放对应的缓冲区
RedisConfig中添加监听器
/*** redis消息监听器容器*/@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//订阅频道,通配符*表示任意多个占位符container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));return container;}
订阅者
package com.yzm.redis08.message;import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;public class MySubscribe implements MessageListener {@Overridepublic void onMessage(Message message, byte[] bytes) {System.out.println("订阅频道:" + new String(message.getChannel()));System.out.println("接收数据:" + new String(message.getBody()));}
}
消息发布
@GetMapping("/publish")public void publish() {redisTemplate.convertAndSend("channel_first", "hello world");}
另一种发布方式
/*** redis消息监听器容器*/@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//订阅频道,通配符*表示任意多个占位符container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));// 通配符?:表示一个占位符MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");listenerAdapter.afterPropertiesSet();container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));return container;}
public class MySubscribe2 {public void getMessage(Object message, String channel) {System.out.println("订阅频道2:" + channel);System.out.println("接收数据2:" + message);}
}
@GetMapping("/publish2")public void publish2() {redisTemplate.convertAndSend("channel2", "hello world");}
消息是实体对象,进行转换
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {private static final long serialVersionUID = 5250232737975907491L;private Integer id;private String username;
}
public class MySubscribe3 implements MessageListener {@Overridepublic void onMessage(Message message, byte[] bytes) {Jackson2JsonRedisSerializer<User> jacksonSerializer = new Jackson2JsonRedisSerializer<>(User.class);jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);User user = jacksonSerializer.deserialize(message.getBody());System.out.println("订阅频道3:" + new String(message.getChannel()));System.out.println("接收数据3:" + user);}
}
/*** redis消息监听器容器*/@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//订阅频道,通配符*:表示任意多个占位符container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));// 通配符?:表示一个占位符MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");listenerAdapter.afterPropertiesSet();container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));container.addMessageListener(new MySubscribe3(), new PatternTopic("user"));return container;}
@GetMapping("/publish3")public void publish3() {User user = User.builder().id(1).username("yzm").build();redisTemplate.convertAndSend("user", user);}
五、ZSet实现延迟队列
生产消息,score = 时间搓+60s随机数
public static final String MESSAGE_ZKEY = "message:ZSetqueue";public volatile AtomicInteger count = new AtomicInteger();public void zAdd() {for (int i = 0; i < 10; i++) {new Thread(() -> {int increment = count.getAndIncrement();log.info(Thread.currentThread().getName() + ":put message to zset = " + increment);double score = System.currentTimeMillis() + new Random().nextInt(60 * 1000);redisTemplate.opsForZSet().add(MESSAGE_ZKEY, Thread.currentThread().getName() + " hello zset:" + increment, score);}).start();}}
消费者:定时任务,每秒执行一次
public static final String MESSAGE_ZKEY = "message:ZSetqueue";public SimpleDateFormat simpleDateFormat = new SimpleDateFormat();@Scheduled(initialDelay = 5 * 1000, fixedRate = 1000)public void zrangebysocre() {log.info("延时队列消费。。。");// 拉取score小于当前时间戳的消息Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(MESSAGE_ZKEY, 0, System.currentTimeMillis());if (messages != null) {for (Object message : messages) {Double score = redisTemplate.opsForZSet().score(MESSAGE_ZKEY, message);log.info("消费了:" + message + "消费时间为:" + simpleDateFormat.format(score));redisTemplate.opsForZSet().remove(MESSAGE_ZKEY, message);}}}
@GetMapping("/zadd")public void zadd() {messageProducer.zAdd();}
springboot:整合redis之消息队列相关推荐
- springboot整合redis实现消息队列
在java中直接使用redis的时候,直接使用简单的两个指令lpush和rpop或者rpush和lpop就可以实现消息队列的操作.当与spring结合时,可以使用RedisTemplate和Strin ...
- SpringBoot 整合 Redis 实现消息队列
写这篇文章的原因还是得归咎于
- SpringBoot第十四篇:在springboot中用redis实现消息队列
这篇文章主要讲述如何在springboot中用reids实现消息队列. 准备阶段 安装redis,可参考我的另一篇文章,5分钟带你入门Redis. java 1.8 maven 3.0 idea 环境 ...
- springboot使用redis实现消息队列功能,redis使用list和stream实现消息队列功能,redis实现消息队列的风险点分析
文章目录 写在前面 基于list的消息队列解决方案 使用list基本实现消息队列 阻塞式消费,避免性能损失 替换while(true) 实现消息幂等 保证消息可靠性 基于stream的消息队列解决方案 ...
- springboot利用redis作为消息队列mq使用
先吐个槽:经常看到有人对不同得消息队列做各种各样得评价以及所谓得性能测试,评估选型等等,岂不知脱离任何业务得技术评估都是瞎扯淡.(公司实际业务技术场景不提,满口胡说kafa怎么样,activemq怎么 ...
- SpringBoot项目redis的消息队列
第一步导入redis依赖即可 第二步修改配置文件(这里我用的是mvn项目) spring.application.name=springbootdemospring.rabbitmq.host=192 ...
- 企业级 SpringBoot 教程 (十四)在springboot中用redis实现消息队列
准备阶段 安装redis,可参考我的另一篇文章,5分钟带你入门Redis. java 1.8 maven 3.0 idea 环境依赖 创建一个新的springboot工程,在其pom文件,加入spri ...
- springboot整合redis消息队列
前言 消息队列作为一种常用的异步通信解决方案,而redis是一款高性能的nosql产品,今天就给大家介绍一下,如何使用redis实现消息队列,并整合到springboot. 两个消息模型 1. 队列模 ...
- 【springboot】【redis】springboot+redis实现发布订阅功能,实现redis的消息队列的功能...
springboot+redis实现发布订阅功能,实现redis的消息队列的功能 参考:https://www.cnblogs.com/cx987514451/p/9529611.html 思考一个问 ...
最新文章
- mysql未知数据库_如何处理这个错误(1049,“未知数据库”/ users / ohyunjun / work / astral / mysql“”)...
- JS实现css属性动画效果
- 如何初始化局部变量c语言_重要的事情说三遍:局部变量一定要初始化!你做到了吗?...
- PostgreSQL代码分析,查询优化部分,canonicalize_qual
- 有向图的强连通分量,割点与桥
- 2012总结--第5篇--人脉篇
- 模型相关:SolidWorks按实际比例创建模型,在Unity中保持尺寸不变
- Hive导入和导出数据
- 施耐德 m340 编程手册_工控安全研究系列(一)施耐德PLC 140CPU 65150
- 0-12V的模拟量隔离模块的一些感悟
- 新版itunes添加铃声
- 工信部教育与考试中心-软件测试工程师考试题A卷-答
- 给比特币“雕花” 增值还是累赘?
- 利用ArcGIS创建要素与表之间的关系类并发布带有关系数据表的要素服务
- 新加坡内推 | Motional新加坡分部招聘自动驾驶感知算法实习生
- 算法之美——循环移位(XY→YX)逆操作
- 链路聚合(eth-trunk)
- Linux.配置Hadoop环境的一些问题解决
- 行人重识别简介(Person ReID)
- 四、微信小程序之简单计算器(学习记录)