@Component
public class KafkaListenersExample {
private final List<KafkaPayload> messages = new ArrayList<>();
@KafkaListener(topics = "test_topic", containerFactory = "kafkaListenerContainerFactory")
public void listener(KafkaPayload data) {
synchronized (messages){
messages.add(data);
}
//System.out.println("message from kafka :"+data);
}
public List<KafkaPayload> getMessages(){
return messages;
}
}
@Configuration
class KafkaConsumerConfig {
private String bootstrapServers = "localhost:9092";
@Bean
public ConsumerFactory<String, KafkaPayload> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props) ;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerConfigs());
return factory;
}
}
侦听器容器创建使用者、订阅并负责轮询。
打开调试日志应该有助于确定哪里出了问题。
如果记录已经在主题中,则需要将consumerconfig.auto_offset_reset_config
设置为aresty
。否则,使用者将从主题的末尾开始消费(lates
)。
这是创建ListenerContainerFactory的类 这是我用@KafKalistener注释的Listener类 这是KafkaListenerConfig类,它接受引导服务器、主题名称等。
我在使用时遇到了困难,无法从开头或其他任何显式偏移量读取它。 为同一主题的使用者运行命令行工具,我确实看到带有选项的消息,否则它将挂起 我使用的是kafka-python 0.9.5,而代理运行的是Kafka8.2。不确定确切的问题是什么。 按照dpkp的建议设置_group_id=none_以模拟控制台使用者的行为。
我们正在使用sping-kafka-test-2.2.8-RELEASE。当我使用模板发送消息时,它会正确触发侦听器,但我无法在consumer.poll.中获取消息内容。如果我实例化KafkaTem板而不在类属性中“连接”它,并基于生产者工厂实例化它,它发送消息,但不触发@KafkaListener,只有在我在@Test method中设置消息监听器时才能工作。我需要触发kafka监听器,并意识
我们面临着ActiveMQ及其消费者的随机问题。我们观察到,很少有消费者不接收消息,即使他们连接到ActiveMQ队列。但在消费者重启后,它工作正常。 我们在ActiveMQ端有一个名为testQueue的队列。消费者正试图将消息从该队列中解列。为此,我们正在使用Spring的DefaultMessageListenerContainer。消息正在从ActiveMQ代理传递到使用者节点。从tcpd
我刚开始使用ActiveMQ Artemis,并在我的机器上安装了Artemis2.17.0。创建了SpringBoot测试应用程序,其中存在JMS和MQTT发布者和接收者。还创建了小的RestController,这样我就可以使用JMS和MQTT生成器发送消息。接收器非常简单,只需创建一条日志消息到控制台。现在,当我使用MQTT生产者创建消息时,JMS和MQTT接收器都将消息获取并记录到控制台。