当前位置: 首页 > 知识库问答 >
问题:

当KafkaListener监听多个主题时,您可以优先考虑单个主题吗?

端木宏盛
2023-03-14

当我的@KafkaListener收听多个主题时,有人知道如何优先考虑单个Kafka主题吗?

下面是我的代码示例:

@KafkaListener(id = "priority", topics = { "${prio-topic}" }, concurrency = "1", autoStartup = "true")
    @KafkaListener(id = "nonPriority", topics = { "${not-prio-topic-1}",
            "${not-prio-topic-2}", "${not-prio-topic-3}",
            "${not-prio-topic-4}", concurrency = "1", autoStartup = "true")
    public synchronized void  listenManyEntryTopic(String message) {}

我的问题是,我想从主题prio主题之前阅读其他非Prio主题。只有当我的prio主题是空的,我才应该开始使用其他主题,而没有任何特定的顺序。任何提示/建议都很感激。谢谢你的帮助!

共有2个答案

姜献
2023-03-14

因为有两个侦听器容器,所以可以将@Header(KafkaHeaders.RECEIVED_TOPIC)字符串TOPIC作为参数添加到方法中。

然后,当您收到来自优先级主题的消息时,您可以stop()另一个侦听器容器(使用KafkaListenerEndpointRegistrybean),如果它正在运行。

在主主题容器中配置idleEventInterval,并添加一个@EventListner方法(或一个Application ationListenerbean)。

然后,当检测到空闲的主容器时,可以重新启动非主容器。

编辑

@SpringBootApplication
public class So66366140Application {

    private static final Logger LOG = LoggerFactory.getLogger(So66366140Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So66366140Application.class, args);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so66366140-1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so66366140-2").partitions(1).replicas(1).build();
    }

    @Autowired
    KafkaListenerEndpointRegistry registry;

    @KafkaListener(id = "so66366140-1", topics = "so66366140-1")
    @KafkaListener(id = "so66366140-2", topics = "so66366140-2", autoStartup = "false")
    public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        LOG.info(in);
        if (topic.equals("so66366140-1")
                && this.registry.getListenerContainer("so66366140-2").isRunning()) {
            LOG.info("Stopping non-pri container");
            this.registry.getListenerContainer("so66366140-2").stop();
        }
    }

    @EventListener
    void events(ListenerContainerIdleEvent event) {
        LOG.info(event.toString());
        if (event.getListenerId().startsWith("so66366140-1")
                && !this.registry.getListenerContainer("so66366140-2").isRunning()) {
            LOG.info("Starting non-pri container");
            this.registry.getListenerContainer("so66366140-2").start();
        }
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0,  10).forEach(i -> {
                template.send("so66366140-1", "foo");
                template.send("so66366140-2", "bar");
                try {
                    Thread.sleep(6_000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        };
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=5s

你也可以用暂停/恢复代替停止/开始。

司徒瀚
2023-03-14

Kafka中没有区分优先级和非优先级主题消息的功能。为了解决您的问题,其中一个解决方案是将优先级处理主题与非优先级主题分开,即专用app1只使用优先级主题消息并对其进行处理,同时app2使用非优先级消息并对其进行处理。

 类似资料:
  • 我想创建一个并发的,它可以处理多个主题,每个主题都有不同数量的分区。 我注意到,对于大多数分区的主题,Spring Kafka每个分区只初始化一个使用者。 示例:我已经将并发设置为8。我得到了一个听以下主题的。主题A有最多的分区-5,所以Spring-Kafka初始化了5个消费者。我期望Spring-Kafka初始化8个消费者,这是根据我的并发属性允许的最大值。 主题A有5个分区 没有初始化更多消

  • 我使用spring-kafka注释@kafkalistener来指定我的侦听器方法。 我使用单个分区的单个主题。消息永远不会超过一秒或两秒,所以单个线程是可以接受的。spring-kafka文档称@Kafkalistener默认使用ConcurrentMessageListenerContainer。使用SetConcurrency控制并发的正确方法是吗?

  • 有人知道一个听众是否可以听下面这样的多个话题吗?我知道“主题1”很管用,如果我想添加其他主题呢?你能给我举个例子吗?谢谢你的帮助! 或者

  • 我刚刚开始阅读关于潜在Dirichlet分配LDA的文章,并希望将其应用到我的项目中。 我可以知道LDA是否能将一个主题分配给多个单词吗? 例如,A条谈到“河岸”,而B条谈到“银行在金融中的作用”。因此,LDA是否允许将“银行”一词潜在地分配给两个不同的主题?

  • 我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。

  • 下面有一个类似的问题: 一个Spring的Kafka消费者听众能听多个话题吗? 现在我明白了,我可以为KafkaListener注释的topics参数提供一个字符串数组,但是我想知道以下几点: 如何从属性文件中获取主题名称作为字符串数组? 从多个主题中读取如何影响偏移?客户(SpringKafka)会保持每个主题的补偿吗?