Question:

Can we use multiple Kafka templates in Spring Boot?

艾浩穰
2023-03-14

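In my Spring Boot Kafka publisher application, I want to support publishing messages in either String (JSON) or byte format, because I want to support both JSON and Avro. But the KafkaTemplate in Spring Boot seems to let us define only one of these templates. Is there a way to use both templates, or any other way to support both JSON and Avro?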

Kafka template


2 answers

钱德元
2023-03-14

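You can define the beans yourself in a Kafka configuration class. I had to send data to two different servers, so I declared a separate ProducerFactory and KafkaTemplate for each one: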

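import java.util.Map;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// MosaicKafkaConfig and StreamKafkaConfig are the author's own property holders, one per cluster.
@Configuration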
public class KafkaConfig {

    private final MosaicKafkaConfig mosaicKafkaConfig;
    private final StreamKafkaConfig streamKafkaConfig;

    public KafkaConfig(MosaicKafkaConfig mosaicKafkaConfig, StreamKafkaConfig streamKafkaConfig) {
        this.mosaicKafkaConfig = mosaicKafkaConfig;
        this.streamKafkaConfig = streamKafkaConfig;
    }

    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactoryForMosaic() {
        KafkaProperties kafkaProperties = new KafkaProperties();
        KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
        ResourceLoader resourceLoader = new DefaultResourceLoader();
        Resource resource = resourceLoader.getResource(mosaicKafkaConfig.getSslTrustStoreLocation());
        ssl.setTrustStoreLocation(resource);
        ssl.setTrustStorePassword(mosaicKafkaConfig.getSslTrustStorePassword());
        ssl.setTrustStoreType(mosaicKafkaConfig.getSslTrustStoreType());
        Map<String, String> props = kafkaProperties.getProperties();
        props.put("sasl.jaas.config", mosaicKafkaConfig.getSaslConfig());
        props.put("sasl.mechanism", mosaicKafkaConfig.getSaslMechanism());
        props.put("security.protocol", mosaicKafkaConfig.getSaslSecProtocol());

        kafkaProperties.getProducer().setValueSerializer(mosaicKafkaConfig.getValaueSerializer());
        kafkaProperties.getProducer().setClientId(mosaicKafkaConfig.getClientID());
        kafkaProperties.getProducer().setBootstrapServers(mosaicKafkaConfig.getBootstrapServers());

        Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(configProps);

    }

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplateForMosaic(ProducerFactory<Object, Object> kafkaProducerFactoryForMosaic) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactoryForMosaic);
        return kafkaTemplate;
    }


    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactoryForStream() {
        KafkaProperties kafkaProperties = new KafkaProperties();
        KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
        ResourceLoader resourceLoader = new DefaultResourceLoader();
        Resource resource = resourceLoader.getResource(streamKafkaConfig.getSslTrustStoreLocation());
        ssl.setTrustStoreLocation(resource);
        ssl.setTrustStorePassword(streamKafkaConfig.getSslTrustStorePassword());
        ssl.setTrustStoreType(streamKafkaConfig.getSslTrustStoreType());
        Map<String, String> props = kafkaProperties.getProperties();
        props.put("sasl.jaas.config", streamKafkaConfig.getSaslConfig());
        props.put("sasl.mechanism", streamKafkaConfig.getSaslMechanism());
        props.put("security.protocol", streamKafkaConfig.getSaslSecProtocol());

        kafkaProperties.getProducer().setValueSerializer(streamKafkaConfig.getValaueSerializer());
        kafkaProperties.getProducer().setClientId(streamKafkaConfig.getClientID());
        kafkaProperties.getProducer().setBootstrapServers(streamKafkaConfig.getBootstrapServers());

        Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(configProps);

    }

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplateForStream(ProducerFactory<Object, Object> kafkaProducerFactoryForStream) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactoryForStream);
        return kafkaTemplate;
    }
}

麻宾白
2023-03-14

You can try creating several KafkaTemplate beans, each with a different configuration:

@Bean
public ProducerFactory<String, String> producerFactoryString() {
    Map<String, Object> configProps = new HashMap<>();
    // additional config parameters, e.g. ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ProducerFactory<String, byte[]> producerFactoryByte() {
    Map<String, Object> configProps = new HashMap<>();
    // additional config parameters, e.g. ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
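    // ByteArraySerializer handles byte[]; BytesSerializer expects org.apache.kafka.common.utils.Bytes
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);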
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
    return new KafkaTemplate<>(producerFactoryString());
}

@Bean
public KafkaTemplate<String, byte[]> kafkaTemplateByte() {
    return new KafkaTemplate<>(producerFactoryByte());
}
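
Once both templates exist, the calling code still has to pick one per message. The following is a minimal sketch of one way to do that, not part of the original answer: `DualFormatPublisher` is a hypothetical class, and the two `BiConsumer` fields are stand-ins for calls such as `kafkaTemplateString.send(topic, value)` and `kafkaTemplateByte.send(topic, value)`, so the routing logic can be tried without a broker. It assumes JSON arrives as a `String` and Avro as an already-encoded `byte[]`.

```java
import java.util.function.BiConsumer;

// Hypothetical facade: routes each payload to the sender that matches its type.
final class DualFormatPublisher {

    // Stand-in for a call on the String template (JSON payloads).
    private final BiConsumer<String, String> jsonSender;
    // Stand-in for a call on the byte[] template (Avro payloads, already encoded).
    private final BiConsumer<String, byte[]> bytesSender;

    DualFormatPublisher(BiConsumer<String, String> jsonSender,
                        BiConsumer<String, byte[]> bytesSender) {
        this.jsonSender = jsonSender;
        this.bytesSender = bytesSender;
    }

    // Dispatches on the runtime type of the payload; anything else is rejected.
    void publish(String topic, Object payload) {
        if (payload instanceof String) {
            jsonSender.accept(topic, (String) payload);
        } else if (payload instanceof byte[]) {
            bytesSender.accept(topic, (byte[]) payload);
        } else {
            String type = (payload == null) ? "null" : payload.getClass().getName();
            throw new IllegalArgumentException("Unsupported payload type: " + type);
        }
    }
}
```

In a Spring application the constructor arguments would presumably be lambdas wrapping the two templates, for example `(topic, value) -> kafkaTemplateString.send(topic, value)`. Because the two beans have different generic types, Spring can usually tell them apart at the injection point; a `@Qualifier` with the bean name removes any ambiguity.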