场景

Redis使用list可以实现队列功能,但无法做到广播模式,使用PubSub可以做到广播模式,但是PubSub并不能保证数据的持久化。

Stream就是为了满足上面的需求在Redis 5.0中发布的,它的内部结构是一个链表,将消息都串起来,每个消息拥有一个自己的ID和内容,该结构是持久化保存的。

基本概念

每个Stream都以key作为自己的名字,相当于消息队列中的topic,生产者和消费者对这个key进行发布订阅即可,Stream在第一次使用 xadd 指令时被创建。

消费组

每个Stream可以有多个消费组(Consumer Group),每个消费组会有个游标(last_delivered_id)在Stream链表上往前移动,表示该消费组消费到了哪条消息。

消费组需要通过 xgroup create 进行创建,需要指定从Stream某个消息ID开始消费,该ID就是游标(last_delivered_id)的初始值。

每个消费者之间是独立的,A消费了数据1,不会影响到B消费数据1,即一份数据可以被多个消费组消费。

消费者

一个消费组可以有多个消费者(Consumer),这下消费者之间是竞争关系,任意一个消费者消费数据后,都会导致游标(last_delivered_id)向前移动,每个消费者都有一个组内唯一名称。

消费者内部有一个状态变量: pending_ids,该变量记录已经被客户端读取,但是没有ACK的消息,客户端没有ACK的消息越多,这里保存的消息id就越多,ACK后将相应减少,

官方称 pending_ids 为 PEL(Pending Entires List),用来确保客户端至少消费了一次,防止数据丢失。

消息ID与内容

消息ID格式为:timestamplnMillis-sequence,前者是消息产生的毫秒时间戳,后者是在该毫秒产生的第几条消息,例如 1527846880572-5 ,代表是 1527846880572 毫秒时的第 5 条消息,

消息ID可以由服务器自动生成,也可以是客户端自己指定,但是形式必须是 整数-整数,后来的消息ID必须大于前面的消息ID。

消息内容是一个类似hash结构的键值对。

基本命令

  1. xadd:向Stream发一条消息
# stream-1 表示stream的名字,* 则是系统该自动生成id,后面的则是我们的数据键值对127.0.0.1:6379> xadd stream-1 * name zhangsan age 10"1641454184593-0"127.0.0.1:6379> xadd stream-1 * name lisi age 10"1641454195771-0"127.0.0.1:6379> xadd stream-1 * name wangwu age 10"1641454208016-0"复制代码
  1. xdel:从Stream中删除一条消息
127.0.0.1:6379> xdel stream-1 1641454195771-0(integer) 0
复制代码
  1. xrange:获取Stream中的消息列表
127.0.0.1:6379> xrange stream-1 - +1) 1) "1641454639427-0"2) 1) "name"2) "zhangsan"3) "age"4) "10"2) 1) "1641454646977-0"2) 1) "name"2) "lisi"3) "age"4) "10"3) 1) "1641454651750-0"2) 1) "name"2) "wangwu"3) "age"4) "10"
复制代码
  1. xlen:获取Stream消息长度
127.0.0.1:6379> xlen stream-1(integer) 3
复制代码
  1. del:删除整个Stream消息列表
127.0.0.1:6379> del stream-1(integer) 1127.0.0.1:6379> xrange stream-1 - +(empty array)
复制代码

消费模式

消费模式有两种,独立消费与消费组,前者就是一个消息只能被全局消费一次,后者则是多个消费组可以对同一个消费者消费一次。

独立消费

Stream 提供了 一个 xread 指令,该指令在消费消息的时候会忽略消费组的存在。

  1. 使用 xread 从头部读取两条消息

127.0.0.1:6379> xread count 2 streams stream-1 0-01) 1) "stream-1"2) 1) 1) "1641455116178-0"2) 1) "name"2) "zhangsan"3) "age"4) "10"2) 1) "1641455118418-0"2) 1) "name"2) "zhangsan1"3) "age"4) "10"
复制代码
  1. 从尾部接收最新消息,之前的消息全部会被忽略

