当前位置: 首页 > 知识库问答 >
问题:

@KafkaListener并发多主题

韩安顺
2023-03-14

我想创建一个并发@KafkaListener,它可以处理多个主题,每个主题都有不同数量的分区。

我注意到,对于大多数分区的主题,Spring Kafka每个分区只初始化一个使用者。

示例:我已经将并发设置为8。我得到了一个听以下主题的@KafkaListener。主题A有最多的分区-5,所以Spring-Kafka初始化了5个消费者。我期望Spring-Kafka初始化8个消费者,这是根据我的并发属性允许的最大值。

  • 主题A有5个分区

没有初始化更多消费者的技术原因是什么?

如何绕过这个问题,以便使用@KafkaListener注释初始化更多消费者?(如果可能的话)

共有1个答案

鄢修德
2023-03-14

当一个监听器被配置为监听多个主题时,每个消费者实例监听所有主题;Spring确实启动了8个消费者(在本例中),但这些分区在消费者之间的实际分布方式由Kafka的group management控制:

因此,在这种情况下,你会有3个空闲的消费者。

也许可以提供一个自定义的partition.assignment.strategy来按照您想要的方式进行分发,但是我从来没有研究过这个问题。

编辑

我刚刚测试了RoundRobinAssignor...

spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

 类似资料:
  • 我正在与spring boot spring@KafkaListener合作。我期望的行为是:我的Kafka侦听器以10个线程读取消息。因此,如果其中一个线程挂起,其他消息将继续读取和处理消息。 我定义了bean of 和spring启动配置: 我看到所有配置都能正常工作,我在jmx中看到了我的10个线程: 但是我做了这样的测试: 如果版本是 也许我的期望不是真的,这是Kafka听众的正确行为。请

  • 下面有一个类似的问题: 一个Spring的Kafka消费者听众能听多个话题吗? 现在我明白了,我可以为KafkaListener注释的topics参数提供一个字符串数组,但是我想知道以下几点: 如何从属性文件中获取主题名称作为字符串数组? 从多个主题中读取如何影响偏移?客户(SpringKafka)会保持每个主题的补偿吗?

  • 我使用spring-kafka注释@kafkalistener来指定我的侦听器方法。 我使用单个分区的单个主题。消息永远不会超过一秒或两秒,所以单个线程是可以接受的。spring-kafka文档称@Kafkalistener默认使用ConcurrentMessageListenerContainer。使用SetConcurrency控制并发的正确方法是吗?

  • ++++++++++++++++++++++++++++++++++++ 编辑 运行以下测试,每个主题都有一个分区。在测试运行之前,topic1中有10条消息,Topic2中有10条消息。运行代码并让10条topic1消息得到处理,但是当topic2消息得到处理时,我向topic1发送了更多的消息,但在处理完来自topic2的所有预先存在的消息之前,监听器没有处理这些消息。

  • 当我的@KafkaListener收听多个主题时,有人知道如何优先考虑单个Kafka主题吗? 下面是我的代码示例: 我的问题是,我想从主题之前阅读其他非Prio主题。只有当我的是空的,我才应该开始使用其他主题,而没有任何特定的顺序。任何提示/建议都很感激。谢谢你的帮助!

  • 总之,我试图“强制”RQ工作人员使用supervisord同时执行。我的setup supervisord设置似乎工作正常,因为rq仪表板显示了3个worker、3个PID和3个队列(每个worker/PID一个)。Supervisord设置如下(仅显示worker 1设置,在此设置下定义了另外两个worker): 问题是当我同时发送3个作业时,运行的总时间是单个任务的x3(即,总时间与任务数量呈