当前位置: 首页 > 知识库问答 >
问题:

通过忽略所有现有消息,开始只使用来自Kafka主题的最新消息

杜俊楚
2023-03-14

如何通过忽略主题中所有现有的消息来只使用来自Kafka主题的最新消息。我有两个相同主题的使用者,当我开始使用来自该主题的消息时,它会获取最早的消息。我需要在我的使用者启动后使用消息。我在消费者配置中尝试了此配置,但这不起作用。

config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

共有1个答案

干照
2023-03-14

你试过了吗

auto.offset.reset

https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html

 类似资料:
  • 我需要能够从一开始就消费一个主题的所有消息。基本上与这个StackOverflow查询相同,但是针对Kafka 0.9进行了更新。(0.9特定的StackOverflow答案似乎相对较少)。 Kafka高级消费者使用Java API从主题获取所有消息(相当于从头开始) 0.9有一个完全不同的API,我真的不知道从哪里开始。我可以使用提供的bash脚本从命令行执行此操作,但不知道如何前进。 您能否为

  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 我有一个批处理作业,它将一天触发一次。要求是 使用该时间点上关于Kafka主题的所有可用消息 处理消息 如果进程已成功完成,则提交偏移量。 当前,我poll()while循环中的消息,直到ConsumerRecords.isEmpty()为true。当ConsumerRecords.isEmpty()为true时,我假设Topic在该时间点的所有可用记录都已被使用。应用程序维护偏移量并关闭kafk

  • 我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。

  • 我看过与此相关的类似问题,但并没有找到正确的答案。我只想从 Kafka 主题中删除消息,而不是更改保留超时。我已经安装了kafka_2.11-0.8.2.1,并使用蝙蝠文件在Windows上运行它。我想知道我是否可以删除主题中发布的所有消息,而不删除整个主题。

  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较