我有一个单一的Kafka消费者,它连接到一个有3个分区的主题。一旦我从Kafka那里得到一张唱片,我就想捕捉偏移量和分区。在重新启动时,我希望从上次读取的偏移量恢复使用者的位置
摘自Kafka文档:
每个记录都有自己的偏移量,因此要管理自己的偏移量,只需执行以下操作:
配置enable.auto.commit=false
下面是我的示例代码:
constructor{
load data into offsetMap<partition,offset>
initFlag=true;
}
Main method
{
ConsumerRecords<String, String> records = consumer.poll(100);
if(initFlag) // is this correct way to override offset position?
{
seekToPositions(offsetMap);
initFlag=false;
}
while(!shutdown)
{
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
getOffsetPositions();// dump offsets and partitions to db/disk
}
}
}
//get current offset and write to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception{
Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
//code to put partition and offset into map
//write to disk or db
}
} // Overrides the fetch offsets that the consumer
public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
//code get partitions and offset from offsetMap
consumer.seek(partition, offset);
}
这是正确的做法吗?有没有更好的办法?
如果您提交了您的偏移,Kafka将为您存储它们(默认情况下最多24小时)。
这样,如果您的消费者死了,您可以在另一台机器上启动相同的代码,然后从您停止的地方继续。不需要外部存储。
请参阅https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/Consumer/kafkaconsumer.html中的“偏移量和消费者位置”
并建议您考虑使用commitSync
但是如果我们重新启动kafka服务器,使用者会重新读取已经提交的偏移量吗?或者这个选项在这样的情况下工作--服务器重新启动后,只会消耗未读的消息?
我对Kafka0.11.0.0有意见 在Kafka0.10.2.1中我对此没有任何问题。我只在0.11.0.0版本中遇到这个问题。 我的使用者将auto.offset.reset设置为最早,而auto commit设置为false,因为我是手动提交的。Kafka数据存储在具有必要权限的非TMP目录中。broker配置的其余部分为默认配置。 我需要0.11.0.0版本的事务。我不知道问题出在哪里。这
我使用的是0.10.1.1 API的高级使用者。 奇怪的是,当我关闭应用程序并重新启动它时,偏移量比上次提交的偏移量大一点,我找不到原因。 我在代码中只有一个提交点。 一个分区的示例: 关机前偏移量:3107169023 分区分配时的偏移量:3107180350
我遇到的问题是,当Kafka和Flink作业重新启动时,Flink Kafka消费者偏移量会重置为0,因此即使我启用了检查点并且我在Flink作业中启用了精确一次语义学,数据也会被重新处理。 这是我的环境详细信息 < li >在Kubernetes下奔跑 < li>Kafka源主题有10个分区,没有复制。 < li >Kafka有三个经纪人。 < li>Flink checkpointing启用了
我有: 连接的Kafka消费者 此外,我有一个方法,它接受两个参数:消费者和一个重新平衡侦听器,该侦听器跟踪分配给消费者的分区 此方法在计时器上运行,其目标是处理记录,直到没有剩余的记录可读取,或者直到所有分区中的某个最长时间。 由于重新平衡可能发生在使用过程中(在consumer.poll()已触发多次之后),因此我希望检测此情况,重置并从所有分配的分区(即使已分配)的最后提交偏移量开始重新启动
我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1