当前位置: 首页 > 知识库问答 >
问题:

为什么当消息被消费者拿走时,Kafka主题队列不会为空?[重复]

白文彬
2023-03-14

我正在学习Kafka,如果有人能帮助我理解一件事。“制作人”向Kafka主题发送消息。它会在那里停留一段时间(默认为7天,对吗?)。

但是“消费者”收到这样的信息,永远保持它在那里没有多大意义。我预计当消费者收到它们时,这些信息会消失。否则,当我再次连接到Kafka时,我将再次下载相同的消息。所以我必须管理重复的避免。

它背后的逻辑是什么?

问候

共有1个答案

东门晨
2023-03-14

“制作人”向Kafka主题发送消息。它会在那里停留一段时间(默认为7天,对吗?)。

是的,生产者将数据发送到Kafka主题。每个主题都有自己的可配置cleanup.policy。默认情况下,它被设置为7天的保留期。您还可以根据字节大小配置主题的保留。

但“消费者”收到了这样的信息,并没有太大的意义将其永远留在那里。

Kafka可以被视为一个发布者/订阅者消息系统(尽管主要是一个流媒体平台)。它有一个很大的好处,即不止一个消费者可以阅读同一主题的相同消息。与其他消息传递系统相比,数据在消费者确认后不会被删除。

否则,当我再次连接到Kafka时,我将再次下载相同的消息。所以我必须管理重复的避免。

Kafka有“偏移量”和“消费者群体”的概念,我强烈建议熟悉它们,因为它们在使用Kafka时是必不可少的。每个使用者都是使用者组的一部分,主题中的每条消息都有一个称为“offset”的唯一标识符。偏移量就像一个唯一的标识符,在其生命周期内保持相同的消息。

每个消费者组都会跟踪它已经使用的消息(偏移量)。现在,如果您不想再次读取相同的消息,您的消费者组只需提交这些偏移量,它就不会再次读取它们。

这样,您就不会使用重复项,但其他使用者(具有不同的使用者组)仍能够再次读取所有消息。

 类似资料:
  • 我使用confluent .net客户端。订阅者在重启(订阅者服务重启)后始终读取 Kafka 主题的所有消息。如何提交消费者已经实现的偏移并从中读取?也许一些消费者配置可以提供帮助...

  • 我将python kafka consumer的< code>auto_commit设置为< code>False,我正在手动提交消息。然而,重启后,消费者再次消费来自每个分区的最后一条消息。只有最后一个,不能再多。 这就是所展示的: 不知道为什么会显示滞后,whu当前偏移设置为最后一条消息而不是下一条?当我提交偏移量3时,当前偏移量不应该移动到4吗? 我提交我使用的每条消息,但是在重启时,它总是

  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较

  • 我们使用Akka流Kafka来生成和消费消息和Strimzi Kafka集群。以下是相关版本: 重构消息发出后,消费者停止工作。我们在主题中确实有一些信息,但消费者只是在无休止地等待。 以下是日志片段: 还有一些要点: 架构注册表配置正确且良好(否则生产者将无法工作)。 主题(和组协调器)很好,我可以通过这样的普通消费者消费消息: 这就是代码卡住的地方——我使用阻塞调用获取2条消息(甚至无法获取1

  • 然而,当在我的环境中测试此示例时,我得到了一个异常。

  • 我正在用java编写一个简单的Kafka使用者,它被配置为读取多个主题。目前,让我们假设两个主题(topic1和Topic2),并为两个主题设置一个分区。 Kafka用户从topic1和Topic2读取的顺序是什么。如果这两个主题都有,假设已经发布了100条消息。 使用者首先从topic1读取所有消息,然后再从topic2读取? 用户按时间顺序阅读,将来自两个主题的消息混合在一起? 我看了Kafk