这种接收最新消息一般配合阻塞使用,如果不使用阻塞大概率读的是空

127.0.0.1:6379> xread count 1 streams stream-1 $(nil)
复制代码

阻塞读取,当有xadd往该stream-1添加消息时,将立刻返回,block以s为单位,阻塞s表后没有数据将返回,block 0 就是永远阻塞,直到有消息来临。

127.0.0.1:6379> xread block 0 count 1 streams stream-1 $1) 1) "stream-1"2) 1) 1) "1641455579648-0"2) 1) "name"2) "wangliu"3) "age"4) "20"(38.07s)
复制代码

消费组

通过 xgroup create 可以创建一个消费组,并提供一个消息起始id初始化 last_delivered_id 变量。

  1. 为 stream-1 添加一个名为 cgroup-1 的消费组,从第一条消息开始消费。
xgroup create stream-1 cgroup-1 0-0
复制代码
  1. 为 stream-1 添加一个名为 cgroup-2 的消费组,该消费组只监听新消息
xgroup create stream-1 cgroup-2 $
复制代码
  1. 查看 stream-1 的详细信息
127.0.0.1:6379> xinfo stream stream-11) "length"2) (integer) 4 # 共4个消息3) "radix-tree-keys"4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "last-generated-id"8) "1641455579648-0"9) "groups"10) (integer) 2 # 两个消费组11) "first-entry" # 第一个消息12) 1) "1641455116178-0"2) 1) "name"2) "zhangsan"3) "age"4) "10"13) "last-entry" # 最后一个消息14) 1) "1641455579648-0"2) 1) "name"2) "wangliu"3) "age"4) "20"
复制代码

消费

xreadgroup 用于消费组进行消费消息,需要提供消费组名称、消费者名称、起始消息ID,与 xread 一样,也可以进行阻塞等待新消息,

读到新消息时,该消息ID将存入消费者的PEL中,处理完毕后客户端使用 xack 指令通知服务器,该消息ID将被从PEL中移除。

  1. 消费消息

使用消费组 cgroup-1 中的 c1 消费者读取 stream-1 中的 一条消息,> 表示从当前消费组的 last_delivered_id 后面开始读,没读一条消息,该值就会往前进一个。

127.0.0.1:6379> xreadgroup GROUP cgroup-1 c1 count 1 streams stream-1 >1) 1) "stream-1"2) 1) 1) "1641455579648-0"2) 1) "name"2) "wangliu"3) "age"4) "20"127.0.0.1:6379> xreadgroup GROUP cgroup-1 c1 count 1 streams stream-1 >(nil) 读完了复制代码
  1. 阻塞等待读取
xreadgroup GROUP cgroup-1 c1 block 0 count 1 streams stream-1 >
复制代码

用另一个客户端发送消息

127.0.0.1:6379> xadd stream-1 * name mn age 30"1641457442683-0"复制代码

读到新消息返回

127.0.0.1:6379> xreadgroup GROUP cgroup-1 c1 block 0 count 1 streams stream-1 >1) 1) "stream-1"2) 1) 1) "1641457442683-0"2) 1) "name"2) "mn"3) "age"4) "30"(45.20s)
复制代码
  1. 观察消费组信息
127.0.0.1:6379> xinfo groups stream-11) 1) "name"2) "cgroup-1"3) "consumers"4) (integer) 1 # 一个消费者5) "pending"6) (integer) 5 # 5 条消息还没有ack7) "last-delivered-id"8) "1641457442683-0"2) 1) "name"2) "cgroup-2"3) "consumers"4) (integer) 05) "pending"6) (integer) 07) "last-delivered-id"8) "1641455579648-0"
复制代码
  1. 观察消费者信息

