我试图从单个分区主题安排我的消费过程。我可以使用endpointlistenerregistry启动它。start()但我想在消耗完当前分区中的所有消息后,即当我到达当前分区中的最后一个偏移量时停止它。制作成主题是在我完成消费并关闭它之后完成的。我应该如何确保在启动scheduler并停止我的消费者之前,我已经阅读了所有消息?我正在为消费者使用Kafkalistener。
>
要读取到最后一个偏移量,您只需轮询直到获得空记录。
您可以调用KafkanConsumer。消费结束时暂停()。在下一个计划中,需要调用KafkanConsumer。恢复()。
暂停从请求的分区提取。在使用resume(Collection)恢复这些分区之前,将来对轮询(Duration)的调用不会返回这些分区中的任何记录。请注意,此方法不影响分区订阅。特别是,使用自动分配时,不会导致组重新平衡。
像这样的东西,
List<TopicPartition> topicPartitions = new ArrayList<>();
void scheduleProcess() {
topicPartitions = ... // assign partition info for this
kafkaConsumer.resume(topicPartitions)
while(true) {
ConsumerRecords<String, Object> events = kafkaConsumer.poll(Duration.ofMillis(1000));
if(!events.isEmpty()) {
// processing logic
} else {
kafkaConsumer.pause(List.of(topicPartition));
break;
}
}
}
设置idleEventInterval容器属性,并添加一个EventListener方法来侦听ListenerContainerIdleEvent。
然后停止容器。
更新06/04这里是消费者出厂设置。它是Spring-Kafka-1.3.1。Kafka经纪人合流版 注意:容器工厂已将自动启动设置为false。这是在加载大文件时手动启动/停止使用者。 运行大约1小时后(时间不同),使用者停止使用来自其主题的消息,即使该主题有许多可用消息。Consumer方法中有一个log语句,用于停止在日志中打印。 我们如何设计Spring-Kafka消费者,使其在停止消费的
我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息? 这是我的课; 这是我在应用程序中的Kafka属性。yml; 还有我的KafkaConfiguration课程;
我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到
我注意到我的Kafka Streams应用程序在一段时间没有读取来自Kafka主题的新消息时停止工作。这是我第三次看到这种情况发生。 自5天以来,没有向主题发送任何消息。我的Kafka Streams应用程序也托管了一个spark java Web服务器,它仍然具有响应能力。然而,Kafka Streams不再阅读我向Kafka主题发出的消息。当我重新启动应用程序时,所有消息都将从代理获取。 如何
我的示例接受google pub/sub,记录它并将ack发送回 配置: NotificationHandler: 线程转储: 行: 但ThreadExecutor未执行deliverMessageTask。 在我看来,它看起来像是库中的一个bug,但可能是库误用。无论如何,我正在寻找任何解决方案/变通办法来避免这种情况。 我使用: spring-boot springCloudVersion=