当前位置: 首页 > 知识库问答 >
问题:

未调用Kafka侦听器方法。消费者不消费。

张毅
2023-03-14
<bean id="kafkaConsumerProperties" class="com.azuga.kafka.listeners.KafkaConsumerProperties">
    <constructor-arg type="java.lang.String" value="127.0.0.1:9092" />
    <constructor-arg type="java.lang.String" value="tdm-group" />
    <constructor-arg type="java.lang.String" value="dbStreamer.azuga.tripDriverMapping" />
</bean>
<bean id="kafkaListenerConfig" class="com.azuga.kafka.listeners.KafkaListenerConfig">
    <property name="kafkaConsumerProperties" ref="kafkaConsumerProperties" />
</bean>
<bean id="kafkaContainerFactory" class="com.azuga.kafka.listeners.KafkaListenerContainerFactory"
    factory-method="kafkaContainerFactory">
</bean>

这是创建ListenerContainerFactory的类

@EnableKafka
public class KafkaListenerContainerFactory {

public static ConcurrentKafkaListenerContainerFactory<String, String> kafkaContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1);
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

@SuppressWarnings("unchecked")
public static ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(KafkaListenerConfig.consumerProps(),
            KafkaListenerConfig.stringKeyDeserializer(), KafkaListenerConfig.stringKeyDeserializer());
}

}

这是我用@KafKalistener注释的Listener类

package com.azuga.kafka.listeners;

import org.springframework.kafka.annotation.KafkaListener;
public class Listener {

@KafkaListener(topics = "dbStreamer.azuga.tripDriverMapping")
public void onMessage(String message) {
    System.out.println(message.toString());
}
}

这是KafkaListenerConfig类,它接受引导服务器、主题名称等。

@EnableKafka
public class KafkaListenerConfig {

private static KafkaConsumerProperties kafkaConsumerProperties;

public void setKafkaConsumerProperties(KafkaConsumerProperties kafkaConsumerProperties) {
    this.kafkaConsumerProperties = kafkaConsumerProperties;
}

public static Map<String, Object> consumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    return props;
}

public static Deserializer stringKeyDeserializer() {
    return new StringDeserializer();
}

}

共有1个答案

平羽
2023-03-14

您的应用程序的配置有点不寻常。

但是,我假设您忽略了@enablekafka是关于@configuration类的。因此,根据Spring Framework文档,您必须使用AnnotationConfigWebApplicationContext类:

* {@link org.springframework.web.context.WebApplicationContext WebApplicationContext}
 * implementation which accepts annotated classes as input - in particular
 * {@link org.springframework.context.annotation.Configuration @Configuration}-annotated
 * classes, but also plain {@link org.springframework.stereotype.Component @Component}
 * classes and JSR-330 compliant classes using {@code javax.inject} annotations. Allows
 * for registering classes one by one (specifying class names as config location) as well
 * as for classpath scanning (specifying base packages as config location).

不幸的是,这不能仅用于简单的XML配置。

Spring Kafka没有为XML定义提供任何钩子。

 类似资料:
  • 我已经使用Spring Kafka创建了一个Kafka消费者,并将其部署在云铸造中。该主题有10个分区。我计划将应用程序扩展到10个实例,以便每个实例可以使用来自一个分区的消息。Spring Kafka支持并发消息侦听器容器,我猜它支持从每个分区创建多个线程来使用。例如,如果我有5个消费者实例,每个消费者实例可能有2个线程从分区消耗。因为我计划为每个分区创建一个应用实例,所以使用并发消费者有什么好

  • 我试图构建一个简单的spring boot Kafka Consumer来消费来自Kafka主题的消息,但是由于KafkaListener方法没有被触发,所以没有任何消息被消费。 Java类: start.java: kafkaConsumerConfig.java:

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者