当前位置: 首页 > 知识库问答 >
问题:

关于再平衡的Kafka流重新处理旧消息

曾德水
2023-03-14

我有一个Kafka Streams应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题。

5 Kafka brokers
Kafka topics - 15 partitions and replication factor 3. 

每小时消耗/产生几百万条记录。每当我关闭一个代理时,应用程序就进入重新平衡状态,在重新平衡多次之后,它开始使用非常旧的消息。

注意:当Kafka Streams应用程序运行良好时,它的消费者滞后几乎为0。但再平衡之后,它的滞后从0到1000万。

这会不会是因为偏移.保留.分钟。

log retention policy : 3 days
offset.retention.minutes : 1 day

在这方面的任何帮助都将不胜感激。

共有1个答案

弓磊
2023-03-14

抵销保留会产生影响。Cf此常见问题:https://docs.confluent.io/current/streams/faq.html#why-is-my-application-re-processing-data-from-the-beginary

还cf如何用Kafka流手动提交?以及如何用Kafka流手动提交?关于如何提交工作。

 类似资料:
  • 我们正在应用程序中使用apache kafka streams 0.10.2.0。我们利用kafka streams拓扑将处理后的数据传递到下一个主题,直到处理结束。 此外,我们使用AWS ECS容器来部署消费者应用程序。我们观察到消费者正在拾取非常旧的消息进行处理,尽管它们已经在更早的时候处理过。这个问题在服务扩展/缩减或新部署时随机发生。我知道在消费者重新平衡时,有些消息可以重新处理。但在这种

  • 我知道在你的流中的任何时间点都可能发生再平衡。当它发生时,由于没有提交给定偏移量的最新偏移量,可能会发生事件的重新处理。 Kafka流是否允许在重新平衡发生之前完成任何飞行中处理?我的意思是,你的应用程序正在消耗一个记录(在你的过程方法内部),发生一个再平衡事件。该处理是否立即中止或允许处理方法完成? 一个具体的例子是 最后一次计算是否会在状态存储中结束并转发到接收器主题?因此,这意味着当重新平衡

  • 我有4个单一分区和应用程序的三个实例的主题。我试图通过编写一个自定义的PartitionGrouper来实现可伸缩性,它将创建如下3个任务: 第一个实例-topic1,分区0,topic4,分区0 第二个实例-主题2,分区0 第三实例-桌面3,分区0 我将NUM_STANDBY_REPLICAS_CONFIG配置为1,因为它将在本地维护状态(也可以消除invalidstatestore异常)。 上

  • 有人能告诉我Kafka消费者的再平衡算法是什么吗?我想了解分区计数和消费者线程是如何影响这一点的。 非常感谢。

  • 我有一个带注释的kafka消费者方法@kafkalistener。我已经在容器上设置了重试模板,并且重试配置是这样的,如果在处理消息时发生了一些异常,它将始终重试。我已将最大轮询记录设置为1。如果这种情况实时发生,并且消费者一直在重试消息,经纪人会认为该消费者已经死亡并触发重新平衡吗?或者,在重试时,消费者是否会对未能处理的相同消息进行投票?如果这是真的,因为民意调查正在进行,我的假设是不会有任何

  • 我们使用StatefulSet在Kubernetes上部署Scala Kafka Streams应用程序。实例具有单独的s,因此它们每个都复制完整的输入主题,以实现容错。它们本质上是只读服务,只读取一个状态主题并将其写入状态存储,客户请求通过REST从那里得到服务。这意味着,在任何给定时间,消费者组总是仅由单个Kafka Streams实例组成。 现在我们的问题是,当触发滚动重启时,每个实例大约需