我想创建一个并发的@KafkaListener
,它可以处理多个主题,每个主题都有不同数量的分区。
我注意到,对于大多数分区的主题,Spring Kafka每个分区只初始化一个使用者。
示例:我已经将并发设置为8。我得到了一个听以下主题的@KafkaListener
。主题A有最多的分区-5,所以Spring-Kafka初始化了5个消费者。我期望Spring-Kafka初始化8个消费者,这是根据我的并发属性允许的最大值。
没有初始化更多消费者的技术原因是什么?
如何绕过这个问题,以便使用@KafkaListener
注释初始化更多消费者?(如果可能的话)
当一个监听器被配置为监听多个主题时,每个消费者实例监听所有主题;Spring确实启动了8个消费者(在本例中),但这些分区在消费者之间的实际分布方式由Kafka的group management控制:
因此,在这种情况下,你会有3个空闲的消费者。
也许可以提供一个自定义的partition.assignment.strategy
来按照您想要的方式进行分发,但是我从来没有研究过这个问题。
编辑
我刚刚测试了RoundRobinAssignor
...
spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
和
我正在与spring boot spring@KafkaListener合作。我期望的行为是:我的Kafka侦听器以10个线程读取消息。因此,如果其中一个线程挂起,其他消息将继续读取和处理消息。 我定义了bean of 和spring启动配置: 我看到所有配置都能正常工作,我在jmx中看到了我的10个线程: 但是我做了这样的测试: 如果版本是 也许我的期望不是真的,这是Kafka听众的正确行为。请
下面有一个类似的问题: 一个Spring的Kafka消费者听众能听多个话题吗? 现在我明白了,我可以为KafkaListener注释的topics参数提供一个字符串数组,但是我想知道以下几点: 如何从属性文件中获取主题名称作为字符串数组? 从多个主题中读取如何影响偏移?客户(SpringKafka)会保持每个主题的补偿吗?
我使用spring-kafka注释@kafkalistener来指定我的侦听器方法。 我使用单个分区的单个主题。消息永远不会超过一秒或两秒,所以单个线程是可以接受的。spring-kafka文档称@Kafkalistener默认使用ConcurrentMessageListenerContainer。使用SetConcurrency控制并发的正确方法是吗?
++++++++++++++++++++++++++++++++++++ 编辑 运行以下测试,每个主题都有一个分区。在测试运行之前,topic1中有10条消息,Topic2中有10条消息。运行代码并让10条topic1消息得到处理,但是当topic2消息得到处理时,我向topic1发送了更多的消息,但在处理完来自topic2的所有预先存在的消息之前,监听器没有处理这些消息。
当我的@KafkaListener收听多个主题时,有人知道如何优先考虑单个Kafka主题吗? 下面是我的代码示例: 我的问题是,我想从主题之前阅读其他非Prio主题。只有当我的是空的,我才应该开始使用其他主题,而没有任何特定的顺序。任何提示/建议都很感激。谢谢你的帮助!
总之,我试图“强制”RQ工作人员使用supervisord同时执行。我的setup supervisord设置似乎工作正常,因为rq仪表板显示了3个worker、3个PID和3个队列(每个worker/PID一个)。Supervisord设置如下(仅显示worker 1设置,在此设置下定义了另外两个worker): 问题是当我同时发送3个作业时,运行的总时间是单个任务的x3(即,总时间与任务数量呈