下面有一个类似的问题:
一个Spring的Kafka消费者听众能听多个话题吗?
现在我明白了,我可以为KafkaListener注释的topics参数提供一个字符串数组,但是我想知道以下几点:
如果你的属性是my.topics
(逗号分隔)...
@KafkaListener(id = "foo", topics = "#{'${my.topics}'.split(',')}")
是的,所有订阅的主题/分区都会管理偏移量。
我想创建一个并发的,它可以处理多个主题,每个主题都有不同数量的分区。 我注意到,对于大多数分区的主题,Spring Kafka每个分区只初始化一个使用者。 示例:我已经将并发设置为8。我得到了一个听以下主题的。主题A有最多的分区-5,所以Spring-Kafka初始化了5个消费者。我期望Spring-Kafka初始化8个消费者,这是根据我的并发属性允许的最大值。 主题A有5个分区 没有初始化更多消
++++++++++++++++++++++++++++++++++++ 编辑 运行以下测试,每个主题都有一个分区。在测试运行之前,topic1中有10条消息,Topic2中有10条消息。运行代码并让10条topic1消息得到处理,但是当topic2消息得到处理时,我向topic1发送了更多的消息,但在处理完来自topic2的所有预先存在的消息之前,监听器没有处理这些消息。
我正在为Kafka主题编写侦听器,并使用@kafkaListener注释来做同样的事情。到目前为止,我硬编码了主题名(topic1),效果很好。这是一个有效的代码:- } 现在,当我试图更改代码以从代码中获取主题名称时,它不起作用。代码,我试图从代码中获取值后,在堆栈溢出(如何将动态主题名称传递给@kafkalistener(主题从环境变量)是:- } 当我试图启动服务器时,它抛出错误:- bea
我有多个主题A、B和C,我正在使用Kafka Streams将它们扇形成主题X。主题A、B和C使用默认主题名称策略在模式注册表中注册为主题。流式传输非常愚蠢,因为它只是将消息扇形,而不确保它们符合注册表中的模式,但它会在消息中添加一个ORIGINAL_TOPIC_NAME标头以指示它来自主题A、B或C。 然后,我让Kafka消费者从主题X中消费。该主题未在模式注册表中注册。Kafka消费者是我使用
我使用@KafkaListener,我需要一个动态的主题名,所以我使用SpEL'\uu listener'来实现这一点 它工作得非常好。 主要问题是当我想添加另一个注释时,它会触发某些方面的编程 @MyCustomAnnotationToRecordPerformance@KafkaListener(主题 = "#{__ listener.my道具}")公共无效监听器Kafka(@Payload
当我的@KafkaListener收听多个主题时,有人知道如何优先考虑单个Kafka主题吗? 下面是我的代码示例: 我的问题是,我想从主题之前阅读其他非Prio主题。只有当我的是空的,我才应该开始使用其他主题,而没有任何特定的顺序。任何提示/建议都很感激。谢谢你的帮助!