Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:

由于Redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。

所以可以直接使用Redis的List实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。简单示例如下:

存放消息端(消息生产者):

package org.yamikaze.redis.messsage.queue;import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;import java.util.concurrent.TimeUnit;/*** 消息生产者* @author yamikaze*/
public class Producer extends Thread {public static final String MESSAGE_KEY = "message:queue";private Jedis jedis;private String producerName;private volatile int count;public Producer(String name) {this.producerName = name;init();}private void init() {jedis = MyJedisFactory.getLocalJedis();}public void putMessage(String message) {Long size = jedis.lpush(MESSAGE_KEY, message);System.out.println(producerName + ": 当前未被处理消息条数为:" + size);count++;}public int getCount() {return count;}@Overridepublic void run() {try {while (true) {putMessage(StringUtils.generate32Str());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException{Producer producer = new Producer("myProducer");producer.start();for(; ;) {System.out.println("main : 已存储消息条数:" + producer.getCount());TimeUnit.SECONDS.sleep(10);}}
}

消息处理端(消息消费者):

package org.yamikaze.redis.messsage.queue;import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;/*** 消息消费者* @author yamikaze*/
public class Customer extends Thread{private String customerName;private volatile int count;private Jedis jedis;public Customer(String name) {this.customerName = name;init();}private void init() {jedis = MyJedisFactory.getLocalJedis();}public void processMessage() {String message = jedis.rpop(Producer.MESSAGE_KEY);if(message != null) {count++;handle(message);}}public void handle(String message) {System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");}@Overridepublic void run() {while (true) {processMessage();}}public static void main(String[] args) {Customer customer = new Customer("yamikaze");customer.start();}
}

但上述例子中消息消费者有一个问题存在,即需要不停的调用rpop方法查看List中是否有待处理消息。每调用一次都会发起一次连接,这会造成不必要的浪费。也许你会使用Thread.sleep()等方法让消费者线程隔一段时间再消费,但这样做有两个问题:

1)、如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。

2)、如果睡眠时间过长,这样不能处理一些时效性的消息,睡眠时间过短,也会在连接上造成比较大的开销。

所以可以使用brpop指令,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费端可以将processMessage可以改为这样:

public void processMessage() {/*** brpop支持多个列表(队列)* brpop指令是支持队列优先级的,比如这个例子中MESSAGE_KEY的优先级大于testKey(顺序决定)。* 如果两个列表中都有元素,会优先返回优先级高的列表中的元素,所以这儿优先返回MESSAGE_KEY* 0表示不限制等待,会一直阻塞在这儿*/List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");if(messages.size() != 0) {//由于该指令可以监听多个Key,所以返回的是一个列表//列表由2项组成,1) 列表名,2)数据String keyName = messages.get(0);//如果返回的是MESSAGE_KEY的消息if(Producer.MESSAGE_KEY.equals(keyName)) {String message = messages.get(1);handle(message);}}System.out.println("=======================");
}

然后可以运行Customer,清空控制台,可以看到程序没有任何输出,阻塞在了brpop这儿。然后在打开Redis的客户端,输入指令client list,可以查看当前有两个连接。

发布/订阅模式

    Redis除了对消息队列提供支持外,还提供了一组命令用于支持发布/订阅模式。
    
    1)发布
    PUBLISH指令可用于发布一条消息,格式 PUBLISH channel message
    返回值表示订阅了该消息的数量。
    2)订阅
    SUBSCRIBE指令用于接收一条消息,格式 SUBSCRIBE channel
    可以看到使用SUBSCRIBE指令后进入了订阅模式,但没有接收到publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。回复分为三种类型:
    1、如果为subscribe,第二个值表示订阅的频道,第三个值表示是第几个订阅的频道?(理解成序号?) 
    2、如果为message(消息),第二个值为产生该消息的频道,第三个值为消息
    3、如果为unsubscribe,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。
    可以使用指令UNSUBSCRIBE退订,如果不加参数,则会退订所有由SUBSCRIBE指令订阅的频道。
   
    Redis还支持基于通配符的消息订阅,使用指令PSUBSCRIBE (pattern subscribe),例如:
   再试试推送消息会得到以下结果:
   可以看到publish指令返回的是2,而订阅端这边接收了两次消息。这是因为PSUBSCRIBE指令可以重复订阅频道。而使用PSUBSCRIBE指令订阅的频道也要使用指令PUNSUBSCRIBE指令退订,该指令无法退订SUBSCRIBE订阅的频道,同理UNSUBSCRIBE也不能退订PSUBSCRIBE指令订阅的频道。同时PUNSUBSCRIBE指令通配符不会展开。
