当前位置: 首页 > 知识库问答 >
问题:

使用者类侦听器方法未被触发以接收来自主题的消息。带Spring Boot应用程序的Kafka

田翔
2023-03-14
@Component
public class KafkaListenersExample {

    private final List<KafkaPayload> messages = new ArrayList<>();

    @KafkaListener(topics = "test_topic", containerFactory = "kafkaListenerContainerFactory")
    public void listener(KafkaPayload data) {
        synchronized (messages){
            messages.add(data);
        }
        //System.out.println("message from kafka :"+data);
    }

    public List<KafkaPayload> getMessages(){
        return messages;
    }
}
@Configuration
class KafkaConsumerConfig {

    private String bootstrapServers = "localhost:9092";
 

    @Bean
    public ConsumerFactory<String, KafkaPayload> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props) ;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerConfigs());
        return factory;
    }
}

共有1个答案

景凌
2023-03-14

侦听器容器创建使用者、订阅并负责轮询

打开调试日志应该有助于确定哪里出了问题。

如果记录已经在主题中,则需要将consumerconfig.auto_offset_reset_config设置为aresty。否则,使用者将从主题的末尾开始消费(lates)。

 类似资料:
  • 这是创建ListenerContainerFactory的类 这是我用@KafKalistener注释的Listener类 这是KafkaListenerConfig类,它接受引导服务器、主题名称等。

  • 我在使用时遇到了困难,无法从开头或其他任何显式偏移量读取它。 为同一主题的使用者运行命令行工具,我确实看到带有选项的消息,否则它将挂起 我使用的是kafka-python 0.9.5,而代理运行的是Kafka8.2。不确定确切的问题是什么。 按照dpkp的建议设置_group_id=none_以模拟控制台使用者的行为。

  • 我们正在使用sping-kafka-test-2.2.8-RELEASE。当我使用模板发送消息时,它会正确触发侦听器,但我无法在consumer.poll.中获取消息内容。如果我实例化KafkaTem板而不在类属性中“连接”它,并基于生产者工厂实例化它,它发送消息,但不触发@KafkaListener,只有在我在@Test method中设置消息监听器时才能工作。我需要触发kafka监听器,并意识

  • 我们面临着ActiveMQ及其消费者的随机问题。我们观察到,很少有消费者不接收消息,即使他们连接到ActiveMQ队列。但在消费者重启后,它工作正常。 我们在ActiveMQ端有一个名为testQueue的队列。消费者正试图将消息从该队列中解列。为此,我们正在使用Spring的DefaultMessageListenerContainer。消息正在从ActiveMQ代理传递到使用者节点。从tcpd

  • 我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。

  • 我刚开始使用ActiveMQ Artemis,并在我的机器上安装了Artemis2.17.0。创建了SpringBoot测试应用程序,其中存在JMS和MQTT发布者和接收者。还创建了小的RestController,这样我就可以使用JMS和MQTT生成器发送消息。接收器非常简单,只需创建一条日志消息到控制台。现在,当我使用MQTT生产者创建消息时,JMS和MQTT接收器都将消息获取并记录到控制台。