当前位置: 首页 > 知识库问答 >
问题:

在使用者重新启动后,Kafka主题上的记录不会被消耗

韩寒
2023-03-14

我面对Kafka消费者非常奇怪的问题。我的设置在下面。

<prop key="enable.auto.commit">true</prop>
<prop key="auto.commit.interval.ms">10</prop>
<prop key="auto.offset.reset">latest</prop>

在我的小组中有2个消费者对单一主题进行投票。我没有在使用者端进行任何手动偏移管理。
现在,当我的使用者关闭时,我的主题上发布的新消息很少,通常当我重新启动我的使用者时,我会消耗这些消息。
但有一次我发现,在重新启动我的使用者后,我无法从主题中消耗那些新消息(当使用者关闭时发布的消息没有轮询)。当我再发布几条消息时,它开始从新的消息偏移量读取,并且我丢失了以前在我的用户停机时发布的消息。
请让我知道此行为背后的可能原因。

共有1个答案

钮善
2023-03-14

默认情况下,使用者在建立连接后获取发布到kafka主题的消息。尝试使用生产者和kafka控制台使用者,而不使用--From-Barting参数

 类似资料:
  • 我正在运行一个pyspark作业,数据流来自Kafka。我试图在我的windows系统中复制一个场景,以了解当数据持续输入Kafka时,消费者宕机时会发生什么。 这是我所期待的。 null 现在,当我的生产者生成消息1、2和3时,消费者就可以消费了。在读取第3条消息后,我杀死了正在运行的使用者作业(CLI.bat文件)。我的生产者产生消息4、5和6,以此类推....现在,我带回我的消费者作业(CL

  • 我使用confluent .net客户端。订阅者在重启(订阅者服务重启)后始终读取 Kafka 主题的所有消息。如何提交消费者已经实现的偏移并从中读取?也许一些消费者配置可以提供帮助...

  • 我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 我有一个Kafka集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序关闭时推送到主题的消息。 当应用程序启动时,我可以看到它读取带有偏移量的消息,然后将偏移量推送到。然后当应用程序关闭时,带有偏移量的消息被推送到主题。重启应用程序后,它读取并将其偏移量设置为,因此跳过。 这是我的配置:

  • 然而,当在我的环境中测试此示例时,我得到了一个异常。

  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较

  • 我正在使用kafkapython来消费来自kafka队列(kafka版本0.10.2.0)的消息。特别是我使用的是KafkaConsumer类型。如果消费者停止,并且在一段时间后重新启动,我希望从最新生成的消息重新启动,即删除消费者停止时生成的所有消息。我怎样才能做到这一点? 谢谢