例如:PUNSUBSCRIBE * 不会匹配到 channel.*, 所以要取消订阅channel.*就要这样写PUBSUBSCRIBE channel.*。
代码示范如下:
package org.yamikaze.redis.messsage.subscribe;import org.yamikaze.redis.messsage.queue.StringUtils;
import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;/*** 消息发布方* @author yamikaze*/
public class Publisher {public static final String CHANNEL_KEY = "channel:message";private Jedis jedis;public Publisher() {jedis = MyJedisFactory.getLocalJedis();}public void publishMessage(String message) {if(StringUtils.isBlank(message)) {return;}jedis.publish(CHANNEL_KEY, message);}public static void main(String[] args) {Publisher publisher = new Publisher();publisher.publishMessage("Hello Redis!");}
}

简单的发送一个消息。

消息订阅方:
package org.yamikaze.redis.messsage.subscribe;import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;import java.util.concurrent.TimeUnit;/*** 消息订阅方客户端* @author yamikaze*/
public class SubscribeClient {private Jedis jedis;private static final String EXIT_COMMAND = "exit";public SubscribeClient() {jedis = MyJedisFactory.getLocalJedis();}public void subscribe(String ...channel) {if(channel == null || channel.length <= 0) {return;}//消息处理,接收到消息时如何处理JedisPubSub jps = new JedisPubSub() {/*** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现* 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage* 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法* 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]*/@Overridepublic void onMessage(String channel, String message) {if(Publisher.CHANNEL_KEY.equals(channel)) {System.out.println("接收到消息: channel : " + message);//接收到exit消息后退出if(EXIT_COMMAND.equals(message)) {System.exit(0);}}}/*** 订阅时*/@Overridepublic void onSubscribe(String channel, int subscribedChannels) {if(Publisher.CHANNEL_KEY.equals(channel)) {System.out.println("订阅了频道:" + channel);}}};//可以订阅多个频道 当前线程会阻塞在这儿jedis.subscribe(jps, channel);}public static void main(String[] args) {SubscribeClient client = new SubscribeClient();client.subscribe(Publisher.CHANNEL_KEY);//并没有 unsubscribe方法//相应的也没有punsubscribe方法}
}
    先运行client,再运行Publisher进行消息发送,输出结果:

总结:

    使用Redis的List数据结构可以简单迅速地做一个消息队列,同时Redis提供的BRPOP和BLPOP等指令解决了频繁调用Jedis的rpop和lpop方法造成的资源浪费问题。除此之外,Redis提供对发布/订阅模式的指令,可以实现消息传递、进程间通信。
参考资料
    《Redis入门指南》
 

Redis学习笔记之十:Redis用作消息队列相关推荐

  1. redis学习笔记(7)之redis哨兵详解

    redis哨兵详解 sentinel命令 客户端连接 素材代码 思路 实现过程 哨兵的切换实现原理 发布订阅基础 哨兵的实现原理 部署建议 需要关注的问题 代码流程 内容来源为六星教育,这里仅作为学习 ...

  2. redis学习笔记(2)之redis主从详解

    redis主从详解 主从详解 主从配置 拓扑 原理 数据同步 概念 复制偏移量 复制积压缓冲区 主节点运行ID Psync命令 全量复制流程 部分复制流程 心跳 缓冲大小调节 读写分离 内容来源为六星 ...

