当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka的Spring Boot:未读取消息

谢麒
2023-03-14

我目前正在使用Kafka listener设置Spring Boot应用程序。我试图只对消费者进行编码。对于producer,我现在正在从Kafka控制台手动发送消息。我举了一个例子:http://www.source4code.info/2016/09/spring-kafka-consumer-producer-example.html

我尝试将其作为Spring Boot应用程序运行,但无法看到收到任何消息。我的本地主题Kafka中已经有一些消息

C:\software\kafka_2.11-0.10.1.0\kafka_2.11-0.10.1.0\kafka_2.11-0.10.1.0\bin\wind
ows>kafka-console-producer.bat --broker-list localhost:9092 --topic test
this is a message
testing again

我的Spring Boot应用程序是:

@EnableDiscoveryClient
@SpringBootApplication
public class KafkaApplication {

    /**
     * Run the application using Spring Boot and an embedded servlet engine.
     * 
     * @param args
     *            Program arguments - ignored.
     */
    public static void main(String[] args) {
        // Tell server to look for registration.properties or registration.yml
        System.setProperty("spring.config.name", "kafka-server");

        SpringApplication.run(KafkaApplication.class, args);
    }
}

Kafka的配置是:

package kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        //factory.setConcurrency(1);
        //factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        //propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}

Kafka的听众是:

package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

public class Listener {

    protected Logger logger = Logger.getLogger(Listener.class
            .getName());

    public CountDownLatch getCountDownLatch1() {
        return countDownLatch1;
    }

    private CountDownLatch countDownLatch1 = new CountDownLatch(1);

    @KafkaListener(topics = "test")
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("Received message: " + record);
        System.out.println("Received message: " + record);
        countDownLatch1.countDown();
    }
}

我是第一次尝试这个。如果我做错了什么,请告诉我。任何帮助都将不胜感激。

共有3个答案

韶景曜
2023-03-14

您需要使用服务或组件对侦听器类进行注释,以便Spring Boot可以加载Kafka侦听器。

package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

@Component
public class Listener {

    protected Logger logger = Logger.getLogger(Listener.class
            .getName());

    public CountDownLatch getCountDownLatch1() {
        return countDownLatch1;
    }

    private CountDownLatch countDownLatch1 = new CountDownLatch(1);

    @KafkaListener(topics = "test")
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("Received message: " + record);
        System.out.println("Received message: " + record);
        countDownLatch1.countDown();
    }
}
孙辰阳
2023-03-14

观察到您对消费者组进行了dit注释。id属性。

 //propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

让我们看看Kafka官方文件是如何引用的:

标识此消费者所属的消费者组的唯一字符串。如果消费者使用订阅(主题)或基于Kafka的偏移管理策略使用组管理功能,则需要此属性。

试图取消对该行的注释,消费者成功了。

郏博瀚
2023-03-14

您没有设置消费者配置。AUTO_OFFSET_RESET_CONFIG所以默认为“最新”。将其设置为“最早”,这样消费者就会收到主题中已经有的消息。

消费者配置。AUTO\u OFFSET\u RESET\u CONFIG仅在使用者组没有主题分区的偏移量时生效。如果已使用“最新”设置运行消费者,则使用其他设置再次运行消费者不会更改偏移量。消费者必须使用不同的组,以便Kafka为该组指定偏移。

 类似资料:
  • 该未读消息接口记录不含在通知内的消息,如当前用户收到的评论、点赞和未处理的审核等,调用相应的列表和操作接口,讲自动清零 GET /user/unread-count Response Status: 200 OK { "counts": { "user_id": 1, "unread_comments_count": 0, "unread_likes_count":

  • 我刚开始穿弹力靴。这就是我要解决的问题。我有一个application.yml文件,它具有以下属性: 我尝试使用以下代码访问KinesisSenderFeature的值: 以及 PropertySourcesPlaceHolderConfigureer bean定义为: 是的,我确实看到了: 将Yaml中的列表映射到Spring Boot中的对象列表 Spring boot YAML配置未读取所有

  • 1.未读消息的监听 未读消息的监听用于提示访客有未读消息,用户可以在咨询入口处添加消息数小红点时使用; (新接口)未读消息的监听接口为OnUnreadNMsgListener;使用新接口的不用再使用下面的老接口。 注册监听方法:NSDKMsgUtils.getInstance().setOnUnreadNMsgListener(OnUnreadNMsgListener onUnreadNmsgLi

  • 我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所

  • 在SpringBoot中读取环境变量的最佳方法是什么? 在Java中,我使用了以下方法: 是否可以使用注释来实现?

  • 我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。 问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制