当前位置: 首页 > 知识库问答 >
问题:

如何在spring boot中配置两个Kafka StreamsBuilderFactoryBean实例

马奇略
2023-03-14

使用spring-boot-2.1.3、spring-kafka-2.2.4,我希望有两个流配置(例如,拥有不同的application.id,或者连接到不同的集群,等等)。因此,我几乎根据文档定义了第一个流配置,然后添加了第二个流配置,使用了不同的名称,以及第二个StreamsBuilderFactoryBean(也使用了不同的名称):

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId1000");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    //...
    return new KafkaStreamsConfiguration(props);
}

@Bean(name = "myKappConfig")
public KafkaStreamsConfiguration myKafkaAppIdConfiguration() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId9999");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    //...
    return new KafkaStreamsConfiguration(props);
}

@Bean(name = "myKappConfigStreamBuilder")
public StreamsBuilderFactoryBean myAppStreamBuilder(
        @Qualifier("myKappConfig") KafkaStreamsConfiguration myKafkaAppIdConfiguration) {
    return new StreamsBuilderFactoryBean(myKafkaAppIdConfiguration);
}

然而,当我尝试运行该应用程序时,我得到:

在org.springframework.boot.autoconfigure.kafka.kafka.kafastreamsFactoryBeanConfigurer方法中的参数0需要一个bean,但找到了2个:&defaultkafastreamsanNotationDrivenConfiguration:由类路径资源[org/springframework/kafka/annotation/kafkastreamsBuilder]中的方法“defaultkafkastreamsBuilder”定义-&mykappconfigStreamsBuilder:由类路径资源

因为spring-boot autoconfigure中的代码:

@Bean
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
        StreamsBuilderFactoryBean factoryBean) {
    return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
}

除了完全替换KafkaStreamsAnnotationDrivenConfiguration之外,我如何定义多个StreamsBuilderFactoryBean。或者,如何更改给定流的属性?

共有1个答案

东方飞捷
2023-03-14

@primary标记一个工厂bean。

 类似资料: