有什么不同吗?术语KafkaConsumer和KafkaListener可以互换使用吗?
@kafkalistener
是ConcurrentMessageListenerContainer
的高级API,它在kafkaconsumer
周围产生了多个内部侦听器。
不同的是,当您需要时调用poll()
时,KafkaConsumer
API是可按需轮询的。侦听器抽象将在poll()
周围有一个无限循环,每当记录从poll()
出现时,它就为记录生成消息。我们有一个任务执行器,它运行如下逻辑:
while (isRunning()) {
try {
pollAndInvoke();
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
// Ignore, we're stopping
}
catch (NoOffsetForPartitionException nofpe) {
this.fatalError = true;
ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
break;
}
catch (Exception e) {
handleConsumerException(e);
}
catch (Error e) { // NOSONAR - rethrown
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
if (runnable != null) {
runnable.run();
}
this.logger.error("Stopping container due to an Error", e);
wrapUp();
throw e;
}
}
在该PollandInvoke();
中调用KafkaConsumer.poll()
。
所以首先,为了能够暂停/停止消费者,我必须访问MessageListenerContainer。这意味着,在配置中,我将创建:ConcurrentKafkaListenerContainerFactory并(从2.2开始)使用它创建ConcurrentMessageListenerContainer的托管bean。然后可以使用这个bean来启动/停止消费者。管用。一旦它是并发的...我假设,我传递
有人知道一个听众是否可以听下面这样的多个话题吗?我知道“主题1”很管用,如果我想添加其他主题呢?你能给我举个例子吗?谢谢你的帮助! 或者
这是创建ListenerContainerFactory的类 这是我用@KafKalistener注释的Listener类 这是KafkaListenerConfig类,它接受引导服务器、主题名称等。
大家好。我有一个Kafka项目,使用SpringKafka来听一个明确的主题。我需要一天听一次所有的信息,把它们放到一个集合中,然后在那里找到特定的信息。我无法理解如何用一个@KafkaListener方法读取所有消息。我的班级是: 我的事件集合的大小始终为1;我尝试使用不同的循环,但后来,我的收藏被归档了530000次。 更新:我已经找到了一种方法来做它与factory.setBatchList
我做了一个poc,其中我使用spark流从Kafka读取数据。但我们的组织要么使用ApacheFlink,要么使用Kafka消费者从ApacheKafka读取数据,作为标准流程。所以我需要用Kafka消费者或ApacheFlink替换Kafka流媒体。在我的应用程序用例中,我需要从kafka读取数据,过滤json数据并将字段放入cassandra中,因此建议使用kafka consumer而不是f
我在为 端口设置 消费者以捕获消息时遇到问题。我的: 申请开始: 并且<code>514</code>端口已打开但未侦听 我可以在tcpdump中看到,tcpdump-I eth1-nn-A-s 0端口514和udp正确发送和接收消息。 有人能告诉我我做错了什么吗?