public class Listen implements ConsumerSeekAware
{
@Override
public void registerSeekCallback(ConsumerSeekCallback callback)
{
// fetching offset from a database
Integer offset = offsetService.getOffset();
callback.seek("topic-name",0,offset);
}
@KafkaListener(topics = "topic-name", groupId = "group")
public void listen(ConsumerRecord record Acknowledgment acknowledgment) throws Exception
{
// processing the record
acknowledgment.acknowledge(); //manually commiting the record
// committing the offset to MySQL database
}
}
@KafkaListener(topics = "topic-name", groupId = "group")
public void listen(ConsumerRecord record Acknowledgment acknowledgment,
@Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer)) throws Exception {
// seeking old offset stored in database (which is 11 )
consumer.seek(partition,offsetService.getOffset());
log.info("record offset is {} and value is {}" , record.offset(),record.value() );
acknowledgment.acknowledge();
}
这个答案有几种技巧。
请记住,在使用者上执行seek直到下一轮轮询才会生效(在上一轮轮询中获取的任何记录都将首先发送给使用者)。
编辑
@SpringBootApplication
public class So63429201Application {
public static void main(String[] args) {
SpringApplication.run(So63429201Application.class, args).close();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template, Listener listener) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send("so63429201", i % 3, null, "foo" + i));
Thread.sleep(8000);
listener.seekToTime(System.currentTimeMillis() - 11000);
Thread.sleep(8000);
listener.seekToOffset(new TopicPartition("so63429201", 0), 11);
Thread.sleep(8000);
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so63429201").partitions(3).replicas(1).build();
}
}
@Component
class Listener extends AbstractConsumerSeekAware {
@KafkaListener(id = "so63429201", topics = "so63429201", concurrency = "2")
public void listen(String in) {
System.out.println(in);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
System.out.println(assignments);
super.onPartitionsAssigned(assignments, callback);
callback.seekToBeginning(assignments.keySet());
}
public void seekToTime(long time) {
getSeekCallbacks().forEach((tp, callback) -> callback.seekToTimestamp(tp.topic(), tp.partition(), time));
}
public void seekToOffset(TopicPartition tp, long offset) {
getSeekCallbackFor(tp).seek(tp.topic(), tp.partition(), offset);
}
}
我试图阅读上一篇文章中的Kafka主题,但我无法正确阅读: 在属性文件中,我设置了以下内容: 组id=我的团队 客户。id=my_client 启用。汽车提交=真 自动。抵消重置=最早的 隔离。级别=读取已提交 然后代码: 我可以看到偏移量是如何从0打印出来的,即使我在主题中有一些项目。 在日志文件中,我可以看到以下内容(缩写): [Consumer clientId=my_client,grou
例如,分区有1-10的偏移量。我只想从3-8消费。在消耗了第8条消息后,程序应该退出。
我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。
我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。
我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。 我知道有两种方法可以解决这个问题- > 将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.ack
问题内容: 现在,Golang Kafka库(sarama)提供了使用者组功能,而kafka 10没有任何外部库帮助。如何在任何给定时间获得使用者组正在处理的当前消息偏移量? 以前,我使用kazoo-go(https://github.com/wvanbergen/kazoo- go )来获取我的消费者组消息偏移量,因为它存储在Zookeeper中。现在,我使用sarama- cluster(ht