在我的Spring启动kafka发布者应用程序中,我希望支持以字符串(json)或字节格式发布消息,因为我希望同时支持json和avro。但spring boot中的Kafka模板让我们只定义其中一个模板。有没有一种方法可以同时使用两个模板或任何其他方式来提供对json和avro的支持?
Kafka模板
您可以创建Kafka配置。我不得不将数据发送到两个不同的服务器。
@Configuration
public class KafkaConfig {
private final MosaicKafkaConfig mosaicKafkaConfig;
private final StreamKafkaConfig streamKafkaConfig;
public KafkaConfig(MosaicKafkaConfig mosaicKafkaConfig, StreamKafkaConfig streamKafkaConfig) {
this.mosaicKafkaConfig = mosaicKafkaConfig;
this.streamKafkaConfig = streamKafkaConfig;
}
@Bean
public ProducerFactory<?, ?> kafkaProducerFactoryForMosaic() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
ResourceLoader resourceLoader = new DefaultResourceLoader();
Resource resource = resourceLoader.getResource(mosaicKafkaConfig.getSslTrustStoreLocation());
ssl.setTrustStoreLocation(resource);
ssl.setTrustStorePassword(mosaicKafkaConfig.getSslTrustStorePassword());
ssl.setTrustStoreType(mosaicKafkaConfig.getSslTrustStoreType());
Map<String, String> props = kafkaProperties.getProperties();
props.put("sasl.jaas.config", mosaicKafkaConfig.getSaslConfig());
props.put("sasl.mechanism", mosaicKafkaConfig.getSaslMechanism());
props.put("security.protocol", mosaicKafkaConfig.getSaslSecProtocol());
kafkaProperties.getProducer().setValueSerializer(mosaicKafkaConfig.getValaueSerializer());
kafkaProperties.getProducer().setClientId(mosaicKafkaConfig.getClientID());
kafkaProperties.getProducer().setBootstrapServers(mosaicKafkaConfig.getBootstrapServers());
Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<?, ?> kafkaTemplateForMosaic(ProducerFactory<Object, Object> kafkaProducerFactoryForMosaic) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactoryForMosaic);
return kafkaTemplate;
}
@Bean
public ProducerFactory<?, ?> kafkaProducerFactoryForStream() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
ResourceLoader resourceLoader = new DefaultResourceLoader();
Resource resource = resourceLoader.getResource(streamKafkaConfig.getSslTrustStoreLocation());
ssl.setTrustStoreLocation(resource);
ssl.setTrustStorePassword(streamKafkaConfig.getSslTrustStorePassword());
ssl.setTrustStoreType(streamKafkaConfig.getSslTrustStoreType());
Map<String, String> props = kafkaProperties.getProperties();
props.put("sasl.jaas.config", streamKafkaConfig.getSaslConfig());
props.put("sasl.mechanism", streamKafkaConfig.getSaslMechanism());
props.put("security.protocol", streamKafkaConfig.getSaslSecProtocol());
kafkaProperties.getProducer().setValueSerializer(streamKafkaConfig.getValaueSerializer());
kafkaProperties.getProducer().setClientId(streamKafkaConfig.getClientID());
kafkaProperties.getProducer().setBootstrapServers(streamKafkaConfig.getBootstrapServers());
Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<?, ?> kafkaTemplateForStream(ProducerFactory<Object, Object> kafkaProducerFactoryForStream) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactoryForStream);
return kafkaTemplate;
}
}
您可以尝试使用不同的配置创建KafkaTemboard:
@Bean
public ProducerFactory<String, String> producerFactoryString() {
Map<String, Object> configProps = new HashMap<>();
//additional config parameters ....
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public ProducerFactory<String, byte[]> producerFactoryByte() {
Map<String, Object> configProps = new HashMap<>();
//additional config parameters ....
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
return new KafkaTemplate<>(producerFactoryString());
}
@Bean
public KafkaTemplate<String, byte[]> kafkaTemplateByte() {
return new KafkaTemplate<>(producerFactoryByte());
}
问题内容: 我的问题如标题中所述。我正在尝试做类似的事情: 我得到错误: 是否有其他方法可以在模板中进行模量计算? 问题答案: 添加具有所需逻辑的模板功能。例如: 游乐场的例子
我实现了一个使用Spring Kafka的基本Spring Boot应用程序。我希望我的制作人在第一个<代码>之前连接到Kafka主题。send()被调用,但我找不到这样做的方法。这可能吗? 日志显示 KafkaTemplate 仅在我在 触发 方法后连接到 Kafka 主题:
我正在使用 kafka 和 spring boot,我需要将 JSON 对象发送到 kafka,关键是我能够将一个对象作为配置 KafkaTemplate 的 JSON 发送,但仅适用于此对象。 但是如果现在我想发送一个新的DTO对象呢?我是否必须声明一个新的
问题内容: 我们可以在语句中多次使用语句吗?我的T-SQL脚本很长,我想在中运行它。如果一切顺利,那么我将承诺否则将回滚。 但是,在运行该查询时,出现类似的错误。当我在其中创建和删除许多功能和过程时。 我没有在脚本中的任何地方使用过。我的问题是- 我可以在该长脚本中使用多次语句。因为创建一个批处理,并且如果该批处理第一次成功执行但下次失败,那么该语句是否能够实际回滚已执行的语句? 我的脚本结构如下