  3. redis学习笔记(6)之redis哨兵

    redis哨兵 redis哨兵初识 基础概念 主从复制的问题 redis 哨兵的高可用性 redis哨兵安装和部署 部署结构 内容来源为六星教育,这里仅作为学习笔记 redis哨兵初识 redis的主 ...

  4. redis学习笔记(5)之redis内存优化

    redis内存优化 配置优化 Linux 配置优化 Redis配置优化 缩减键值对象 命令处理 缓存淘汰优化 动态改配置命令 设置最大内存 设置淘汰策略 内存淘汰策略 如何选择淘汰策略 内容来源为六星 ...

  5. 【黑马程序员】Redis学习笔记001:Redis简介+五种基本数据类型

    一.Redis入门简介及基本操作命令 问题的抛出 出现的问题: 海量用户 高并发 罪魁祸首--关系型数据库: 性能瓶颈:磁盘IO性能低下 扩展瓶颈:数据关系复杂,扩展性差,不便于大规模集群 解决思路 ...

  6. 【Redis学习笔记】09.Redis 数据持久化

    Redis 数据持久化 1. 持久化概述 2. RDB 持久化 2.1. RDB 持久化优点 2.2. RDB 持久化缺点 2.3. RDB 持久化原理 2.4. RDB 触发方式 2.5. save ...

  7. 【Redis学习笔记】13.Redis 主从复制

    Redis 主从复制 1. Redis 主从复制特性 2. Redis 主从复制流程 3. Redis 主从复制操作 3.1. 快速部署Redis实例 3.2. 配置主从复制 3.3. 查看主从复制 ...

  8. 学习笔记 c++ (简单的消息队列)

    后面第二个版本是循环发布的 发送端cpp代码 #include <iostream> #include <sys/types.h> #include <sys/ipc.h ...

  9. Redis学习笔记(十八) 集群(下)

    复制和故障转移 Redis集群中的节点分为主节点(master)和从节点(slave),其中主节点用于处理槽,而从节点则用于复制某个主节点,并在被复制 的主节点下线时,代替下线主节点继续处理命令请求. ...

最新文章

  1. TF版本升级问题:成功解决AttributeError: module tensorflow has no attribute mul
  2. 科大星云诗社动态20210328
  3. 天平应什么放置_天平是否应该放干燥剂?
  4. JDBC登录功能实现
  5. 打破情感分类准确率 80 分天花板!更加充分的知识图谱结合范式
  6. rolling方式修改oplog
  7. wmware 安装xp系统虚拟机
  8. 基于Springboot实现高校社团管理系统
  9. 新人Unity下载安装
  10. 远程桌面管理助手有哪些?11款最好的远程桌面软件推荐。
  11. wps压缩word文档方法
  12. 解析云产品SLA的价值
  13. 一文读懂《理解未来的7个原则》
  14. javascript渐变色算法
  15. Mysql数据库操作语句总结(一)
  16. shiro salt
  17. python大数据需要什么技术有前途_大数据就业前景好不好 一般要掌握哪些技术...
  18. linux系统编程3—文件存储函数
  19. c++虚函数详解(你肯定懂了)
  20. 手机端省市区三级联动

热门文章

  1. 微信小程序python自动化测试_微信小程序UI自动化测试实践:Minium+PageObject
  2. 微信3.1.0.58逆向-微信3.1.0.58HOOK接口(WeChatHelper3.1.0.58.dll)使用说明-获取群成员
  3. 使用python爬取付费音乐
  4. OM6621PW蓝牙智能指纹锁(附芯片选型)
  5. 有货移动Web端性能优化探索实践
  6. 转摘-自阿里技术大牛的成长总结
  7. 前端性能优化方法(一)
  8. 新浪微博 Redis 实战经验分享
  9. 摩托罗拉g7 plus,手机浮躁时代的匠心臻品
  10. 神经网络模型的训练过程,人脸识别神经网络模型