当前位置: 首页 > 知识库问答 >
问题:

应用程序重新启动时Kafka上次偏移量增加

端木高卓
2023-03-14

我使用的是0.10.1.1 API的高级使用者。

奇怪的是,当我关闭应用程序并重新启动它时,偏移量比上次提交的偏移量大一点,我找不到原因。

我在代码中只有一个提交点。

else if(message.getClass() == ProcessedBatches.class) {
        try {
            Logger.getRootLogger().info("[" + this.name + "/Reader] Commiting ...");
            ProcessedBatches msg = (ProcessedBatches) message;
            consumer.commitSync(msg.getCommitInfo());
            lastCommitData = msg.getCommitInfo();
            lastCommit = System.currentTimeMillis();
        } catch (CommitFailedException e) {
            Logger.getRootLogger().info("[" + this.name + "/Reader] Failed to commit... Last commit: " + lastCommit + " | Last batch: " + lastBatch + ". Current uncommited messages: " + uncommitedMessages);
            self().tell(HarakiriMessage.getInstance(), self());
        }
    }
  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        String output = 
                "############## SHUTTING DOWN CONSUMER ############### \n" + 
                lastCommitData+"\n";
        System.out.println(output);
    }));
new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {}

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            for (TopicPartition p:collection
                 ) {
                System.out.println("Starting position "+p.toString()+":" + consumer.position(p));
            }
            coordinator.setRebalanceTimestamp(System.currentTimeMillis());
        }
    });

一个分区的示例:

关机前偏移量:3107169023

分区分配时的偏移量:3107180350

    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("group.id", group_id);
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "100000000");
    props.put("session.timeout.ms", "10000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("max.poll.records", "40000");
    props.put("auto.offset.reset", "latest");

共有1个答案

糜博远
2023-03-14

我认为你的假设“关机前偏移量:3107169023”是基于你的关机挂钩打印的,这是正确的吗?

如果是这样的话,我看到了两个潜在的问题。

注册shutdown挂钩时,您将关闭lastCommitData字段。

当虚拟机开始其关机序列时,它将以某种未指定的顺序启动所有已注册的关机挂钩,并让它们并发运行

因此不能保证在shutdown钩子已经打印了lastCommitData值之后,使用者不会继续提交偏移量。

我建议你检查一下Kafka,看看你的应用程序关闭后,实际提交的偏移量是多少。

 类似资料:
  • 我有一个单一的Kafka消费者,它连接到一个有3个分区的主题。一旦我从Kafka那里得到一张唱片,我就想捕捉偏移量和分区。在重新启动时,我希望从上次读取的偏移量恢复使用者的位置 摘自Kafka文档: 每个记录都有自己的偏移量,因此要管理自己的偏移量,只需执行以下操作: 配置enable.auto.commit=false 下面是我的示例代码: 这是正确的做法吗?有没有更好的办法?

  • 我对Kafka0.11.0.0有意见 在Kafka0.10.2.1中我对此没有任何问题。我只在0.11.0.0版本中遇到这个问题。 我的使用者将auto.offset.reset设置为最早,而auto commit设置为false,因为我是手动提交的。Kafka数据存储在具有必要权限的非TMP目录中。broker配置的其余部分为默认配置。 我需要0.11.0.0版本的事务。我不知道问题出在哪里。这

  • 我有: 连接的Kafka消费者 此外,我有一个方法,它接受两个参数:消费者和一个重新平衡侦听器,该侦听器跟踪分配给消费者的分区 此方法在计时器上运行,其目标是处理记录,直到没有剩余的记录可读取,或者直到所有分区中的某个最长时间。 由于重新平衡可能发生在使用过程中(在consumer.poll()已触发多次之后),因此我希望检测此情况,重置并从所有分配的分区(即使已分配)的最后提交偏移量开始重新启动

  • 我尝试将auto.offset.reset设置为最早和最晚,但这不会更改行为。 我在消费者配置中遗漏了什么吗?

  • 我有一个Spring Cloud Stream Kafka Stream应用程序,它读取主题(事件)并执行一个简单的处理: 该应用程序使用来自Confluent Cloud的Kafka环境,带有6个分区的事件主题。完整的配置是: 首先,它显示还原使用者客户端的创建。自动偏移复位无: > 配置了两个消费者的原因是什么? 为什么第二个函数具有,而我没有显式配置它,而且Kafka的默认值是最新的? 我已

  • 但是如果我们重新启动kafka服务器,使用者会重新读取已经提交的偏移量吗?或者这个选项在这样的情况下工作--服务器重新启动后,只会消耗未读的消息?