当前位置: 首页 > 知识库问答 >
问题:

一个Spring的Kafka消费者听众可以收听多个主题吗?

郎子平
2023-03-14

有人知道一个听众是否可以听下面这样的多个话题吗?我知道“主题1”很管用,如果我想添加其他主题呢?你能给我举个例子吗?谢谢你的帮助!

@KafkaListener(topics = "topic1,topic2")
public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    System.out.println(record);
} 

或者

ContainerProperties containerProps = new ContainerProperties(new TopicPartitionInitialOffset("topic1, topic2", 0));

共有1个答案

郑俊材
2023-03-14

是的,只需遵循@KafkaListenerJavaDocs:

/**
 * The topics for this listener.
 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
 * Expression must be resolved to the topic name.
 * Mutually exclusive with {@link #topicPattern()} and {@link #topicPartitions()}.
 * @return the topic names or expressions (SpEL) to listen to.
 */
String[] topics() default {};

/**
 * The topic pattern for this listener.
 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
 * Expression must be resolved to the topic pattern.
 * Mutually exclusive with {@link #topics()} and {@link #topicPartitions()}.
 * @return the topic pattern or expression (SpEL).
 */
String topicPattern() default "";

/**
 * The topicPartitions for this listener.
 * Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
 * @return the topic names or expressions (SpEL) to listen to.
 */
TopicPartition[] topicPartitions() default {};

因此,您的用例应该是:

@KafkaListener(topics = {"topic1" , "topic2"})
 类似资料:
  • 我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。

  • 当有多个消费者时,我无法听到kafka主题(我的案例2主题)。在下面的例子中,我有2个消费者工厂,它将接受2个不同的JSON消息(一个是用户类型,另一个是事件类型)。这两条消息都发布到不同的主题。在这里,当我试图从topic1访问事件消息时,我无法访问,但我可以访问用户主题消息。 例如: 我的主要应用如下: 我的需要是首先收听事件主题并对消息进行一些按摩,然后向其发送用户主题,我有另一种方法将收听

  • 有什么不同吗?术语KafkaConsumer和KafkaListener可以互换使用吗?

  • 所以首先,为了能够暂停/停止消费者,我必须访问MessageListenerContainer。这意味着,在配置中,我将创建:ConcurrentKafkaListenerContainerFactory并(从2.2开始)使用它创建ConcurrentMessageListenerContainer的托管bean。然后可以使用这个bean来启动/停止消费者。管用。一旦它是并发的...我假设,我传递

  • 我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?

  • 我有两个kafka consumer实例,配置了相同的消费者组,并监听相同主题中的分区0。问题是我发消息到题目的时候。消息由两个实例使用,这两个实例应该不会发生,因为它们在同一个组中。我使用Spring Boot配置类来配置它们。 以下是配置: 以下是听众: