当前位置: 首页 > 知识库问答 >
问题:

如何从Apache Nifi中最后提交的偏移量读取消费者中的Kafka消息?

元英朗
2023-03-14

我已经开始让我的制作人向Kafka发送数据,也让我的消费者提取相同的数据。当我在ApacheNIFI中使用ConsumerKafka处理器(kafka版本1.0)时,我脑海中很少有与kafka consumer相关的查询。

Q.1)当我第一次启动ConsumeKafka处理器时,我如何从开始和当前消息中读取消息?

问题2)以及在Kafka消费者关闭的情况下,如何在最后一条消费信息之后阅读信息?

在使用Apache Nifi时,我们如何实现上述两个?

共有1个答案

咸浩初
2023-03-14

ConsumerKafka处理器有一个名为“Offset Reset”的属性,该属性在消费者组id没有以前的偏移量或偏移量不再存在时使用。此属性的选项为“最新偏移”或“最早偏移”,默认为“最新偏移”。

因此,如果您使用从未使用过的消费者组标识启动ConsumeKafka处理器,那么它将从最新消息开始消费。之后,如果您启动和停止处理器,它将从上次消耗的偏移量开始。

如果要再次使用“偏移重置”强制将其设置为最早或最晚,则需要更改消费者组id,否则现有消费者组将始终使用现有偏移开始。

您不能同时从开始和当前读取消息,您可以从开始开始并一直读取到当前,或者从当前开始。这是Kafka的工作方式,并不是NiFi特有的。

 类似资料:
  • 我正在使用事务性KafkaProducer向主题发送消息。这个很管用。我使用的是具有read_committed隔离级别的KafkaConsumer,而我的seek和seekToEnd方法存在问题。根据文档,seek和seekToEnd方法给出了LSO(上次稳定偏移量)。但这有点让人摸不着头脑。因为它给我的价值总是一样的,主题结束了。无论最后一个条目是(由生产者提交的)还是中止的事务的一部分。例如

  • 如有任何帮助,我们将不胜感激。

  • 我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用

  • null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 我已经编写了一个Java Kafka消费者。我想确定如何明确确保一旦Kafka消费者启动,它只读取从那时起由制作人发送的消息,即它不应读取制作人已发送给Kafka的任何消息。有人能解释一下如何确保这一点吗 这是我使用的属性的片段 更新9月14日: 我使用的是以下属性,似乎消费者有时仍然从一开始就阅读,有人能告诉我现在出了什么问题吗? 我使用Kafka版本0.8.2