Redis学习笔记之十:Redis用作消息队列
Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:
由于Redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。
所以可以直接使用Redis的List实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。简单示例如下:
存放消息端(消息生产者):
package org.yamikaze.redis.messsage.queue;import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;import java.util.concurrent.TimeUnit;/*** 消息生产者* @author yamikaze*/
public class Producer extends Thread {public static final String MESSAGE_KEY = "message:queue";private Jedis jedis;private String producerName;private volatile int count;public Producer(String name) {this.producerName = name;init();}private void init() {jedis = MyJedisFactory.getLocalJedis();}public void putMessage(String message) {Long size = jedis.lpush(MESSAGE_KEY, message);System.out.println(producerName + ": 当前未被处理消息条数为:" + size);count++;}public int getCount() {return count;}@Overridepublic void run() {try {while (true) {putMessage(StringUtils.generate32Str());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException{Producer producer = new Producer("myProducer");producer.start();for(; ;) {System.out.println("main : 已存储消息条数:" + producer.getCount());TimeUnit.SECONDS.sleep(10);}}
}
消息处理端(消息消费者):
package org.yamikaze.redis.messsage.queue;import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;/*** 消息消费者* @author yamikaze*/
public class Customer extends Thread{private String customerName;private volatile int count;private Jedis jedis;public Customer(String name) {this.customerName = name;init();}private void init() {jedis = MyJedisFactory.getLocalJedis();}public void processMessage() {String message = jedis.rpop(Producer.MESSAGE_KEY);if(message != null) {count++;handle(message);}}public void handle(String message) {System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");}@Overridepublic void run() {while (true) {processMessage();}}public static void main(String[] args) {Customer customer = new Customer("yamikaze");customer.start();}
}
但上述例子中消息消费者有一个问题存在,即需要不停的调用rpop方法查看List中是否有待处理消息。每调用一次都会发起一次连接,这会造成不必要的浪费。也许你会使用Thread.sleep()等方法让消费者线程隔一段时间再消费,但这样做有两个问题:
1)、如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。
2)、如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销。
所以可以使用brpop指令,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费端可以将processMessage可以改为这样:
public void processMessage() {/*** brpop支持多个列表(队列)* brpop指令是支持队列优先级的,比如这个例子中MESSAGE_KEY的优先级大于testKey(顺序决定)。* 如果两个列表中都有元素,会优先返回优先级高的列表中的元素,所以这儿优先返回MESSAGE_KEY* 0表示不限制等待,会一直阻塞在这儿*/List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");if(messages.size() != 0) {//由于该指令可以监听多个Key,所以返回的是一个列表//列表由2项组成,1) 列表名,2)数据String keyName = messages.get(0);//如果返回的是MESSAGE_KEY的消息if(Producer.MESSAGE_KEY.equals(keyName)) {String message = messages.get(1);handle(message);}}System.out.println("=======================");
}
然后可以运行Customer,清空控制台,可以看到程序没有任何输出,阻塞在了brpop这儿。然后在打开Redis的客户端,输入指令client list,可以查看当前有两个连接。
发布/订阅模式
![](/assets/blank.gif)
![](/assets/blank.gif)
![](/assets/blank.gif)
![](/assets/blank.gif)
![](/assets/blank.gif)
package org.yamikaze.redis.messsage.subscribe;import org.yamikaze.redis.messsage.queue.StringUtils;
import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;/*** 消息发布方* @author yamikaze*/
public class Publisher {public static final String CHANNEL_KEY = "channel:message";private Jedis jedis;public Publisher() {jedis = MyJedisFactory.getLocalJedis();}public void publishMessage(String message) {if(StringUtils.isBlank(message)) {return;}jedis.publish(CHANNEL_KEY, message);}public static void main(String[] args) {Publisher publisher = new Publisher();publisher.publishMessage("Hello Redis!");}
}
简单的发送一个消息。
package org.yamikaze.redis.messsage.subscribe;import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;import java.util.concurrent.TimeUnit;/*** 消息订阅方客户端* @author yamikaze*/
public class SubscribeClient {private Jedis jedis;private static final String EXIT_COMMAND = "exit";public SubscribeClient() {jedis = MyJedisFactory.getLocalJedis();}public void subscribe(String ...channel) {if(channel == null || channel.length <= 0) {return;}//消息处理,接收到消息时如何处理JedisPubSub jps = new JedisPubSub() {/*** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现* 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage* 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法* 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]*/@Overridepublic void onMessage(String channel, String message) {if(Publisher.CHANNEL_KEY.equals(channel)) {System.out.println("接收到消息: channel : " + message);//接收到exit消息后退出if(EXIT_COMMAND.equals(message)) {System.exit(0);}}}/*** 订阅时*/@Overridepublic void onSubscribe(String channel, int subscribedChannels) {if(Publisher.CHANNEL_KEY.equals(channel)) {System.out.println("订阅了频道:" + channel);}}};//可以订阅多个频道 当前线程会阻塞在这儿jedis.subscribe(jps, channel);}public static void main(String[] args) {SubscribeClient client = new SubscribeClient();client.subscribe(Publisher.CHANNEL_KEY);//并没有 unsubscribe方法//相应的也没有punsubscribe方法}
}
![](/assets/blank.gif)
总结:
Redis学习笔记之十:Redis用作消息队列相关推荐
- redis学习笔记(7)之redis哨兵详解
redis哨兵详解 sentinel命令 客户端连接 素材代码 思路 实现过程 哨兵的切换实现原理 发布订阅基础 哨兵的实现原理 部署建议 需要关注的问题 代码流程 内容来源为六星教育,这里仅作为学习 ...
- redis学习笔记(2)之redis主从详解
redis主从详解 主从详解 主从配置 拓扑 原理 数据同步 概念 复制偏移量 复制积压缓冲区 主节点运行ID Psync命令 全量复制流程 部分复制流程 心跳 缓冲大小调节 读写分离 内容来源为六星 ...
- redis学习笔记(6)之redis哨兵
redis哨兵 redis哨兵初识 基础概念 主从复制的问题 redis 哨兵的高可用性 redis哨兵安装和部署 部署结构 内容来源为六星教育,这里仅作为学习笔记 redis哨兵初识 redis的主 ...
- redis学习笔记(5)之redis内存优化
redis内存优化 配置优化 Linux 配置优化 Redis配置优化 缩减键值对象 命令处理 缓存淘汰优化 动态改配置命令 设置最大内存 设置淘汰策略 内存淘汰策略 如何选择淘汰策略 内容来源为六星 ...
- 【黑马程序员】Redis学习笔记001:Redis简介+五种基本数据类型
一.Redis入门简介及基本操作命令 问题的抛出 出现的问题: 海量用户 高并发 罪魁祸首--关系型数据库: 性能瓶颈:磁盘IO性能低下 扩展瓶颈:数据关系复杂,扩展性差,不便于大规模集群 解决思路 ...
- 【Redis学习笔记】09.Redis 数据持久化
Redis 数据持久化 1. 持久化概述 2. RDB 持久化 2.1. RDB 持久化优点 2.2. RDB 持久化缺点 2.3. RDB 持久化原理 2.4. RDB 触发方式 2.5. save ...
- 【Redis学习笔记】13.Redis 主从复制
Redis 主从复制 1. Redis 主从复制特性 2. Redis 主从复制流程 3. Redis 主从复制操作 3.1. 快速部署Redis实例 3.2. 配置主从复制 3.3. 查看主从复制 ...
- 学习笔记 c++ (简单的消息队列)
后面第二个版本是循环发布的 发送端cpp代码 #include <iostream> #include <sys/types.h> #include <sys/ipc.h ...
- Redis学习笔记(十八) 集群(下)
复制和故障转移 Redis集群中的节点分为主节点(master)和从节点(slave),其中主节点用于处理槽,而从节点则用于复制某个主节点,并在被复制 的主节点下线时,代替下线主节点继续处理命令请求. ...
最新文章
- TF版本升级问题:成功解决AttributeError: module tensorflow has no attribute mul
- 科大星云诗社动态20210328
- 天平应什么放置_天平是否应该放干燥剂?
- JDBC登录功能实现
- 打破情感分类准确率 80 分天花板!更加充分的知识图谱结合范式
- rolling方式修改oplog
- wmware 安装xp系统虚拟机
- 基于Springboot实现高校社团管理系统
- 新人Unity下载安装
- 远程桌面管理助手有哪些?11款最好的远程桌面软件推荐。
- wps压缩word文档方法
- 解析云产品SLA的价值
- 一文读懂《理解未来的7个原则》
- javascript渐变色算法
- Mysql数据库操作语句总结(一)
- shiro salt
- python大数据需要什么技术有前途_大数据就业前景好不好 一般要掌握哪些技术...
- linux系统编程3—文件存储函数
- c++虚函数详解(你肯定懂了)
- 手机端省市区三级联动
热门文章
- 微信小程序python自动化测试_微信小程序UI自动化测试实践:Minium+PageObject
- 微信3.1.0.58逆向-微信3.1.0.58HOOK接口(WeChatHelper3.1.0.58.dll)使用说明-获取群成员
- 使用python爬取付费音乐
- OM6621PW蓝牙智能指纹锁(附芯片选型)
- 有货移动Web端性能优化探索实践
- 转摘-自阿里技术大牛的成长总结
- 前端性能优化方法(一)
- 新浪微博 Redis 实战经验分享
- 摩托罗拉g7 plus,手机浮躁时代的匠心臻品
- 神经网络模型的训练过程,人脸识别神经网络模型