当前位置: 首页 > 知识库问答 >
问题:

如何在Kafka听者法中寻找一个特定的偏移量?

顾曾笑
2023-03-14
public class Listen implements ConsumerSeekAware 
{
 @Override
    public void registerSeekCallback(ConsumerSeekCallback callback)

    {
//      fetching offset from a database 
        Integer offset = offsetService.getOffset();
        callback.seek("topic-name",0,offset);

    }
 @KafkaListener(topics = "topic-name", groupId = "group")
  public void listen(ConsumerRecord record Acknowledgment acknowledgment) throws Exception 
  {
//    processing the record 

      acknowledgment.acknowledge();    //manually commiting the record
//    committing the offset to MySQL database
  }
}
@KafkaListener(topics = "topic-name", groupId = "group")
  public void listen(ConsumerRecord record Acknowledgment acknowledgment, 
  @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer)) throws Exception {
       // seeking old offset stored in database (which is 11 )
        consumer.seek(partition,offsetService.getOffset());
        log.info("record offset is {} and value is {}" , record.offset(),record.value() );
        acknowledgment.acknowledge();
}

共有1个答案

池恩
2023-03-14

这个答案有几种技巧。

请记住,在使用者上执行seek直到下一轮轮询才会生效(在上一轮轮询中获取的任何记录都将首先发送给使用者)。

编辑

@SpringBootApplication
public class So63429201Application {

    public static void main(String[] args) {
        SpringApplication.run(So63429201Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, Listener listener) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so63429201", i % 3,  null, "foo" + i));
            Thread.sleep(8000);
            listener.seekToTime(System.currentTimeMillis() - 11000);
            Thread.sleep(8000);
            listener.seekToOffset(new TopicPartition("so63429201", 0), 11);
            Thread.sleep(8000);
        };
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63429201").partitions(3).replicas(1).build();
    }

}

@Component
class Listener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "so63429201", topics = "so63429201", concurrency = "2")
    public void listen(String in) {
        System.out.println(in);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        System.out.println(assignments);
        super.onPartitionsAssigned(assignments, callback);
        callback.seekToBeginning(assignments.keySet());
    }

    public void seekToTime(long time) {
        getSeekCallbacks().forEach((tp, callback) -> callback.seekToTimestamp(tp.topic(), tp.partition(), time));
    }

    public void seekToOffset(TopicPartition tp, long offset) {
        getSeekCallbackFor(tp).seek(tp.topic(), tp.partition(), offset);
    }

}
 类似资料:
  • 我试图阅读上一篇文章中的Kafka主题,但我无法正确阅读: 在属性文件中,我设置了以下内容: 组id=我的团队 客户。id=my_client 启用。汽车提交=真 自动。抵消重置=最早的 隔离。级别=读取已提交 然后代码: 我可以看到偏移量是如何从0打印出来的,即使我在主题中有一些项目。 在日志文件中,我可以看到以下内容(缩写): [Consumer clientId=my_client,grou

  • 例如,分区有1-10的偏移量。我只想从3-8消费。在消耗了第8条消息后,程序应该退出。

  • 我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。 我知道有两种方法可以解决这个问题- > 将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.ack

  • 问题内容: 现在,Golang Kafka库(sarama)提供了使用者组功能,而kafka 10没有任何外部库帮助。如何在任何给定时间获得使用者组正在处理的当前消息偏移量? 以前,我使用kazoo-go(https://github.com/wvanbergen/kazoo- go )来获取我的消费者组消息偏移量,因为它存储在Zookeeper中。现在,我使用sarama- cluster(ht