当前位置: 首页 > 知识库问答 >
问题:

Kafka Consumer未获取所有消息

呼延庆
2023-03-14

我尝试在Kafka中创建新主题时启动动态消费者,但动态启动的消费者总是缺少起始/第一条消息,但从那里开始消费消息。我正在使用kafka-python模块,并且正在使用更新的KafkaConsumer和KafkaProducer。

producer = KafkaProducer(bootstrap_servers='localhost:9092')
record_metadata = producer.send(topic, data)
consumer = KafkaConsumer(topic,group_id="abc",bootstrap_servers='localhost:9092',auto_offset_reset='earliest')

请建议一些解决这个问题的方法,或者我必须包含在我的生产者和消费者实例中的任何配置。

共有1个答案

金骞尧
2023-03-14

是否可以将auto_offset_reset设置为最早。

当创建新的使用者流时,它从最新的偏移量(这是auto_offset_reset的默认值)开始,您将错过在使用者未启动时发送的消息。

你可以在kafka python Doc中读到它。相关部分如下

auto_offset_reset(str)-一个重置OffsetOutOfRange错误的偏移量的策略:“最早”将移动到最早的可用消息,“最新”将移动到最近的消息。任何其他值都将引发异常。默认值:“最新”。

 类似资料:
  • 大家好。我有一个Kafka项目,使用SpringKafka来听一个明确的主题。我需要一天听一次所有的信息,把它们放到一个集合中,然后在那里找到特定的信息。我无法理解如何用一个@KafkaListener方法读取所有消息。我的班级是: 我的事件集合的大小始终为1;我尝试使用不同的循环,但后来,我的收藏被归档了530000次。 更新:我已经找到了一种方法来做它与factory.setBatchList

  • 我有一个FutureBuilder(很高兴加载)从Firebase获取我的DocumentSnapshot列表,但问题是如果我推送/弹出屏幕,它会一遍又一遍地重建它。为了解决这个问题,我最终将它放在了上,但我有一个问题,我有时无法从中获取所有文档。 我有一个按钮可以启用国家/地区筛选和禁用,每次按下它几乎都会立即显示我的文档,例如,我总共筛选了11个国家/地区,筛选了5个国家/地区,但有时我不会全

  • 我正在对一个不和谐的机器人进行编码,我想在其中执行命令并且它将从id为的特定通道(该通道可能是发送的不同通道,也可能不是)拉出一个随机的of消息。我一直在查看文档中的discord.js,但我找不到一种方法,可以通过它的ID获取一个TextChannel,然后使用TextChannels函数,从而获得MessageManager和一个消息集合。 我知道我可以使用获得公会(其中是的触发器)或获得特克

  • 我正在使用Azure服务总线队列。但是我不能使用“获取所有队列消息(peek Lock):微软内置于api”从队列中获取所有消息。 有没有办法获取所有队列消息? {"$连接":{"值":{"servicebus_1":{"连接ID":"/订阅/c776fex3-6aec-4722-b099-b054c267b240/资源组/Plugin-Resources/提供者/Microsoft.网络/连接/

  • 我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息? 这是我的课; 这是我在应用程序中的Kafka属性。yml; 还有我的KafkaConfiguration课程;