大家好。我有一个Kafka项目,使用SpringKafka来听一个明确的主题。我需要一天听一次所有的信息,把它们放到一个集合中,然后在那里找到特定的信息。我无法理解如何用一个@KafkaListener方法读取所有消息。我的班级是:
@Component
public class KafkaIntervalListener {
public CountDownLatch intervalLatch = new CountDownLatch(1);
private final SCDFRunnerService scdfRunnerService;
public KafkaIntervalListener(SCDFRunnerService scdfRunnerService) {
this.scdfRunnerService = scdfRunnerService;
}
@KafkaListener(topics = "${kafka.interval-topic}", containerFactory = "intervalEventKafkaListenerContainerFactory")
public void intervalListener(IntervalEvent event) throws UnsupportedEncodingException, JSONException {
System.out.println("Recieved interval message: " + event);
IntervalType type = event.getType();
Instant instant = event.getInterval();
List<IntervalEvent> events = new ArrayList<>();
events.add(event);
events.size();
this.intervalLatch.countDown();
}
}
我的事件集合的大小始终为1;我尝试使用不同的循环,但后来,我的收藏被归档了530000次。
更新:我已经找到了一种方法来做它与factory.setBatchListener(真);但我需要找到启动它与@调度(cron = "${ kafka.cron},区=欧洲/莫斯科).现在这个方法总是在听。现在我尝试这样的东西:
@Scheduled(cron = "${kafka.cron}", zone = "Europe/Moscow")
public void run() throws Exception {
kafkaIntervalListener.intervalLatch.await();
}
它不工作,在调试模式下,我的断点永远不会在这个网站上工作。
侦听器容器在设计上是消息驱动的。
对于按需获取消息,最好直接使用KafkaConsumer
API,并使用poll()
方法获取消息。
所以首先,为了能够暂停/停止消费者,我必须访问MessageListenerContainer。这意味着,在配置中,我将创建:ConcurrentKafkaListenerContainerFactory并(从2.2开始)使用它创建ConcurrentMessageListenerContainer的托管bean。然后可以使用这个bean来启动/停止消费者。管用。一旦它是并发的...我假设,我传递
我尝试在Kafka中创建新主题时启动动态消费者,但动态启动的消费者总是缺少起始/第一条消息,但从那里开始消费消息。我正在使用kafka-python模块,并且正在使用更新的KafkaConsumer和KafkaProducer。 请建议一些解决这个问题的方法,或者我必须包含在我的生产者和消费者实例中的任何配置。
我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息? 这是我的课; 这是我在应用程序中的Kafka属性。yml; 还有我的KafkaConfiguration课程;
我想要从服务器的一个主题开始所有的消息。 当使用上面的控制台命令时,我希望能够从一开始就获得一个主题中的所有消息,但我不能从一开始就使用java代码消费一个主题中的所有消息。
有什么不同吗?术语KafkaConsumer和KafkaListener可以互换使用吗?
有人知道一个听众是否可以听下面这样的多个话题吗?我知道“主题1”很管用,如果我想添加其他主题呢?你能给我举个例子吗?谢谢你的帮助! 或者