我正在使用kafkapython来消费来自kafka队列(kafka版本0.10.2.0)的消息。特别是我使用的是KafkaConsumer类型。如果消费者停止,并且在一段时间后重新启动,我希望从最新生成的消息重新启动,即删除消费者停止时生成的所有消息。我怎样才能做到这一点?
谢谢
在回答您的问题时:
我的理解是,当你执行消费者.poll()时
,会返回一个字典。所以,当我想轮询信息时,我用一个循环来遍历字典。
consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
messages = consumer.poll()
data = []
for msg in messages:
for value in messages[msg]:
#Add just the values to the list
data.append(value[6])
我相信你正在做的是得到迭代器与消费者=Kafka消费者('mysubject',bootstrap_servers=[服务器],group_id=group_id,enable_auto_commit=True)
,然后步行迭代器与
#start iterate
for message in consumer:
print(message)
看起来你实际上并没有得到500个投票结果。您可以通过将max_poll_records=5
添加到KafkaConsumer配置中来确认这一点。然后,当您运行代码时,如果打印出5条以上的消息,您可以知道您没有使用轮询功能。
希望有所帮助!
您将不会< code>seekToEnd()到日志的结尾。
请记住,您首先需要订阅主题,然后才能进行搜索。此外,订阅是懒惰的。因此,在搜索之前,您还需要添加一个“虚拟民意调查”。
consumer.subscribe(...)
consumer.poll() // dummy poll
consumer.seekToEnd()
// now enter your regular poll-loop
谢谢,
它起作用了!
这是我的代码的简化版本:
consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
#dummy poll
consumer.poll()
#go to end of the stream
consumer.seek_to_end()
#start iterate
for message in consumer:
print(message)
consumer.close()
文档指出poll()方法与迭代器接口不兼容,我猜想迭代器接口就是我在脚本结尾的循环中使用的接口。然而,从最初的测试来看,这段代码看起来工作正常。
使用安全吗?还是我理解错了文档?
谢谢
我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:
我将python kafka consumer的< code>auto_commit设置为< code>False,我正在手动提交消息。然而,重启后,消费者再次消费来自每个分区的最后一条消息。只有最后一个,不能再多。 这就是所展示的: 不知道为什么会显示滞后,whu当前偏移设置为最后一条消息而不是下一条?当我提交偏移量3时,当前偏移量不应该移动到4吗? 我提交我使用的每条消息,但是在重启时,它总是
我有一个Kafka集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序关闭时推送到主题的消息。 当应用程序启动时,我可以看到它读取带有偏移量的消息,然后将偏移量推送到。然后当应用程序关闭时,带有偏移量的消息被推送到主题。重启应用程序后,它读取并将其偏移量设置为,因此跳过。 这是我的配置:
我在kafka中面临一个奇怪的问题,即在消费者应用程序重新启动后,所有来自主题的kafka消息都在重播。有人能帮我我在这里做错了什么吗? 这是我的配置: spring.kafka.consumer.auto-偏移-重置=最早 spring.kafka.enable.auto。提交=false 我的生产者配置: 消费者配置: 消费者代码: 集装箱代码 消费者配置 应用程序.属性
我看到一个问题,我的主题中的所有消息都被我的消费者重新阅读。我只有1个消费者,我在开发/测试时打开/关闭它。我注意到,有时在几天没有运行消费者之后,当我再次打开它时,它会突然重新阅读我的所有消息。 客户端 ID 和组 ID 始终保持不变。我显式调用提交同步,因为我的启用.我确实设置了 auto.offset.reset=最早,但据我所知,只有在服务器上删除了偏移量时,才应该启动。我正在使用 IBM
我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1