当前位置: 首页 > 知识库问答 >
问题:

Kafka主题的优先排序

锺离飞飙
2023-03-14
@EnableKafka
@Configuration
public class ListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }

    @Bean("kafkaListenerContainerTopic1Factory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic1Factory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setIdleEventInterval(60000L);
        factory.setBatchListener(true);
        return factory;
    }

    @Bean("kafkaListenerContainerTopic2Factory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic2Factory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }

}

Listner代码

@Service
public class Listener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @KafkaListener(id = "first-listener", topics = "topic1", containerFactory = "kafkaListenerContainerTopic1Factory")
    public void receive(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets)  {
        for (int i = 0; i < messages.size(); i++) {
            LOG.info("received first='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));
        }
    }

    @KafkaListener(id = "second-listener", topics = "topic2", containerFactory = "kafkaListenerContaierTopic2Factory" , autoStartup="false" )
    public void receiveRel(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
        for (int i = 0; i < messages.size(); i++) {
            LOG.info("received second='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));
        }
    }

    @EventListener()
    public void eventHandler(ListenerContainerIdleEvent event) {
        LOG.info("Inside event");
        this.registry.getListenerContainer("second-listener").start();
    }

请帮助我解决问题,因为这样的循环每天都会发生。完全读取topic1消息,然后从Topic2读取消息。

共有1个答案

程俊力
2023-03-14

您已经在使用一个空闲的事件侦听器来启动第二个侦听器--它也应该停止第一个侦听器。

当第二个监听器空闲时;别闹了。

您应该检查事件用于哪个容器,以决定停止和/或启动哪个容器。

 类似资料:
  • 我正在使用Kafka Consumer阅读多个主题,我需要其中一个具有更高优先级。处理需要很多时间,而且(低优先级)主题中总是有很多消息,但我需要尽快处理来自另一个主题的消息。 这和Kafka是否支持主题或消息的优先级类似?但这一个使用的是旧的API。 在新的API(0.10.1.1)中,有一些方法 但我不清楚,如何有效地检测高优先级主题中有新消息,有必要暂停其他主题的消费。 有什么想法/例子吗?

  • 我正在研究Kafka是否支持任何要处理的队列或消息的优先级。 看来它不支持任何这样的事情。我在谷歌上找到了这个也支持这一点的邮件存档:http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201206.mbox/%3CCAOEJIJHVHSR=d6astihpsqwvg6vk5xylam6ymdcd6uauoxf-dq@mai

  • 问题内容: 默认情况下,优先级队列的元素如何按照自然顺序排序,因为它没有实现可比的接口。 从文档中可以看出,元素是根据自然顺序进行排序的,但是我找不到任何关于equals方法或可比性的东西。它在内部如何发生? 所有实现的接口:可序列化,可迭代,集合,队列。 如果它实现可比性,那么为什么不在上一行说 例: 第三条打印语句也将打印[1、3、2、4],而不是打印[1、2、3、4]。为什么?应该自然排序吧

  • 我有一个,名为,其中包含类型的对象。 您可以在所有车辆上调用该方法。 我要做的是排序,这样车辆被赋予更高的优先级,并被放在队列的前面。 我假设我必须在这里使用一个比较器,但不知道怎么做。

  • 非常感谢您抽出时间!

  • 假设我有三个指数:城市、博物馆和景点。 现在我正在查询一个术语的所有索引(),例如“维也纳” 作为结果,我得到: 维也纳:维也纳艺术博物馆 有没有办法优先考虑指数,这样我就可以得到第一个城市,而不是景点,最后是博物馆,就像这样: 维也纳 维也纳的Riesenrad 维也纳:维也纳艺术博物馆 维也纳:维也纳历史博物馆