当前位置: 首页 > 知识库问答 >
问题:

Redis streams:组/消费者命名和清理

江棋
2023-03-14

我有一个相当原始的流用例:多个生产者和一个消费者周期性地批量处理消息。有时会有多个使用者,这就是为什么我想从“传统”的不可靠队列(rpush/lrange/ltrim)切换到流。

实际上有两个相关的问题:

>

必须先通过xgroup create创建消费者组,然后才能从中创建xreadgroup。每次一个新的消费者开始(使用随机名称)时,它都会被添加到消费者列表中,但即使在流程结束后,它仍会保留在那里。我的理解是,在这种情况下,我需要将所有挂起的消息从死亡消费者重新分配到活动消费者,然后删除旧消费者。这是正确的吗?

共有1个答案

韩晋
2023-03-14
    null
 类似资料:
  • 我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我

  • 我需要想办法向Kafka要一份题目清单。我知道可以使用目录中包含的脚本来实现。一旦我有了这个列表,我需要每个主题的所有消费者。我在该目录中找不到脚本,在库中也找不到允许我这样做的类。 这背后的原因是,我需要弄清楚话题的偏移和消费者的偏移之间的区别。 有没有办法做到这一点?还是需要在每个消费者中实现此功能?

  • 本文向大家介绍消费者和消费者组有什么关系?相关面试题,主要包含被问及消费者和消费者组有什么关系?时的应答技巧和注意事项,需要的朋友参考一下 每个消费者从属于消费组。具体关系如下:

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 有以下消费者代码: 然后我用脚本生成消息: 问题是,当我将消费者作为两个不同的进程启动时,我会在每个进程中收到新消息。但是,我希望它只发送给一个消费者,而不是广播。 在Kafka的文献中(https://kafka.apache.org/documentation.html)其中写道: 如果所有使用者实例都具有相同的使用者组,则其工作原理就像在使用者之间平衡负载的传统队列一样。 我发现这些消费者的

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka