当前位置: 首页 > 知识库问答 >
问题:

阅读Kafka Listner Spring boot中的多个主题

阴元青
2023-03-14

我有两个不同的主题要阅读,并将处理后的数据发布到WebService。我有一个条件,我必须完全读取来自topic1的消息,并确保如果没有来自topic1的消息,我必须读取来自topic2的消息并处理它。如果我开始从topic2读取消息并从topic1获取消息,我必须暂停处理来自topic2的消息并从topic1读取消息。

我设法使用KafkalistenerEndpointRegistry实现了这一点。ListnerConfig代码是

@Bean(kafkaListenerContainerTopic1Factory)
 public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic1Factory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setIdleEventInterval(60000L);
        factory.setBatchListener(true);
        return factory;
    }


    @Bean("kafkaListenerContainerTopic2Factory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic2Factory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }
@KafkaListener(id = "first-listener", topics = "topic1", containerFactory = "kafkaListenerContainerTopic1Factory")
    public void receive(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets)  {
        for (int i = 0; i < messages.size(); i++) {
            LOG.info("received first='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));
        }
    }





@KafkaListener(id = "second-listener", topics = "topic2", containerFactory = "kafkaListenerContaierTopic2Factory" , autoStartup="false" )
    public void receiveRel(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
        for (int i = 0; i < messages.size(); i++) {
            LOG.info("received second='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));
        }
    }

    @EventListener()
    public void eventHandler(ListenerContainerIdleEvent event) {
        LOG.info("Inside event");
        this.registry.getListenerContainer("second-listener").start();
    }

共有1个答案

祁和通
2023-03-14

据我所知,您希望将topic1的消息优先于Topic2。这不是直接可能与Kafka图书馆。你必须在Kafka上实现自定义包装来实现优先级。这里可以参考一个示例:https://github.com/flipkart-civitator/priority-kafka-client

 类似资料:
  • 问题内容: 我有一个InputStream,可以从中读取字符。我希望多个读者访问此InputStream。看来,实现此目标的合理方法是将传入数据写入StringBuffer或StringBuilder,并由多个读取器读取。不幸的是,不建议使用StringBufferInputStream。StringReader读取字符串,而不是不断更新的可变对象。我有什么选择?写我自己的? 问题答案: 输入流的

  • 我想从话题的一开始就开始消费。我已经将属性“AUTO\u OFFSET\u RESET\u CONFIG”设置为最早,但不知何故它仍然没有从一开始就读取。 如果我错过了什么,有什么想法吗?我每次都在创造一个新的消费群体。

  • 我遵循这篇文档来实现上述场景。 那么,有没有人可以建议我如何一次使用多个订阅者从主题中读取消息。

  • 我尝试收听主题,以查看哪个使用者保存了什么值的offsets,但这并不奏效... 我尝试了以下操作: 为控制台使用者创建了配置文件,如下所示: 谢谢! 码头

  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到