当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka 0.9 Java API从主题开始使用所有消息

龙枫
2023-03-14

我需要能够从一开始就消费一个主题的所有消息。基本上与这个StackOverflow查询相同,但是针对Kafka 0.9进行了更新。(0.9特定的StackOverflow答案似乎相对较少)。

Kafka高级消费者使用Java API从主题获取所有消息(相当于从头开始)

0.9有一个完全不同的API,我真的不知道从哪里开始。我可以使用提供的bash脚本从命令行执行此操作,但不知道如何前进。

您能否为我提供适当的方法或一个小的示例脚本来让我入门?谢谢!

共有1个答案

靳高明
2023-03-14

您需要将< code>auto.offset.reset设置为< code>earliest。见https://Kafka . Apache . org/documentation . html # new consumer configs

另请参阅 https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L179

 类似资料:
  • 我有一个批处理作业,它将一天触发一次。要求是 使用该时间点上关于Kafka主题的所有可用消息 处理消息 如果进程已成功完成,则提交偏移量。 当前,我poll()while循环中的消息,直到ConsumerRecords.isEmpty()为true。当ConsumerRecords.isEmpty()为true时,我假设Topic在该时间点的所有可用记录都已被使用。应用程序维护偏移量并关闭kafk

  • 如何通过忽略主题中所有现有的消息来只使用来自Kafka主题的最新消息。我有两个相同主题的使用者,当我开始使用来自该主题的消息时,它会获取最早的消息。我需要在我的使用者启动后使用消息。我在消费者配置中尝试了此配置,但这不起作用。

  • 我正在使用@StreamListener(Spring-Cloud-Stream)来使用来自主题(输入通道)的消息,进行一些处理并保存到一些缓存或数据库中。 我的要求是,如果DB在处理消费的消息时停止,我想暂停主消费者(输入通道),并从另一个主题(输入56通道)开始消费,一旦它消费了来自输入56通道的所有消息(没有很多),我想再次恢复主消费者(输入通道)。 这能做到吗??

  • 我想从话题的一开始就开始消费。我已经将属性“AUTO\u OFFSET\u RESET\u CONFIG”设置为最早,但不知何故它仍然没有从一开始就读取。 如果我错过了什么,有什么想法吗?我每次都在创造一个新的消费群体。

  • 我的目标是使用Flink KafkaSource阅读来自Kafka主题的所有消息。我尝试用批处理和流模式执行。问题如下:当我将env.setParallelism设置为高于2时,我必须使用包含bug的接收器。于是,我设置了例如:< code > streamexecutionenvironment . setparallelism(1); 我想使用的Kafka主题包含3个分区。这是我的代码片段:

  • 如何确保我总是从Kafka主题的一开始就与Flink一起消费? Kafka0.9。x consumer是Flink 1.0.2的一部分,它似乎不再是Kafka,而是Flink来控制偏移量: Flink在内部快照偏移量,作为其分布式检查点的一部分。Kafka/动物园管理员promise的补偿只是为了让外界对进展的看法与Flink对进展的看法保持同步。通过这种方式,监控和其他工作可以了解Flink K