当前位置: 首页 > 知识库问答 >
问题:

redis在消费者群体内进行有序处理

殳智志
2023-03-14

我将python(aioredis)与redis streams一起使用。

我有一个一个生产者-多个(分组)消费者的场景,并希望确保消费者以有序的方式处理发送到流的(批量)消息,这意味着:当第一条消息完成时,处理流中的下一条消息,依此类推。这也意味着消费者组中的消费者在某一时间正在处理,而其他消费者将等待。

我还希望依赖于第二、第三等消费群体中的有序处理——所有这些都依赖于发送到一个流的相同消息。意思是:

message 1 ... n -> stream1 
ordered processing within group 1 ... n  
whereas consumer 1 ... n per group 1 ... n

当我还想确保每个组的潜在订单检查逻辑不会有太多过载时,什么是完成这项工作的好方法?

共有1个答案

丘普松
2023-03-14

让我回到同步处理的老派,如果您想按顺序处理流消息,这并不容易,原因是失败/重试。

考虑到您希望最多处理一次每个消息,流消息执行是一个关键部分,消费者组成员作为线程/进程。

若要同步此操作,您需要某种锁定机制,因为消费者组可能在不同的机器上运行。您可以使用全局锁定机制来防止多个使用者使用来自同一流的消息。

您可以使用Redis锁(RedLock)来获取/释放锁。

伪代码

Procedure SequentialProcessor

Input: StreamName
Input: ConsumerName
Input: ConsumerGroup
Input: LockTime 


BEGIN
    redLock = RedLock()
    WHILE True DO
     IF redLock.aquireLock(StreamName#ConsumerGroup, LockTime) THEN
       message = redis.XREADGROUP( ConsumerGroup, StreamName, ...)
       TRY
         processMessage( message )
       FINALLY
          redLock.releaseLock( StreamName#ConsumerGroup )
     ENDIF
    END
END

 
 类似资料:
  • 我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群

  • 我正在阅读Kafka常见问题解答,他们如下所示。 •每个分区不会被每个使用者组中的多个使用者线程/进程使用。这允许每个进程以单线程方式使用,以保证分区内的使用者的顺序(如果我们将有序消息分割成一个分区并将它们传递给多个使用者,即使这些消息是按顺序存储的,它们有时也会被无序地处理)。 有没有可能,

  • 当一个组中只有一个消费者,并且认为消费者无法在session.time.out内进行轮询时,将触发重新平衡,但是在这种情况下,组中只有一个消费者,现在假设session.time.out是30秒和消费者民意调查后50秒组协调员将识别消费者后50秒,并允许它提交偏移或协调员将断开消费者和没有偏移得到提交,并将重新平衡消费者与新的消费者标识?如果上次提交的偏移量是345678,在下一次轮询中,它处理了

  • 我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable

  • 我是Kafka的新手。我看了一眼Kafka文档。似乎分派给订阅消费者组的消息是通过将分区与消费者实例绑定来实现的。 在使用Apache Kafka时,我们应该记住一件重要的事情,即同一消费者组中的消费者数量应该小于或等于所使用主题中的分区数量。否则,将不会收到来自主题的任何消息。 在非prod环境中,我没有配置主题分区。在这种情况下,Kafka是否只有一个分区。如果我启动共享同一组的多个消费者并向

  • null 编辑:好的,所以我取得了一些进步(如果我错了请纠正我): 每个消费者都将获得所有消息。 租约被分配了一个EventProcessorHost,所以它需要一个唯一的名称,所以这里的使用者组名称实际上并不相关。 仍然不能百分之百确定context.checkpointasync,但我相信它只适用于ConsumerGroup?