当前位置: 首页 > 知识库问答 >
问题:

自上次提交Kafka中的偏移量后重新启动处理

封景曜
2023-03-14

我有:

  • 连接的Kafka消费者

此外,我有一个方法,它接受两个参数:消费者和一个重新平衡侦听器,该侦听器跟踪分配给消费者的分区

void aggregateProcessing(ConsumerRecords<String, SomeClass> consumer, RebalanceListener listener)
public class RebalanceListener implements ConsumerRebalanceListener {
    private Set<TopicPartition> assignedPartitions = new LinkedHashSet<>();

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
       // keep track of assigned partitions
    }   
}

此方法在计时器上运行,其目标是处理记录,直到没有剩余的记录可读取,或者直到所有分区中的某个最长时间。

由于重新平衡可能发生在使用过程中(在consumer.poll()已触发多次之后),因此我希望检测此情况,重置并从所有分配的分区(即使已分配)的最后提交偏移量开始重新启动处理。

是否有办法将每个分区的消费者内部偏移量重置回分配分区列表的最新提交偏移量?

我了解对所有分区进行再处理(而不仅仅是对已更改的分区)的效率低于选择性地删除某些处理,但可能比跟踪删除分区时需要删除的数据要容易得多。

谢谢

共有2个答案

步胜
2023-03-14

>

  • 每当从消费者重新调用分区时,您必须将分区及其提交的偏移量保存在DB中。

    将分区重新分配给使用者时,必须从存储在数据存储中的特定偏移量中进行搜索。

    当这些撤销和分配事件发生时,您的重新平衡侦听器将侦听。重新平衡侦听器的示例实现

    public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
               commitDBTransaction();
        }
        public void onPartitionsAssigned(Collection<TopicPartition>  partitions){
                for(TopicPartition partition: partitions) {
                    consumer.seek(partition, getOffsetFromDB(partition));
                }
        }
    }
    

  • 凌华奥
    2023-03-14

    正如Kafka留档所描述的

    补偿和消费者地位

    Kafka为分区中的每条记录维护一个数字偏移量。该偏移量充当该分区内记录的唯一标识符,还表示使用者在分区中的位置。例如,位于位置5的使用者已使用偏移量为0到4的记录,并将接下来接收偏移量为5的记录。实际上,与消费者的用户相关的位置有两个概念:消费者的位置给出了将给出的下一条记录的偏移量。它将比使用者在该分区中看到的最高偏移量大一个。每次消费者在call to poll(持续时间)中收到消息时,它都会自动前进。

    提交的位置是安全存储的最后一个偏移量。如果进程失败并重新启动,这是使用者将恢复到的偏移量。使用者可以定期自动提交偏移量;或者,它可以选择通过调用一个提交API(例如commitSync和commitSync)手动控制此提交位置。

    因此,当您需要从最新提交的偏移量开始,并且您已经禁用了启用。汽车提交,然后您可以手动提交已处理邮件的偏移量。

    可以选择通过调用一个提交API(例如commitSync和commitSync)手动控制此提交位置。

    然后在重新启动和重新平衡Kafka之后,消费者将从最后提交(处理)的偏移量开始消费。

    上面的场景是当您使用Kafka存储进行消费补偿时。如果已经有了要开始使用的偏移量,则可以通过<代码>消费者来控制消费者的开始偏移量。开始使用之前,先搜索()。

    控制消费者的立场

    在大多数用例中,消费者只需从头到尾消费记录,定期提交其位置(自动或手动)。然而,Kafka允许使用者手动控制其位置,在分区中随意向前或向后移动。这意味着消费者可以重新使用较旧的记录,或跳到最新的记录,而无需实际使用中间记录。在一些情况下,手动html" target="_blank">控制消费者的位置可能很有用。

    一种情况是对时间敏感的记录处理,对于远远落后于处理所有记录的消费者来说,这可能是有意义的,因为他们不会试图赶上处理所有记录的速度,而只是跳到最近的记录。

    另一个用例是用于维护本地状态的系统,如上一节所述。在这样的系统中,消费者希望在启动时将其位置初始化为本地存储中包含的任何内容。同样,如果本地状态被销毁(例如因为磁盘丢失),则可以通过重新使用所有数据并重新创建状态(假设Kafka保留了足够的历史记录)在新机器上重新创建状态。

    Kafka允许使用seek(TopicPartition,long)指定新位置。还提供了查找服务器维护的最早和最新偏移量的特殊方法(分别为seektobegining(Collection)和seekToEnd(Collection))。

     类似资料:
    • 我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1

    • 我尝试将auto.offset.reset设置为最早和最晚,但这不会更改行为。 我在消费者配置中遗漏了什么吗?

    • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

    • 但是如果我们重新启动kafka服务器,使用者会重新读取已经提交的偏移量吗?或者这个选项在这样的情况下工作--服务器重新启动后,只会消耗未读的消息?

    • 我有一个单一的Kafka消费者,它连接到一个有3个分区的主题。一旦我从Kafka那里得到一张唱片,我就想捕捉偏移量和分区。在重新启动时,我希望从上次读取的偏移量恢复使用者的位置 摘自Kafka文档: 每个记录都有自己的偏移量,因此要管理自己的偏移量,只需执行以下操作: 配置enable.auto.commit=false 下面是我的示例代码: 这是正确的做法吗?有没有更好的办法?

    • 我使用的是0.10.1.1 API的高级使用者。 奇怪的是,当我关闭应用程序并重新启动它时,偏移量比上次提交的偏移量大一点,我找不到原因。 我在代码中只有一个提交点。 一个分区的示例: 关机前偏移量:3107169023 分区分配时的偏移量:3107180350