当前位置: 首页 > 知识库问答 >
问题:

如何在属于Abstract消费者SeekAware类的Spring Kafka中实现lookToEnd()?

哈和惬
2023-03-14

我有一个要求。我们有一个Spring启动Kafka消费者应用程序,它正在阅读Kafka主题。我们的要求是,每当应用程序出现故障并出现时,我都希望开始最新的偏移量,而不是被旧值所困扰。是否有可能重置组的偏移量?我已经研究了一点点,并使用抽象消费者使用 seekToEnd() 将偏移量设置为末尾,如下面的代码所示。

public class KafkaConsumer extends AbstractConsumerSeekAware {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;



    @KafkaListener(topics = "${topic.consumer}")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        //consuming the message from consumer Record.
        seekToEnd();
        doSomething();
        ack.acknowledge();
    }

但是,当我们停止应用程序并重新启动它时,它从它离开的最后一个偏移量开始读取,但我们希望仅在应用程序启动时从偏移量读取。我们怎样才能做到这一点?

共有1个答案

燕琛
2023-03-14

@KafkaListener方法中执行此操作是不正确的。只有当使用者从分区传递记录时,才会真正调用此分区。

出于这个原因,您必须实现onPartitionsAssigned(),因此消费者将在开始轮询之前查找这些分区。

在文档中查看更多内容:https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek

 类似资料:
  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用

  • 这是一个关于Kafka和信息如何被消费的非常基本的问题,但不幸的是,我在这一点上找不到任何答案。 假设我想过度分区,那么我将得到比消费者多10倍的分区。过度分区是必需的,因为我希望能够扩展(在未来并行处理更多的消息)。 1 个主题分为 1000 个分区,由 100 个使用者使用 =- 我的问题是: > 消息是如何为每个消费者消费的:它是以循环方式完成的吗?如果不是,分发是如何完成的? 有没有保证消

  • 如何提高Kafka消费者的绩效?我有(并且需要)至少一次Kafka消费语义学 我有以下配置。processInDB()需要2分钟才能完成。因此,仅处理10条消息(全部在单个分区中)就需要20分钟(假设每条消息2分钟)。我可以在不同的线程中调用processInDB,但我可能会丢失消息!。如何在2到4分钟的时间窗口内处理所有10条消息? 下面是我的Kafka消费者代码。