当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者在Spring Boot中未收到消息

濮冠宇
2023-03-14

我的Spring/java消费者无法访问生产者生成的消息。但是,当我从控制台/终端运行消费者时,它能够接收Spring/java生产者生成的消息。

消费者配置:

@Component
@ConfigurationProperties(prefix="kafka.consumer")
public class KafkaConsumerProperties {

    private String bootstrap;
    private String group;
    private String topic;

    public String getBootstrap() {
        return bootstrap;
    }

    public void setBootstrap(String bootstrap) {
        this.bootstrap = bootstrap;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }
}

监听器配置:

@Configuration
@EnableKafka
public class KafkaListenerConfig {

    @Autowired
    private KafkaConsumerProperties kafkaConsumerProperties;

    @Bean
    public Map<String, Object> getConsumerProperties() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        return properties;
    }

    @Bean
    public Deserializer stringKeyDeserializer() {
        return new StringDeserializer();
    }

    @Bean
    public Deserializer transactionJsonValueDeserializer() {
        return new JsonDeserializer(Transaction.class);
    }

    @Bean
    public ConsumerFactory<String, Transaction> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), stringKeyDeserializer(), transactionJsonValueDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Transaction> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

Kafka听众:

@Service
public class TransactionConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(Transaction.class);

    @KafkaListener(topics={"transactions"}, containerFactory = "kafkaListenerContainerFactory")
    public void onReceive(Transaction transaction) {
        LOGGER.info("transaction = {}",transaction);
    }
}

消费者应用:

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

测试用例1:通过 我启动了我的Spring/ java生产者并从控制台运行消费者。当我生成消息表单创建者时,我的控制台使用者能够访问该消息。

测试用例2:失败:我启动了spring/java消费者并从控制台运行生产者。当我从控制台生成器生成消息时,我的spring/java消费者无法访问消息。

测试用例3:失败我启动了我的spring/java消费者并运行了spring/java生产者。当我从spring/java生成器生成消息时,我的spring/java消费者无法访问该消息。

问题

>

  • 我的消费者代码有什么问题吗?

    我的kafka-listener是否缺少任何配置?

    是否需要显式运行侦听器?(我不这么认为,因为我可以在终端日志上看到连接到主题,但我仍然不确定)

  • 共有1个答案

    呼延源
    2023-03-14

    好的,您缺少AUTO_OFFSET_RESET_CONFIG中的消费者配置

    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    

    自动偏移复位

    当Kafka中没有初始偏移或服务器上不再存在当前偏移时(例如,因为数据已被删除),该怎么办:

    最早:自动将偏移量重置为最早偏移量

    最新:自动将偏移量重置为最新偏移量

    无:如果没有为消费者的组找到以前的偏移量,则向消费者抛出异常

    其他任何东西:向消费者抛出异常

    注意:<代码>自动偏移。仅当kafka没有该消费者组的偏移量时,将重置为<code>最早。

     类似资料:
    • 我正在使用以下在docker上运行kafka、zookeeper和kafdrop: 我有一个具有以下配置的Spring Boot Producer应用程序-: 在我的中,我有以下内容: 这是一个单独的应用程序,我在我的服务中这样称呼Kafka制作人: 在一个完全不同的spring引导应用程序中,我有一个像这样的使用者: 我可以看到消费者正在连接到代理,但是有消息的日志。下面是我能看到的完整日志:

    • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

    • 我正在尝试设置一个基本的Java消费者来接收来自Kafka主题的消息。我在-https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例-并具有以下代码: 和 Kafka在有问题的EC2主机上运行,我可以使用kafka-console-producer.sh和kafka-console-consumer.sh工具发送和接收关于主题“测试

    • 我正在为Kafka0.9.0.0做Kafka快速入门。 我让zookeeper在监听,因为我运行了 只有一个代理在处侦听,因为我运行了 我有一个制作人在主题“测试”上发帖,因为我跑了 当我运行旧的API使用者时,它通过运行 但是,当我运行新的API使用者时,我在运行时没有得到任何东西 是否可以使用新的API从控制台使用者订阅主题?我该怎么修好它?

    • 我的问题是关于Kafka在爪哇的消费者 > 已启动Kafka服务器 创建的主题 创作者 创建的消费者 我在终端中做的所有这些事情,工作正常,能够在消费者处正确接收日志。运行下面的消费者(在Java),但没有收到任何记录。它继续汇集在 也没有收到任何记录。 下面给出的我的 java 消费者代码 请告诉我在java消费者类中接收消息,我在配置中做错了什么吗?属性中的“group.id”是怎么回事? 下

    • 我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是