Streams是Redis 5.0引入的新数据类型,它参考了Kafka消费组的概念,允许一组客户机协作使用同一消息流的不同部分。在此之前,redis实现消息队列主要有三种:list、pubsub、sets,它们分别有不同缺陷,list阻塞太久会导致线程回收,pubsub无法持久化,消息大量堆积会导致服务强制断开,sets消息唯一性不好保证,不能做重复消息,且他们都不能分组,不能保证断网等情况下消息被确保消费、也不能重复消费。
streams引入生产者、消费者组、消费者等概念,本组消费者从生产者那里读取到的最新消息标记为last_delivered_id(游标),任意一个消费者读取了新消息都会让游标向前,组内消费者读取但还未确认的消息id会计入pending_ids,消费者确认后会从pending_ids删除,这样做可以保证客户端至少消费消息一次,防止因为断网导致的消息丢失等情况。
每个消费组都会收到生产者的所有消息,但是在一个消费组内部,一条消息分给一个消费者后就不再分给另一个消费者,除非一段时间内消费失败后主动转移(xclaim)。不分组时所有消费者都会收到生产者的所有消息。
xadd 生产者名 * 消息key1 消息value1 消息key2 消息value2 ...(可以加很多组,但通常只需要一组)
*表示自动生成唯一key,是unix毫秒时间戳_同一毫秒的序列号。也可以自定义自增id。
生产者不存在时xadd自动创建
xtrim 生产者名 (可选参数~) n
将当前消息队列长度修剪为n,早期消息会被丢弃
当加上可选参数~ 表示近似修剪,可以提高性能
xdel 生产者名 消息id
在消息一段时间内无法消费成功,并且通常更换了好几个消费者后仍然消费不成功,就要考虑删除本条坏死的消息了,删除成功返回1,失败为0
xlen 生产者名
xrange 组名 起始id 结束id( 设为- +表示从最小到最大) count n(查询n条数据)
与消费者组read一次消息后游标就向前不同,生产者自行查看不影响游标
xpevrange:倒序查看消息
xpevrange 组名 起始id 结束id( 设为+ - 表示从最大到最小) count n
正序只能- +,逆序只能+ - ,否则读不到数据
xgroup create 生产者名 消费组名 起始消费id(0-0 或0表示从头开始消费,$表示从尾即最新消息开始消费)
xread (count n 可选,读取n条数据) (block 0 可选,阻塞等待毫秒数) streams 生产者1 消息id 生产者2 消息id 生产者3 消息id ...
读取指定消息id之后的消息(不包含指定id的)
xread是纯读取操作,不涉及修改游标等,因此可以在从服务器进行
xread可以同时读取多条消息,只需要设置一次count 、block,多条时count表示从每个生产者取出count条数据
block设置为0时表示一直阻塞,不拿到数据不罢休.不设置block就是非阻塞模式,拿不拿到都返回,拿不到返回nil
xreadgroup group 消费组名 消费者 (count n 可选参数,读取n条数据) (block 0 可选参数,阻塞等待毫秒数) streams 生产者名 消息id
消息id是读取指定id后的消息,设为>是读取最新消息的意思
读取过的不再重复读,也可以设置具体消息id为读取大于指定id的消息(不包含指定id的)
如果设置了阻塞读并且当前没有数据,产生了阻塞等待,会返回阻塞时间。
xreadgroup是写命令,只能在主服务器进行(它修改了游标等信息)。
xack 生产者名 消费组名 消息id
已经读取但是未确认的消息会写入pending_ids,确认后会从pending_ids删除,这样做是为了确保消息至少被消费一次。
XGROUP SETID:生产者名 消费组名 新id(为0表示从头开始)
当某条消息传递中确认丢失,可以重新设置最后传递id,这样消费组中的消费者读取下一个时会重新读取到这条消息
xclaim 生产者名 消费组名 转移到的消费者名 等待时长(为0时不等待) 消息id
xgroup delconsumer 生产者名 消费组名 要删的消费者
xgroup destroy 生产者名 消费组名
xpending 生产者名 消费组名
xinfo streams 生产者名 查看生产者信息
xinfo groups 生产者名 查看生产者对应的消费者组信息
xinfo consumers 生产者名 消费组名
查看某组内消费者情况,返回待确认消息数量等信息
xinfo pubsub xgroup之类有子命令的命令可以跟help查看子命令
当然比不上专业的kafka、rockerMQ这些本身支持多master模式、多master多slave异步复制模式、多master多slave同步双写模式,但是结合 redis sentinel、cluster,也可以保证其高可用,需要注意的是xreadgroup这类操作实际上是写操作,因此只能在主服务器进行。
因为网络传输等故障,消费者消费成功却没将确认消息返回给服务器,导致消息队列不知道此消息已被消费把它转给别的消费者,就造成了重复消费。
但是stream的消息被发送给某个消费者后就默认不再发送给其他消费者,所以重复消费问题基本可以保证,当一段时间内收不到ack需要执行转移时,要先确认消息执行失败或未执行成功。
或者在每次执行消息前查询所执行消息的唯一id是否已被成功消费,并在消费后将所执行的消息唯一id写入第三方介质比如redis做记录。
生产者、消息队列丢数据主要来自消息不能持久化,stream是可以持久化的,但是在故障转移的过程可能造成部分xadd数据的丢失,WAIT命令可用于强制将更改传播到一组副本,虽然这使得数据丢失的可能性非常小,但是由Sentinel或Redis Cluster操作的Redis故障转移过程仅执行尽力而为检查,以将故障转移到更新最多的副本,并且在某些特定的故障情况下,可能会升级缺少一些数据的副本为主服务器。因此在故障转移中应仔细评估和配置。
消费者丢失数据主要来自网络环境的波动,因为stream采用ack确认机制,并不会在读取后就删除消息,所以可以保证消息至少被消费一次,是可靠的。
stream自动生成的消息id是唯一的,是unix毫秒时间戳_序号(64位),即使在服务器时间错乱的情况下,序号也可以保障其消息唯一、相对有顺序。
redis 的服务器没有原生支持分区能力,如果想要使用分区,那就需要分配多个 Stream,然后在客户端使用一定的策略来生产消息到不同的 Stream(比如指定hash标签)。Kafka也是通过客户端的 hash 算法来将不同的消息塞入不同分区的。
扩展: