当我的@KafkaListener收听多个主题时,有人知道如何优先考虑单个Kafka主题吗?
下面是我的代码示例:
@KafkaListener(id = "priority", topics = { "${prio-topic}" }, concurrency = "1", autoStartup = "true")
@KafkaListener(id = "nonPriority", topics = { "${not-prio-topic-1}",
"${not-prio-topic-2}", "${not-prio-topic-3}",
"${not-prio-topic-4}", concurrency = "1", autoStartup = "true")
public synchronized void listenManyEntryTopic(String message) {}
我的问题是,我想从主题prio主题
之前阅读其他非Prio主题。只有当我的prio主题
是空的,我才应该开始使用其他主题,而没有任何特定的顺序。任何提示/建议都很感激。谢谢你的帮助!
因为有两个侦听器容器,所以可以将@Header(KafkaHeaders.RECEIVED_TOPIC)字符串TOPIC
作为参数添加到方法中。
然后,当您收到来自优先级主题的消息时,您可以stop()
另一个侦听器容器(使用KafkaListenerEndpointRegistry
bean),如果它正在运行。
在主主题容器中配置idleEventInterval
,并添加一个@EventListner
方法(或一个Application ationListener
bean)。
然后,当检测到空闲的主容器时,可以重新启动非主容器。
编辑
@SpringBootApplication
public class So66366140Application {
private static final Logger LOG = LoggerFactory.getLogger(So66366140Application.class);
public static void main(String[] args) {
SpringApplication.run(So66366140Application.class, args);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("so66366140-1").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("so66366140-2").partitions(1).replicas(1).build();
}
@Autowired
KafkaListenerEndpointRegistry registry;
@KafkaListener(id = "so66366140-1", topics = "so66366140-1")
@KafkaListener(id = "so66366140-2", topics = "so66366140-2", autoStartup = "false")
public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
LOG.info(in);
if (topic.equals("so66366140-1")
&& this.registry.getListenerContainer("so66366140-2").isRunning()) {
LOG.info("Stopping non-pri container");
this.registry.getListenerContainer("so66366140-2").stop();
}
}
@EventListener
void events(ListenerContainerIdleEvent event) {
LOG.info(event.toString());
if (event.getListenerId().startsWith("so66366140-1")
&& !this.registry.getListenerContainer("so66366140-2").isRunning()) {
LOG.info("Starting non-pri container");
this.registry.getListenerContainer("so66366140-2").start();
}
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> {
template.send("so66366140-1", "foo");
template.send("so66366140-2", "bar");
try {
Thread.sleep(6_000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
};
}
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=5s
你也可以用暂停/恢复代替停止/开始。
Kafka中没有区分优先级和非优先级主题消息的功能。为了解决您的问题,其中一个解决方案是将优先级处理主题与非优先级主题分开,即专用app1只使用优先级主题消息并对其进行处理,同时app2使用非优先级消息并对其进行处理。
我想创建一个并发的,它可以处理多个主题,每个主题都有不同数量的分区。 我注意到,对于大多数分区的主题,Spring Kafka每个分区只初始化一个使用者。 示例:我已经将并发设置为8。我得到了一个听以下主题的。主题A有最多的分区-5,所以Spring-Kafka初始化了5个消费者。我期望Spring-Kafka初始化8个消费者,这是根据我的并发属性允许的最大值。 主题A有5个分区 没有初始化更多消
我使用spring-kafka注释@kafkalistener来指定我的侦听器方法。 我使用单个分区的单个主题。消息永远不会超过一秒或两秒,所以单个线程是可以接受的。spring-kafka文档称@Kafkalistener默认使用ConcurrentMessageListenerContainer。使用SetConcurrency控制并发的正确方法是吗?
有人知道一个听众是否可以听下面这样的多个话题吗?我知道“主题1”很管用,如果我想添加其他主题呢?你能给我举个例子吗?谢谢你的帮助! 或者
我刚刚开始阅读关于潜在Dirichlet分配LDA的文章,并希望将其应用到我的项目中。 我可以知道LDA是否能将一个主题分配给多个单词吗? 例如,A条谈到“河岸”,而B条谈到“银行在金融中的作用”。因此,LDA是否允许将“银行”一词潜在地分配给两个不同的主题?
我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。
下面有一个类似的问题: 一个Spring的Kafka消费者听众能听多个话题吗? 现在我明白了,我可以为KafkaListener注释的topics参数提供一个字符串数组,但是我想知道以下几点: 如何从属性文件中获取主题名称作为字符串数组? 从多个主题中读取如何影响偏移?客户(SpringKafka)会保持每个主题的补偿吗?