我使用的是0.10.1.1 API的高级使用者。
奇怪的是,当我关闭应用程序并重新启动它时,偏移量比上次提交的偏移量大一点,我找不到原因。
我在代码中只有一个提交点。
else if(message.getClass() == ProcessedBatches.class) {
try {
Logger.getRootLogger().info("[" + this.name + "/Reader] Commiting ...");
ProcessedBatches msg = (ProcessedBatches) message;
consumer.commitSync(msg.getCommitInfo());
lastCommitData = msg.getCommitInfo();
lastCommit = System.currentTimeMillis();
} catch (CommitFailedException e) {
Logger.getRootLogger().info("[" + this.name + "/Reader] Failed to commit... Last commit: " + lastCommit + " | Last batch: " + lastBatch + ". Current uncommited messages: " + uncommitedMessages);
self().tell(HarakiriMessage.getInstance(), self());
}
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
String output =
"############## SHUTTING DOWN CONSUMER ############### \n" +
lastCommitData+"\n";
System.out.println(output);
}));
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
for (TopicPartition p:collection
) {
System.out.println("Starting position "+p.toString()+":" + consumer.position(p));
}
coordinator.setRebalanceTimestamp(System.currentTimeMillis());
}
});
一个分区的示例:
关机前偏移量:3107169023
分区分配时的偏移量:3107180350
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", group_id);
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "100000000");
props.put("session.timeout.ms", "10000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("max.poll.records", "40000");
props.put("auto.offset.reset", "latest");
我认为你的假设“关机前偏移量:3107169023”是基于你的关机挂钩打印的,这是正确的吗?
如果是这样的话,我看到了两个潜在的问题。
注册shutdown挂钩时,您将关闭lastCommitData字段。
当虚拟机开始其关机序列时,它将以某种未指定的顺序启动所有已注册的关机挂钩,并让它们并发运行
因此不能保证在shutdown钩子已经打印了lastCommitData值之后,使用者不会继续提交偏移量。
我建议你检查一下Kafka,看看你的应用程序关闭后,实际提交的偏移量是多少。
我有一个单一的Kafka消费者,它连接到一个有3个分区的主题。一旦我从Kafka那里得到一张唱片,我就想捕捉偏移量和分区。在重新启动时,我希望从上次读取的偏移量恢复使用者的位置 摘自Kafka文档: 每个记录都有自己的偏移量,因此要管理自己的偏移量,只需执行以下操作: 配置enable.auto.commit=false 下面是我的示例代码: 这是正确的做法吗?有没有更好的办法?
我对Kafka0.11.0.0有意见 在Kafka0.10.2.1中我对此没有任何问题。我只在0.11.0.0版本中遇到这个问题。 我的使用者将auto.offset.reset设置为最早,而auto commit设置为false,因为我是手动提交的。Kafka数据存储在具有必要权限的非TMP目录中。broker配置的其余部分为默认配置。 我需要0.11.0.0版本的事务。我不知道问题出在哪里。这
我有: 连接的Kafka消费者 此外,我有一个方法,它接受两个参数:消费者和一个重新平衡侦听器,该侦听器跟踪分配给消费者的分区 此方法在计时器上运行,其目标是处理记录,直到没有剩余的记录可读取,或者直到所有分区中的某个最长时间。 由于重新平衡可能发生在使用过程中(在consumer.poll()已触发多次之后),因此我希望检测此情况,重置并从所有分配的分区(即使已分配)的最后提交偏移量开始重新启动
我尝试将auto.offset.reset设置为最早和最晚,但这不会更改行为。 我在消费者配置中遗漏了什么吗?
我有一个Spring Cloud Stream Kafka Stream应用程序,它读取主题(事件)并执行一个简单的处理: 该应用程序使用来自Confluent Cloud的Kafka环境,带有6个分区的事件主题。完整的配置是: 首先,它显示还原使用者客户端的创建。自动偏移复位无: > 配置了两个消费者的原因是什么? 为什么第二个函数具有,而我没有显式配置它,而且Kafka的默认值是最新的? 我已
但是如果我们重新启动kafka服务器,使用者会重新读取已经提交的偏移量吗?或者这个选项在这样的情况下工作--服务器重新启动后,只会消耗未读的消息?