当前位置: 首页 > 知识库问答 >
问题:

使用Kafka主题的所有消息并断开连接

漆雕誉
2023-03-14

我有一个批处理作业,它将一天触发一次。要求是

  1. 使用该时间点上关于Kafka主题的所有可用消息
  2. 处理消息
  3. 如果进程已成功完成,则提交偏移量。

当前,我poll()while循环中的消息,直到ConsumerRecords.isEmpty()为true。当ConsumerRecords.isEmpty()为true时,我假设Topic在该时间点的所有可用记录都已被使用。应用程序维护偏移量并关闭kafka使用者。

当消息处理完成并成功完成时,我创建一个新的KafkaConsumer并提交应用程序维护的偏移量。

注意,我关闭了最初用于读取消息的KafkaConsumer,并使用另一个KafkaConsumer实例提交偏移,以避免消费者重新平衡异常

我期待最多5K消息的主题。主题被分区和复制。

有没有更好的方法在特定的时间点消耗所有关于主题的消息?有没有我遗漏的或者需要照顾的东西?我认为我不需要处理消费者的再平衡,因为我为循环中的消息poll()并在轮询完成后处理消息。

我使用的是JavaKafka客户端V0.9,如果在上面的场景中有帮助的话,可以改成V0.10。

谢谢

更新:

AtomicBoolean flag = new AtomicBoolean();
flag.set(true);

while(flag.get()) {

 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(timeout);

 if(consumerRecords.isEmpty()) {
     flag.set(false);
     continue;
 }
  //if the ConsumerRecords is not empty process the messages and continue to poll()
}
kafkaConsumer.close();

共有1个答案

颜瀚漠
2023-03-14

您不能假设在调用poll()之后,由于使用者上的max.poll.records配置参数,您已经读取了主题中的所有可用消息。这是单个poll()返回的最大记录数,其默认值为500。这意味着,如果在这个时刻主题中有600条消息,您需要对poll()进行两次调用以读取所有消息(但同时考虑到可能会有其他消息到达)。我不明白的另一件事是为什么要使用不同的消费者来提交偏移。你说的消费者再平衡例外是什么?

 类似资料:
  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 我需要能够从一开始就消费一个主题的所有消息。基本上与这个StackOverflow查询相同,但是针对Kafka 0.9进行了更新。(0.9特定的StackOverflow答案似乎相对较少)。 Kafka高级消费者使用Java API从主题获取所有消息(相当于从头开始) 0.9有一个完全不同的API,我真的不知道从哪里开始。我可以使用提供的bash脚本从命令行执行此操作,但不知道如何前进。 您能否为

  • 我看过与此相关的类似问题,但并没有找到正确的答案。我只想从 Kafka 主题中删除消息,而不是更改保留超时。我已经安装了kafka_2.11-0.8.2.1,并使用蝙蝠文件在Windows上运行它。我想知道我是否可以删除主题中发布的所有消息,而不删除整个主题。

  • 如何通过忽略主题中所有现有的消息来只使用来自Kafka主题的最新消息。我有两个相同主题的使用者,当我开始使用来自该主题的消息时,它会获取最早的消息。我需要在我的使用者启动后使用消息。我在消费者配置中尝试了此配置,但这不起作用。

  • 我正在使用@StreamListener(Spring-Cloud-Stream)来使用来自主题(输入通道)的消息,进行一些处理并保存到一些缓存或数据库中。 我的要求是,如果DB在处理消费的消息时停止,我想暂停主消费者(输入通道),并从另一个主题(输入56通道)开始消费,一旦它消费了来自输入56通道的所有消息(没有很多),我想再次恢复主消费者(输入通道)。 这能做到吗??