当前位置: 首页 > 知识库问答 >
问题:

KafkaConsumer Position()vs Committed()

呼延钱明
2023-03-14
position(TopicPartition partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).

committed(TopicPartition partition): OffsetAndMetadata  

Get the last committed offset for the given partition (whether the commit happened by this process or another).

如果我需要使用特定消费组的最新提交偏移量(用于从Spark结构化流开始偏移),我应该使用什么。

我的代码显示已弃用。

  val latestOffset = consumer.position(partition)
  val last=consumer.committed(partition)

  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.4.1</version>
    </dependency>

官方文件:

偏移量和使用者位置Kafka为分区中的每个记录维护一个数字偏移量。该偏移量充当该分区内记录的唯一标识符,还表示使用者在分区中的位置。例如,位于位置5的使用者已使用偏移量为0到4的记录,并将接下来接收偏移量为5的记录。实际上,与消费者的用户相关的位置有两个概念:消费者的位置给出了将给出的下一条记录的偏移量。它将比使用者在该分区中看到的最高偏移量大一个。每次消费者在call to poll(长)中收到消息时,它都会自动前进。

提交的位置是安全存储的最后一个偏移量。如果进程失败并重新启动,这是使用者将恢复到的偏移量。使用者可以定期自动提交偏移量;或者,它可以选择通过调用一个提交API(例如commitSync和commitSync)手动控制此提交位置。

共有1个答案

白宏大
2023-03-14

您需要将Spark流作业中提交的偏移量用作startingOffset。

KafkaConsumer在其运行时递增地增加了位置API的计数器,并且可以与提交的API的结果略有不同,因为使用者可以提交偏移量,也可以不提交偏移量,如果提交偏移量,则可以异步执行。

在Kafka 2.4.1中,方法提交(分区)已弃用,建议使用更新的API,它采用Topic分区的Set。它的签名是:

public Map<TopicPartition,OffsetAndMetadata> committed(Set<TopicPartition> partitions)

在使用Scala时,需要将Scala集转换为Java集。这可以按此处所述完成。

 类似资料:

相关问答

相关文章

相关阅读