# cgroup-1下的消费者127.0.0.1:6379> xinfo consumers stream-1 cgroup-1 1) 1) "name"2) "c1" # c1 消费者3) "pending"4) (integer) 5 # 5条消息还没有ack5) "idle"6) (integer) 504275 # 空闲了多久 ms 没有读取消息复制代码
  1. ack消息
# ack一条消息
xack stream-1 cgroup-1 1641455116178-0
复制代码
# ack多条消息127.0.0.1:6379> xack stream-1 cgroup-1 1641455118418-0 1641455120461-0 1641455579648-0 1641457442683-0(integer) 4复制代码

5条消息全部ack后,待ack的数量变为0了。

127.0.0.1:6379> xinfo consumers stream-1 cgroup-1 1) 1) "name"2) "c1"3) "pending"4) (integer) 05) "idle"6) (integer) 684848复制代码

其他场景

消息定长

如果需要将Stream中的消息保持一定的长度,可以在xadd时,增加一个maxlen参数,该参数可以把老的消息剔除,保证消息长度不超过maxlen。

先增加5条数据

127.0.0.1:6379> xadd stream-1 * name n1"1641458449368-0"127.0.0.1:6379> xadd stream-1 * name n2"1641458451353-0"127.0.0.1:6379> xadd stream-1 * name n3"1641458452737-0"127.0.0.1:6379> xadd stream-1 * name n4"1641458454740-0"127.0.0.1:6379> xadd stream-1 * name n5"1641458456357-0"复制代码

使用定长参数,设置长度为3,可以看到,当使用定长参数后,链表中的消息只有3个了。

127.0.0.1:6379> xadd stream-1 maxlen 3 * name n5"1641458535493-0"127.0.0.1:6379> xinfo stream stream-11) "length"2) (integer) 33) "radix-tree-keys"4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "last-generated-id"8) "1641458535493-0"9) "groups"10) (integer) 211) "first-entry"12) 1) "1641458454740-0"2) 1) "name"2) "n4"13) "last-entry"14) 1) "1641458535493-0"2) 1) "name"2) "n5"复制代码

忘记ACK

消费者的PEL保存了没有ACK的消息ID,如果没有ACK的消息越来越多,那么这个结构就会占用空间越来越大,所以千万不要忘记ACK。

PEL防止消息丢失

PEL主要的作用是,如果客户端消费了一条消息,此时发生网络断开等原因,这个消息还没被客户端消费就丢失了,因此PEL可以保证,在没有ACK前,一直在PEL中保存该消息ID,等客户端重新连接上后,可以再次消费PEL中的消息ID,一般建议客户端将参数设置成 0-0,表示从laster_delivered_id后开始消费。

Stream高可用

Stream与其他数据结构并没有区别,它的高可用也是通过建立在主从复制上,在 Cluster 和 Sentinel 即可支持高可用,由于Redis指令复制是异步的,因此可能会发生丢失极小部分的数据丢失。

分区Partition

Stream没有提供分区的能力,如果需要实现的话,可以建立多个Stream结构,然后客户端通过对key进行hash的方式,路由到不同的Stream分区中。

觉得有帮助请帮忙点下,谢谢:Redis之Stream-阿里云开发者社区

