当前位置: 首页 > 知识库问答 >
问题:

在某个特定时间之前阅读完所有消息后,停止KafkaListener(Spring Kafka Consumer)

鲜于光赫
2023-03-14

我试图从单个分区主题安排我的消费过程。我可以使用endpointlistenerregistry启动它。start()但我想在消耗完当前分区中的所有消息后,即当我到达当前分区中的最后一个偏移量时停止它。制作成主题是在我完成消费并关闭它之后完成的。我应该如何确保在启动scheduler并停止我的消费者之前,我已经阅读了所有消息?我正在为消费者使用Kafkalistener。

共有2个答案

慕冠宇
2023-03-14

>

  • 要读取到最后一个偏移量,您只需轮询直到获得空记录。

    您可以调用KafkanConsumer。消费结束时暂停()。在下一个计划中,需要调用KafkanConsumer。恢复()。

    暂停从请求的分区提取。在使用resume(Collection)恢复这些分区之前,将来对轮询(Duration)的调用不会返回这些分区中的任何记录。请注意,此方法不影响分区订阅。特别是,使用自动分配时,不会导致组重新平衡。

    像这样的东西,

    List<TopicPartition> topicPartitions = new ArrayList<>();
    void scheduleProcess() {
        topicPartitions = ... // assign partition info for this
        kafkaConsumer.resume(topicPartitions)
        while(true) {
           ConsumerRecords<String, Object> events = kafkaConsumer.poll(Duration.ofMillis(1000));
           if(!events.isEmpty()) {
              // processing logic
           } else {
              kafkaConsumer.pause(List.of(topicPartition));
              break;
           }
        }
    }
    
    

  • 陆翰学
    2023-03-14

    设置idleEventInterval容器属性,并添加一个EventListener方法来侦听ListenerContainerIdleEvent。

    然后停止容器。

     类似资料:
    • 更新06/04这里是消费者出厂设置。它是Spring-Kafka-1.3.1。Kafka经纪人合流版 注意:容器工厂已将自动启动设置为false。这是在加载大文件时手动启动/停止使用者。 运行大约1小时后(时间不同),使用者停止使用来自其主题的消息,即使该主题有许多可用消息。Consumer方法中有一个log语句,用于停止在日志中打印。 我们如何设计Spring-Kafka消费者,使其在停止消费的

    • 我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息? 这是我的课; 这是我在应用程序中的Kafka属性。yml; 还有我的KafkaConfiguration课程;

    • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

    • 我注意到我的Kafka Streams应用程序在一段时间没有读取来自Kafka主题的新消息时停止工作。这是我第三次看到这种情况发生。 自5天以来,没有向主题发送任何消息。我的Kafka Streams应用程序也托管了一个spark java Web服务器,它仍然具有响应能力。然而,Kafka Streams不再阅读我向Kafka主题发出的消息。当我重新启动应用程序时,所有消息都将从代理获取。 如何

    • 我的示例接受google pub/sub,记录它并将ack发送回 配置: NotificationHandler: 线程转储: 行: 但ThreadExecutor未执行deliverMessageTask。 在我看来,它看起来像是库中的一个bug,但可能是库误用。无论如何,我正在寻找任何解决方案/变通办法来避免这种情况。 我使用: spring-boot springCloudVersion=