当前位置: 首页 > 知识库问答 >
问题:

显式启动Kafka消费者在主后方法运行

汪才
2023-03-14

我有一个spring boot服务,它使用Kafka主题。当我消费时,我会对Kafka信息执行某些任务。在执行这些操作之前,我需要等待服务将一些数据加载到我设置的缓存中。我的问题是,如果我将kafka consumer设置为autostart,它将在缓存加载并出错之前开始使用。

我试图在加载缓存后显式启动使用者,但却出现空指针异常

@Configuration
public class KafkaConfig {

    @Value("${kafka.server}")
    String server;

    @Value("${kafka.port}")
    String port;

    @Value("${kafka.group.id}")
    String groupid;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server+":"+port);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // config.put("security.protocol","SASL_PLAINTEXT");
        // config.put("sasl.kerberos.service.name","kafka");

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setAutoStartup(false);
        return factory;
    }

}

Kafka利斯泰纳

    @Service
    public class KafkaConsumer {

    @Autowired
    AggregationService aggregationService;

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    private final CounterService counterService;

    public KafkaConsumer(CounterService counterService) {
        this.counterService = counterService;
    }

    @KafkaListener(topics = "gliTransactionTopic", group = "gliDecoupling", id = "gliKafkaListener")
    public boolean consume(String message,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
            @Header(KafkaHeaders.OFFSET) Long offset) throws ParseException {
        System.out.println("Inside kafka listener :" + message+" partition :"+partition.toString()+" offset :"+offset.toString());
    aggregationService.run();
            return true;
        }



}

要启动和停止的服务

    @Service
    public class DecouplingController {

        @Autowired
        private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;



        public void stop() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry
                    .getListenerContainer("gliKafkaListener");
            listenerContainer.stop();
        }

        public void start() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry
                    .getListenerContainer("gliKafkaListener");
            listenerContainer.start();
        }


    }

主要方法

    @SpringBootApplication
    public class DecouplingApplication {

        Ignite ignite;


        static IgniteCache<Long, MappingsEntity> mappingsCache;

        public static void main(String[] args) {
            SpringApplication.run(DecouplingApplication.class, args);

            Ignition.setClientMode(true);
            Ignite ignite = Ignition.ignite("ignite");
            loadCaches(ignite);




        }

        public static boolean loadCaches(Ignite ignite) {

            mappingsCache = ignite.getOrCreateCache("MappingsCache");


            mappingsCache.loadCache(null);

            System.out.println("Data Loaded");

            DecouplingController dc=new DecouplingController();
            dc.start();

            return true;
        }

    }

以下是例外情况

    Data Loaded
    Exception in thread "main" java.lang.NullPointerException
        at com.ignite.spring.decoupling.controller.DecouplingController.start(DecouplingController.java:126)
        at com.ignite.spring.decoupling.DecouplingApplication.loadCaches(DecouplingApplication.java:64)
        at com.ignite.spring.decoupling.DecouplingApplication.main(DecouplingApplication.java:37)

共有2个答案

堵飞鸿
2023-03-14

似乎gliKafkaListener1未在消费者配置/ListenerConfig的某些部分注册

陆弘光
2023-03-14

不要手动创建DecouplingController的对象,而是在DecouplingApplication中自动连接依赖项。

@Autowired

DecouplingController deDecouplingController;

处理自动连线依赖关系的应用程序上下文(ApplicationContext)不知道您使用“new”手动创建的对象。自动连线的kafkaListenerEndpointRegistry对于您创建的新解耦控制器对象是未知的。

 类似资料:
  • 我有一个使用Spring kafka库的Spring启动应用程序的消费者。我想为每个消费者线程设置租户上下文,即在创建每个线程时调用一个方法(创建时每个线程只调用一次)。目前,我已将其添加到listner中,其中对方法有@KafkaListner注释,但它每次轮询并处理每条记录时都会调用它。我想在消费者线程启动时调用此方法一次。如果我们有任何这样的事情,请在这里帮助我。

  • 谁能建议在spring启动其kafka消费者之前如何运行初始化我的应用程序的方法?我正在使用spring的@KafkaListener注释创建一个kafka消费者

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我有一个Kafka集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序关闭时推送到主题的消息。 当应用程序启动时,我可以看到它读取带有偏移量的消息,然后将偏移量推送到。然后当应用程序关闭时,带有偏移量的消息被推送到主题。重启应用程序后,它读取并将其偏移量设置为,因此跳过。 这是我的配置:

  • 主要内容:1 创建DefaultMQPushConsumer实例,2 subscribe订阅,3 start启动消费者,3.1 copySubscription拷贝订阅关系,4 小结基于RocketMQ release-4.9.3,深入的介绍了消费者DefaultMQPushConsumer启动主要流程源码。 此前我们学习了Producer和Broker的启动源码,以及Producer发送消息源码和Broker接收存储消息的源码,现在,我们来学习Consumer的启动以及消费消息的源码。Cons