根据我的理解,消费者阅读特定主题的消息,并且消费者客户机将定期提交偏移量。
因此,如果由于某种原因,使用者失败了一个特定的消息,该偏移量将不会被提交,然后您可以返回并重新处理该消息。
是否有任何东西跟踪您刚刚消耗的偏移和您随后提交的偏移?
Kafaka是否区分已消耗的抵销和已提交的抵销?
是的,差别很大。消费的偏移量由使用者管理,使用者将从主题分区中获取后续消息。
使用者可以(但不是必须的)自动或通过调用commit API提交消息。该信息存储在名为__consumer_offsets的Kafka内部主题中,并存储基于ConsumerGroup、topic和Partition的提交偏移量。如果客户机正在重新启动或者新的使用者加入/离开ConsumerGroup,就会使用它。
请记住,如果您的客户机没有提交偏移量n
但后来提交了偏移量n+1
,对于Kafka来说,当您同时提交这两个偏移量时,情况不会有所不同。
编辑:关于消耗和提交的偏移量的更多详细信息可以在KafkaConsumer关于偏移量和消费者位置的JavaDocs中找到:
Kafka为分区中的每个记录维护一个数值偏移量。该偏移量用作该分区中记录的唯一标识符,还表示使用者在分区中的位置。例如,处于位置5的消费者已经消费了具有偏移0到4的记录,并且接下来将接收具有偏移5的记录。与消费者的用户相关的位置实际上有两个概念:
使用者的位置给出了下一个记录的偏移量,该记录将被输出。它将比使用者在该分区中看到的最高偏移量大一个。每次使用者在轮询(持续时间)调用中接收到消息时,它都会自动前进。
提交位置是已安全存储的最后一个偏移量。如果进程失败并重新启动,这就是使用者将恢复到的偏移量。使用者可以定期自动提交偏移;或者它可以选择通过调用其中一个提交API(例如commitSync和commitAsync)来手动控制这个提交的位置。
我有一个Kafka的题目有1个分区。如果它有100条消息,偏移量将从0.99开始。 根据Kafka保留策略,在指定的时间之后,所有的消息都将被清除。 并且我正在发送100个新的消息到主题,一旦所有已经被清除(在保留期之后)。现在,消息的新偏移量从哪里开始呢?是从100还是从0?? 我想知道新的偏移是100-199还是0-99?
但是,consumer只从主题中第一个未提交的消息开始轮询。我希望总是从偏移量0开始,不管提交的消息是什么。使用Alpakka消费者,如何手动指定偏移量?
Spring-Boot版本--2.2.6发行版 Spring-Kafka-2.3.7发行版 Kafka-客户端-2.3.1 阿帕奇-Kafka-Kafka2.12-2.3.1 我们有10个主题和50个分区,每个主题属于同一组,我们增加主题分区和用户计数在运行时根据负载。 自动提交=false 处理后同步提交每个偏移量 max-poll-records设置为1
我有一个有几个消费者的消费群体。每个使用者被分配到一组分区。消费者何时轮询选择了已使用分区的消息?它是在消费者端完成的,还是Kafka服务器决定使用哪个分区? 我的一些分区有很多消息,但有些分区没有或几乎没有。但我仍然需要我的消费者平等地使用分配给它的每个分区。因此,我需要我的消费者快速遍历分区,最好从每个分配的分区轮询x条消息。 我在用https://github.com/appsignal/r
具有特定组id的使用者连接到代理,监听主题不到1分钟,然后断开连接(根据业务逻辑)。当它监听主题时,它可以使用一些消息。当同一个使用者重复这个动作时,它会使用相同的消息! 我发现Kafka用间隔1分钟保存偏移。这意味着消费者必须听超过1分钟的主题。我怎样才能缩短这个间隔? 我发现了这样的属性: null null
在执行手工抵销管理时,我遇到了以下问题:(使用0.9) 为了手动管理偏移量,对于每个消耗的记录,我检索记录的当前偏移量并提交新的偏移量(currentOffset+1,因为偏移量重置策略是“最新的”)。 当创建新的使用者组时,它没有显式的偏移量(偏移量是“未知”的),因此,如果它在停止之前没有使用来自所有现有分区的消息,那么它将只为部分分区(使用者从中获取消息的分区)提交偏移量,而其余分区的偏移量