position(TopicPartition partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).
committed(TopicPartition partition): OffsetAndMetadata
Get the last committed offset for the given partition (whether the commit happened by this process or another).
如果我需要使用特定消费组的最新提交偏移量(用于从Spark结构化流开始偏移),我应该使用什么。
我的代码显示已弃用。
val latestOffset = consumer.position(partition)
val last=consumer.committed(partition)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
官方文件:
偏移量和使用者位置Kafka为分区中的每个记录维护一个数字偏移量。该偏移量充当该分区内记录的唯一标识符,还表示使用者在分区中的位置。例如,位于位置5的使用者已使用偏移量为0到4的记录,并将接下来接收偏移量为5的记录。实际上,与消费者的用户相关的位置有两个概念:消费者的位置给出了将给出的下一条记录的偏移量。它将比使用者在该分区中看到的最高偏移量大一个。每次消费者在call to poll(长)中收到消息时,它都会自动前进。
提交的位置是安全存储的最后一个偏移量。如果进程失败并重新启动,这是使用者将恢复到的偏移量。使用者可以定期自动提交偏移量;或者,它可以选择通过调用一个提交API(例如commitSync和commitSync)手动控制此提交位置。
您需要将Spark流作业中提交的偏移量用作startingOffset。
KafkaConsumer在其运行时递增地增加了位置API的计数器,并且可以与提交的API的结果略有不同,因为使用者可以提交偏移量,也可以不提交偏移量,如果提交偏移量,则可以异步执行。
在Kafka 2.4.1中,方法提交(分区)
已弃用,建议使用更新的API,它采用Topic分区的Set
。它的签名是:
public Map<TopicPartition,OffsetAndMetadata> committed(Set<TopicPartition> partitions)
在使用Scala时,需要将Scala集转换为Java集。这可以按此处所述完成。