当前位置: 首页 > 知识库问答 >
问题:

使用Kafka中所有消息的更快方法-主题

宗政安歌
2023-03-14
c = Consumer({
    'bootstrap.servers': 'brokers:9092',
    'group.id': 'consume_all_topics',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

now = datetime.now()
msg = c.poll(5.0)
while msg.value()['timestamp'] < now:
    msg = c.poll(5.0)

共有1个答案

伯建安
2023-03-14

“即使我们用相同的group-id设置了多个用户,我们仍然没有太大的性能改进。

关于如何以更快的方式获得Kafka主题的所有信息,有什么提示吗?“

Kafka的消耗随着主题中分区的数量而变化。请记住,一个分区只能由一个使用者组中的一个使用者使用。如果分区数量与使用者组中的使用者数量匹配,您将获得最佳的使用者性能。

 类似资料:
  • 我看过与此相关的类似问题,但并没有找到正确的答案。我只想从 Kafka 主题中删除消息,而不是更改保留超时。我已经安装了kafka_2.11-0.8.2.1,并使用蝙蝠文件在Windows上运行它。我想知道我是否可以删除主题中发布的所有消息,而不删除整个主题。

  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 我有一个批处理作业,它将一天触发一次。要求是 使用该时间点上关于Kafka主题的所有可用消息 处理消息 如果进程已成功完成,则提交偏移量。 当前,我poll()while循环中的消息,直到ConsumerRecords.isEmpty()为true。当ConsumerRecords.isEmpty()为true时,我假设Topic在该时间点的所有可用记录都已被使用。应用程序维护偏移量并关闭kafk

  • 我需要能够从一开始就消费一个主题的所有消息。基本上与这个StackOverflow查询相同,但是针对Kafka 0.9进行了更新。(0.9特定的StackOverflow答案似乎相对较少)。 Kafka高级消费者使用Java API从主题获取所有消息(相当于从头开始) 0.9有一个完全不同的API,我真的不知道从哪里开始。我可以使用提供的bash脚本从命令行执行此操作,但不知道如何前进。 您能否为

  • 如何通过忽略主题中所有现有的消息来只使用来自Kafka主题的最新消息。我有两个相同主题的使用者,当我开始使用来自该主题的消息时,它会获取最早的消息。我需要在我的使用者启动后使用消息。我在消费者配置中尝试了此配置,但这不起作用。

  • 我需要一个Kafka主题存储的消息数量。这与任何消费者是否消费了消息无关。 以上是否等于Kafka主题中当前存储的消息数?