当前位置: 首页 > 知识库问答 >
问题:

Kafka听众,获取所有消息

骆雅昶
2023-03-14

大家好。我有一个Kafka项目,使用SpringKafka来听一个明确的主题。我需要一天听一次所有的信息,把它们放到一个集合中,然后在那里找到特定的信息。我无法理解如何用一个@KafkaListener方法读取所有消息。我的班级是:

@Component
public class KafkaIntervalListener {

    public CountDownLatch intervalLatch = new CountDownLatch(1);
    private final SCDFRunnerService scdfRunnerService;

    public KafkaIntervalListener(SCDFRunnerService scdfRunnerService) {
        this.scdfRunnerService = scdfRunnerService;
    }

    @KafkaListener(topics = "${kafka.interval-topic}", containerFactory = "intervalEventKafkaListenerContainerFactory")
    public void intervalListener(IntervalEvent event) throws UnsupportedEncodingException, JSONException {
        System.out.println("Recieved interval message: " + event);
        IntervalType type = event.getType();
        Instant instant = event.getInterval();
        List<IntervalEvent> events = new ArrayList<>();
        events.add(event);
        events.size();


        this.intervalLatch.countDown();
    }

}

我的事件集合的大小始终为1;我尝试使用不同的循环,但后来,我的收藏被归档了530000次。

更新:我已经找到了一种方法来做它与factory.setBatchListener(真);但我需要找到启动它与@调度(cron = "${ kafka.cron},区=欧洲/莫斯科).现在这个方法总是在听。现在我尝试这样的东西:

    @Scheduled(cron = "${kafka.cron}", zone = "Europe/Moscow")
public void run() throws Exception {
    kafkaIntervalListener.intervalLatch.await();
}

它不工作,在调试模式下,我的断点永远不会在这个网站上工作。

共有1个答案

吕德惠
2023-03-14

侦听器容器在设计上是消息驱动的。

对于按需获取消息,最好直接使用KafkaConsumerAPI,并使用poll()方法获取消息

 类似资料:
  • 所以首先,为了能够暂停/停止消费者,我必须访问MessageListenerContainer。这意味着,在配置中,我将创建:ConcurrentKafkaListenerContainerFactory并(从2.2开始)使用它创建ConcurrentMessageListenerContainer的托管bean。然后可以使用这个bean来启动/停止消费者。管用。一旦它是并发的...我假设,我传递

  • 我尝试在Kafka中创建新主题时启动动态消费者,但动态启动的消费者总是缺少起始/第一条消息,但从那里开始消费消息。我正在使用kafka-python模块,并且正在使用更新的KafkaConsumer和KafkaProducer。 请建议一些解决这个问题的方法,或者我必须包含在我的生产者和消费者实例中的任何配置。

  • 我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息? 这是我的课; 这是我在应用程序中的Kafka属性。yml; 还有我的KafkaConfiguration课程;

  • 我想要从服务器的一个主题开始所有的消息。 当使用上面的控制台命令时,我希望能够从一开始就获得一个主题中的所有消息,但我不能从一开始就使用java代码消费一个主题中的所有消息。

  • 有什么不同吗?术语KafkaConsumer和KafkaListener可以互换使用吗?

  • 有人知道一个听众是否可以听下面这样的多个话题吗?我知道“主题1”很管用,如果我想添加其他主题呢?你能给我举个例子吗?谢谢你的帮助! 或者