我使用spring-kafka注释@kafkalistener来指定我的侦听器方法。
我使用单个分区的单个主题。消息永远不会超过一秒或两秒,所以单个线程是可以接受的。spring-kafka文档称@Kafkalistener默认使用ConcurrentMessageListenerContainer。使用SetConcurrency控制并发的正确方法是吗?
@Bean("appContainerFactory")
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(1);
...
}
使用SetConcurrency控制并发的正确方法是吗?
您所拥有的是正确的,但是容器上的默认concurrency
是1,因此在不需要并发时不必指定它。
或者,我应该创建一个KafkamessageListenerContainer
,它是单线程的吗?
并发容器为每个并发
旋转一个“子”KafkamessageListenerContainer
,所以只为您创建了一个。
我想创建一个并发的,它可以处理多个主题,每个主题都有不同数量的分区。 我注意到,对于大多数分区的主题,Spring Kafka每个分区只初始化一个使用者。 示例:我已经将并发设置为8。我得到了一个听以下主题的。主题A有最多的分区-5,所以Spring-Kafka初始化了5个消费者。我期望Spring-Kafka初始化8个消费者,这是根据我的并发属性允许的最大值。 主题A有5个分区 没有初始化更多消
当我的@KafkaListener收听多个主题时,有人知道如何优先考虑单个Kafka主题吗? 下面是我的代码示例: 我的问题是,我想从主题之前阅读其他非Prio主题。只有当我的是空的,我才应该开始使用其他主题,而没有任何特定的顺序。任何提示/建议都很感激。谢谢你的帮助!
我们希望使用Kafka connect sink连接器将消息从Kafka复制到Mongo DB。在我们的用例中,我们有多个主题,每个主题都有一个分区(主题的名称可以用正则表达式表示,例如topic.XXX.name)。这些主题的数量在不断增加。我想知道Kafka connect架构是否适合这个用例。如果是这样,如何配置它的增益高可缩放性和并行性?任务是什么。最大值?工人数量?
试图弄清楚我是否可以使用spring kafka和spring kafka测试为@KafkaListener编写单元测试。 我的听众课。 我的测试类别: 我的测试配置类: 有什么简单的方法可以做到这一点吗? 或者我应该以其他方式测试@KafkaListener?在单元测试中,如何确保在Kafka中收到新消息时调用@KafkaListener。
下面有一个类似的问题: 一个Spring的Kafka消费者听众能听多个话题吗? 现在我明白了,我可以为KafkaListener注释的topics参数提供一个字符串数组,但是我想知道以下几点: 如何从属性文件中获取主题名称作为字符串数组? 从多个主题中读取如何影响偏移?客户(SpringKafka)会保持每个主题的补偿吗?
++++++++++++++++++++++++++++++++++++ 编辑 运行以下测试,每个主题都有一个分区。在测试运行之前,topic1中有10条消息,Topic2中有10条消息。运行代码并让10条topic1消息得到处理,但是当topic2消息得到处理时,我向topic1发送了更多的消息,但在处理完来自topic2的所有预先存在的消息之前,监听器没有处理这些消息。