Redis之Stream相关推荐

  1. redis stream持久化_Beetlex.Redis之Stream功能详解

    原标题:Beetlex.Redis之Stream功能详解 有一段时间没有写文章,techempower的测试规则评分竟然发生了变化,只能忘着补充一下占比权重最多的数据更新示例了和深入设计一下组件模块化 ...

  2. redis 流 stream的使用总结 - 消费者组

    本博客讲述如何使用redis中流stream的组 简言 1. 消费者组(consumer group)允许用户将一个流从逻辑上分成多个不同的流,并让消费者组组下的消费者去处理组中的消息 2. 多个消费 ...

  3. redis 流 stream的使用总结 - 如何遍历

    本博客讲述如何对redis中的流进行遍历 接上篇博客redis 流 stream的使用总结 - 基础命令 简言 1. XRANGE,XREVRANGE,XREAD命令只适合单个消费者模式,因为这三个命 ...

  4. redis 流 stream的使用总结 - 基础命令

    简言 1.   流(stream)是redist5.0版本新增加的数据结构,也是该版本最重要的更新,专门用于实现消息队列,事件系统 2.   redis之前的其他的数据结构实现消息队列,各有缺点 2. ...

  5. Beetlex.Redis之Stream功能详解

    有一段时间没有写文章,techempower的测试规则评分竟然发生了变化,只能忘着补充一下占比权重最多的数据更新示例了和深入设计一下组件模块化加载的设计.但在不久前有用户问了一下组件是否支持redis ...

  6. Redis进阶-Stream多播的可持久化的消息队列

    文章目录 Pre Stream简介 Stream特性 消息 ID 消息内容 命令预览 独立消费 创建消费组 消费 Stream 消息积压怎么处理 消息如果忘记 ACK 会怎样? PEL 如何避免消息丢 ...

  7. 你还不了解Redis的发布/订阅功能与Redis的Stream吗

    一.Redis 中的发布/订阅功能 发布/ 订阅系统 是 Web 系统中比较常用的一个功能.简单点说就是 发布者发布消息,订阅者接受消息,这有点类似于我们的报纸/ 杂志社之类的: (借用前边的一张图) ...

  8. redis灵魂拷问:如何使用stream实现消息队列

    redis在很早之前就支持消息队列了,使用的是PUB/SUB功能来实现的.PUB/SUB有一个缺点就是消息不能持久化,如果redis发生宕机,或者客户端发生网络断开,历史消息就丢失了. redis5. ...

  9. 使用 Redis Stream 实现消息队列

    使用 Redis Stream 实现消息队列 Intro Redis 5.0 中增加了 Stream 的支持,利用 Stream 我们可以实现可靠的消息队列,并且支持一个消息被多个消费者所消费,可以很 ...

最新文章

  1. sql sever avg保留小数_《数据库系统概念》笔记 (一)SQL
  2. [Server] 服务器配置SSH登录邮件通知
  3. Java 反射机制[Field反射]
  4. 使用文件监控对象FileSystemWatcher实现数据同步
  5. 给程序员的10条建议
  6. C#语法之event关键字
  7. HDU-1429 胜利大逃亡(续)
  8. cnvd与cnnvd区别_漏洞都是怎么编号的CVE/CAN/BUGTRAQ/CNCVE/CNVD/CNNVD
  9. PYTHON 顺丰快递分拣小程序-极简9行代码实现分拣
  10. kettle使用httpClient获取ES索引数据
  11. HDU 1402(FFT,NNT)
  12. 5.1.3 NoSQL数据库-Redis(键值key-value)-五大数据类型
  13. 10月24日——程序猿的节日
  14. long类型保留两位小数_Java保留两位小数的几种写法总结
  15. Olly Advanced 1.1 by MaRKuS TH-DJM
  16. sulley测试环境搭建的相关总结
  17. 转载一篇RC电路分类
  18. 从反反鸡汤谈过犹不及
  19. CAD文件翻译和本地化
  20. 01 KVM虚拟化简介

热门文章

  1. C语言定义起泡法函数对n个数,C语言程序设计第10讲.ppt
  2. arduino 时间灯控
  3. mybatis-plus入门,熟练掌握 MyBatis-Plus,一篇就够!
  4. 【python】用winsound模块播放音乐
  5. 【牛客网】BC64 K形图案
  6. CSS实现虚线的方法
  7. 从暴富到爆仓:合约中的百态人生 |链捕手
  8. Mysql查询用户最后一次登陆时间
  9. 离开国企继续前(chu)进(chou )——北漂18年(34)
  10. 用css做出好看的盒子阴影