当前位置: 首页 > 知识库问答 >
问题:

Kafka从相同偏移量重新启动

孙乐逸
2023-03-14

我有一个单一的Kafka消费者,它连接到一个有3个分区的主题。一旦我从Kafka那里得到一张唱片,我就想捕捉偏移量和分区。在重新启动时,我希望从上次读取的偏移量恢复使用者的位置

摘自Kafka文档:

每个记录都有自己的偏移量,因此要管理自己的偏移量,只需执行以下操作:

配置enable.auto.commit=false

下面是我的示例代码:

constructor{    
    load data into offsetMap<partition,offset>
    initFlag=true;
}

Main method
{
    ConsumerRecords<String, String> records = consumer.poll(100);
    if(initFlag) // is this correct way to override offset position?
    {
        seekToPositions(offsetMap); 
        initFlag=false;
    }
    while(!shutdown)
    {
        for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                getOffsetPositions();// dump offsets and partitions to db/disk
        }   
   }
}

//get current offset and write to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception{

    Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
    //code to put partition and offset into map
    //write to disk or db

    }
} // Overrides the fetch offsets that the consumer

public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
            //code get partitions and offset from offsetMap
            consumer.seek(partition, offset);

    }

这是正确的做法吗?有没有更好的办法?

共有1个答案

简学文
2023-03-14

如果您提交了您的偏移,Kafka将为您存储它们(默认情况下最多24小时)。

这样,如果您的消费者死了,您可以在另一台机器上启动相同的代码,然后从您停止的地方继续。不需要外部存储。

请参阅https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/Consumer/kafkaconsumer.html中的“偏移量和消费者位置”

并建议您考虑使用commitSync

 类似资料:
  • 但是如果我们重新启动kafka服务器,使用者会重新读取已经提交的偏移量吗?或者这个选项在这样的情况下工作--服务器重新启动后,只会消耗未读的消息?

  • 我对Kafka0.11.0.0有意见 在Kafka0.10.2.1中我对此没有任何问题。我只在0.11.0.0版本中遇到这个问题。 我的使用者将auto.offset.reset设置为最早,而auto commit设置为false,因为我是手动提交的。Kafka数据存储在具有必要权限的非TMP目录中。broker配置的其余部分为默认配置。 我需要0.11.0.0版本的事务。我不知道问题出在哪里。这

  • 我使用的是0.10.1.1 API的高级使用者。 奇怪的是,当我关闭应用程序并重新启动它时,偏移量比上次提交的偏移量大一点,我找不到原因。 我在代码中只有一个提交点。 一个分区的示例: 关机前偏移量:3107169023 分区分配时的偏移量:3107180350

  • 我遇到的问题是,当Kafka和Flink作业重新启动时,Flink Kafka消费者偏移量会重置为0,因此即使我启用了检查点并且我在Flink作业中启用了精确一次语义学,数据也会被重新处理。 这是我的环境详细信息 < li >在Kubernetes下奔跑 < li>Kafka源主题有10个分区,没有复制。 < li >Kafka有三个经纪人。 < li>Flink checkpointing启用了

  • 我有: 连接的Kafka消费者 此外,我有一个方法,它接受两个参数:消费者和一个重新平衡侦听器,该侦听器跟踪分配给消费者的分区 此方法在计时器上运行,其目标是处理记录,直到没有剩余的记录可读取,或者直到所有分区中的某个最长时间。 由于重新平衡可能发生在使用过程中(在consumer.poll()已触发多次之后),因此我希望检测此情况,重置并从所有分配的分区(即使已分配)的最后提交偏移量开始重新启动

  • 我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1