我正在使用 kafka 和 spring boot,我需要将 JSON 对象发送到 kafka,关键是我能够将一个对象作为配置 KafkaTemplate 的 JSON 发送,但仅适用于此对象。
package com.bankia.apimanager.config;
import com.bankia.apimanager.model.RequestDTO;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, RequestDTO> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, RequestDTO> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
package com.bankia.apimanager.controller;
import com.bankia.apimanager.model.RequestDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/infrastructure")
public class InfraStructureRequestController {
private final static Logger LOG = LoggerFactory.getLogger( InfraStructureRequestController.class );
private static final String TOPIC = "test";
@Autowired
private KafkaTemplate<String, RequestDTO> sender;
@RequestMapping(value = "/test", method = RequestMethod.GET)
public String postMessage(){
ListenableFuture<SendResult<String, RequestDTO>> future = sender.send(TOPIC, new RequestDTO("Hola","Paco"));
future.addCallback(new ListenableFutureCallback<SendResult<String, RequestDTO>>() {
@Override
public void onSuccess(SendResult<String, RequestDTO> result) {
LOG.info("Sent message with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
LOG.error("Unable to send message due to : " + ex.getMessage());
}
});
return "OK";
}
}
但是如果现在我想发送一个新的DTO对象呢?我是否必须声明一个新的Kafka模板
匿名用户
有两种情况:
场景#1
如果你想使用 KafkaTemplate 向 kafka 发送任何类型(如你的问题中所述),那么没有必要声明你自己的 KafkaTemplate bean,因为 Spring boot 在
KafkaAutoConfiguration
中为你做了这件事。
package org.springframework.boot.autoconfigure.kafka;
...
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
private final KafkaProperties properties;
public KafkaAutoConfiguration(KafkaProperties properties) {
this.properties = properties;
}
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
}
< code > * *一些注意事项**:
>
这个配置类已经用@ConditionalOnClass(KafkaTemplate.class)
注释,这意味着:(来自spring文档---
kafkaTemplate bean 方法用 @ConditionalOnMissingBean(KafkaTemplate.class)
注释,意思是:(来自 Spring docs ----
重要的在纯java世界中,KafkaTemplate
KafkaTemplate<?, ?> kf1 = ...;
KafkaTemplate<String, RequestDTO> kf2 = kf1; // Compile time error
因为java参数化类型是不变的,如有效Java第三版第31项所述。但是spring world可以注入到你自己的服务中吗?您只需要在kafkaTemplate属性上指定自己的泛型类型。例如:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaService {
@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate1;
@Autowired
private KafkaTemplate<Integer, RequestDTO> KafkaTemplate2;
}
场景#2
如果您需要限制kafka记录的值类型,那么您需要指定自己的kafka bean,如下所示:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(CorridorTracingConfiguration.class)
public class CorridorKafkaAutoConfiguration {
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, AbstractMessage> kafkaTemplate(ProducerFactory<Object, AbstractMessage> kafkaProducerFactory,
ProducerListener<Object, AbstractMessage> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, AbstractMessage> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
现在这只能注入到Kafka模板
示例用法:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaService {
@Autowired
private KafkaTemplate<String, AbstractMessage> kafkaTemplate;
public void makeTrx(TrxRequest trxRequest) {
kafkaTemplate.send("fraud-request", trxRequest.fromAccountNumber(), new FraudRequest(trxRequest));
}
}
@Accessors(chain = true)
@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class FraudRequest extends AbstractMessage {
private float amount;
private String fromAccountNumber;
private String toAccountNumber;
...
}
要限制Kafka消息的键,请遵循相同的(上述)方式
引用您的代码:
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
请记住,泛型只保留到编译时(类型擦除)。
我认为,您可以指定一个通用的卡夫卡模板
@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
我实现了一个使用Spring Kafka的基本Spring Boot应用程序。我希望我的制作人在第一个<代码>之前连接到Kafka主题。send()被调用,但我找不到这样做的方法。这可能吗? 日志显示 KafkaTemplate 仅在我在 触发 方法后连接到 Kafka 主题:
我一直收到一个错误,说这不是有效的json数据,并一直收到错误400。我认为我的实现没有正确格式化数据。目标:尝试使用rest模板进行post调用,将JSON数据作为主体传递。似乎从map转换为json数据时,它没有正确转换为json。
我使用spring rest模板作为请求发送json数组。发送请求的源代码如下: 并接受请求: 问题是它给了我以下错误:无法写入请求:找不到适合请求类型[org.json.JSONArray]的HttpMessageConverter。任何建议都是可以接受的。
我使用SpringKafka的KafkaTemplate以异步方式发送消息,并使用回调进行正确的错误处理。 此外,我已将 Kafka 生产者配置为具有最大重试次数 (MAX_INTEGER)。 然而,可能有一些与avro序列化相关的错误,但对于那些重试没有帮助。那么,我如何在不重试的情况下逃避这些错误,但对于其他与代理相关的问题,我想重试?
我正在尝试使用spring应用程序的Thymeleaf模板发送邮件,我在这里引用https://github.com/Thymeleaf/thymeleafexamples-springmail/ 我没有得到任何错误,但它仍然不工作...我使用相同的代码给在github仍然没有运气...谁能建议如何做到这一点?? 下面是用来发送邮件的方法。 如果我删除使用thymeleaf创建html正文的行,并
我正在测试Spring Kafka的示例代码。它适用于连接,但不适用于连接。 我已通过成功运行控制台使用者来验证密钥和证书对 kafka 代理有效: 但是我不能使用Spring Boot(2.0.1.RELEASE)和Spring Kafka,使用相同的密钥和证书发送消息。 应用程序.属性 有人成功用SSL配置Spring Boot 2.0 Spring Kafka吗?