当前位置: 首页 > 工具软件 > Streams > 使用案例 >

【redis】streams简介

皇甫雨石
2023-12-01

Streams是Redis 5.0引入的新数据类型,它参考了Kafka消费组的概念,允许一组客户机协作使用同一消息流的不同部分。在此之前,redis实现消息队列主要有三种:list、pubsub、sets,它们分别有不同缺陷,list阻塞太久会导致线程回收,pubsub无法持久化,消息大量堆积会导致服务强制断开,sets消息唯一性不好保证,不能做重复消息,且他们都不能分组,不能保证断网等情况下消息被确保消费、也不能重复消费。

streams引入生产者、消费者组、消费者等概念,本组消费者从生产者那里读取到的最新消息标记为last_delivered_id(游标),任意一个消费者读取了新消息都会让游标向前,组内消费者读取但还未确认的消息id会计入pending_ids,消费者确认后会从pending_ids删除,这样做可以保证客户端至少消费消息一次,防止因为断网导致的消息丢失等情况。

每个消费组都会收到生产者的所有消息,但是在一个消费组内部,一条消息分给一个消费者后就不再分给另一个消费者,除非一段时间内消费失败后主动转移(xclaim)。不分组时所有消费者都会收到生产者的所有消息。


生产者

xadd :添加消息

xadd 生产者名 * 消息key1 消息value1 消息key2 消息value2 ...(可以加很多组,但通常只需要一组)

*表示自动生成唯一key,是unix毫秒时间戳_同一毫秒的序列号。也可以自定义自增id。

生产者不存在时xadd自动创建

xtrim :修剪消息

xtrim 生产者名  (可选参数~) n

将当前消息队列长度修剪为n,早期消息会被丢弃

当加上可选参数~ 表示近似修剪,可以提高性能

xdel:删除消息

xdel  生产者名 消息id

在消息一段时间内无法消费成功,并且通常更换了好几个消费者后仍然消费不成功,就要考虑删除本条坏死的消息了,删除成功返回1,失败为0

xlen:查询消息长度

xlen 生产者名

xrange:查看消息

xrange 组名 起始id  结束id(  设为- +表示从最小到最大)   count n(查询n条数据)

与消费者组read一次消息后游标就向前不同,生产者自行查看不影响游标

xpevrange:倒序查看消息

xpevrange 组名  起始id  结束id(  设为+ - 表示从最大到最小)  count n

正序只能- +,逆序只能+ - ,否则读不到数据


消费者

xgroup create:创建消费者组

xgroup create 生产者名 消费组名 起始消费id(0-0 或0表示从头开始消费,$表示从尾即最新消息开始消费)

xread:读取消息

xread (count n 可选,读取n条数据)  (block 0  可选,阻塞等待毫秒数) streams  生产者1 消息id  生产者2 消息id 生产者3 消息id ...

读取指定消息id之后的消息(不包含指定id的)

xread是纯读取操作,不涉及修改游标等,因此可以在从服务器进行

xread可以同时读取多条消息,只需要设置一次count 、block,多条时count表示从每个生产者取出count条数据

block设置为0时表示一直阻塞,不拿到数据不罢休.不设置block就是非阻塞模式,拿不拿到都返回,拿不到返回nil

分组读取消息:xreadgroup

xreadgroup group 消费组名 消费者  (count n 可选参数,读取n条数据)  (block 0  可选参数,阻塞等待毫秒数) streams 生产者名 消息id     

消息id是读取指定id后的消息,设为>是读取最新消息的意思

读取过的不再重复读,也可以设置具体消息id为读取大于指定id的消息(不包含指定id的)

如果设置了阻塞读并且当前没有数据,产生了阻塞等待,会返回阻塞时间。

xreadgroup是写命令,只能在主服务器进行(它修改了游标等信息)。

确认消息:xack

xack 生产者名 消费组名 消息id

已经读取但是未确认的消息会写入pending_ids,确认后会从pending_ids删除,这样做是为了确保消息至少被消费一次。

为消费者组重置最后递送消息ID:XGROUP

XGROUP SETID:生产者名 消费组名 新id(为0表示从头开始)

当某条消息传递中确认丢失,可以重新设置最后传递id,这样消费组中的消费者读取下一个时会重新读取到这条消息

消息转移:xclaim

xclaim  生产者名 消费组名 转移到的消费者名  等待时长(为0时不等待)  消息id

xgroup delconsumer:删除消费者

xgroup delconsumer 生产者名 消费组名 要删的消费者

xgroup destroy:删除消费者组

xgroup destroy  生产者名 消费组名

xpending:查看消费组待处理消息的相关信息

xpending 生产者名 消费组名


查询

查询:xinfo

xinfo streams 生产者名 查看生产者信息

xinfo groups 生产者名 查看生产者对应的消费者组信息

xinfo consumers 生产者名 消费组名

查看某组内消费者情况,返回待确认消息数量等信息

xinfo pubsub xgroup之类有子命令的命令可以跟help查看子命令


消息队列相关问题

高可用性

当然比不上专业的kafka、rockerMQ这些本身支持多master模式、多master多slave异步复制模式、多master多slave同步双写模式,但是结合 redis sentinel、cluster,也可以保证其高可用,需要注意的是xreadgroup这类操作实际上是写操作,因此只能在主服务器进行。

重复消费问题

因为网络传输等故障,消费者消费成功却没将确认消息返回给服务器,导致消息队列不知道此消息已被消费把它转给别的消费者,就造成了重复消费。

但是stream的消息被发送给某个消费者后就默认不再发送给其他消费者,所以重复消费问题基本可以保证,当一段时间内收不到ack需要执行转移时,要先确认消息执行失败或未执行成功。

或者在每次执行消息前查询所执行消息的唯一id是否已被成功消费,并在消费后将所执行的消息唯一id写入第三方介质比如redis做记录。

消费的可靠性

生产者、消息队列丢数据主要来自消息不能持久化,stream是可以持久化的,但是在故障转移的过程可能造成部分xadd数据的丢失,WAIT命令可用于强制将更改传播到一组副本,虽然这使得数据丢失的可能性非常小,但是由Sentinel或Redis Cluster操作的Redis故障转移过程仅执行尽力而为检查,以将故障转移到更新最多的副本,并且在某些特定的故障情况下,可能会升级缺少一些数据的副本为主服务器。因此在故障转移中应仔细评估和配置。

消费者丢失数据主要来自网络环境的波动,因为stream采用ack确认机制,并不会在读取后就删除消息,所以可以保证消息至少被消费一次,是可靠的。

消息的顺序性

stream自动生成的消息id是唯一的,是unix毫秒时间戳_序号(64位),即使在服务器时间错乱的情况下,序号也可以保障其消息唯一、相对有顺序。

分区

redis 的服务器没有原生支持分区能力,如果想要使用分区,那就需要分配多个 Stream,然后在客户端使用一定的策略来生产消息到不同的 Stream(比如指定hash标签)。Kafka也是通过客户端的 hash 算法来将不同的消息塞入不同分区的。

扩展:

https://developer.aliyun.com/article/603193

 类似资料: