当前位置: 首页 > 知识库问答 >
问题:

Spring启动kafka通用JSON模板发送器

云瑞
2023-03-14

我正在使用 kafka 和 spring boot,我需要将 JSON 对象发送到 kafka,关键是我能够将一个对象作为配置 KafkaTemplate 的 JSON 发送,但仅适用于此对象。

package com.bankia.apimanager.config;

import com.bankia.apimanager.model.RequestDTO;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, RequestDTO> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, RequestDTO> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
package com.bankia.apimanager.controller;

import com.bankia.apimanager.model.RequestDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/infrastructure")
public class InfraStructureRequestController {

    private final static Logger LOG = LoggerFactory.getLogger( InfraStructureRequestController.class );

    private static final String TOPIC = "test";

    @Autowired
    private KafkaTemplate<String, RequestDTO> sender;

    @RequestMapping(value = "/test", method = RequestMethod.GET)
    public String postMessage(){

        ListenableFuture<SendResult<String, RequestDTO>> future = sender.send(TOPIC, new RequestDTO("Hola","Paco"));
        future.addCallback(new ListenableFutureCallback<SendResult<String, RequestDTO>>() {
            @Override
            public void onSuccess(SendResult<String, RequestDTO> result) {
                LOG.info("Sent message with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                LOG.error("Unable to send message due to : " + ex.getMessage());
            }
        });
        return "OK";
    }
}

但是如果现在我想发送一个新的DTO对象呢?我是否必须声明一个新的Kafka模板


共有3个答案

史商震
2023-03-14
匿名用户

有两种情况:

场景#1

如果你想使用 KafkaTemplate 向 kafka 发送任何类型(如你的问题中所述),那么没有必要声明你自己的 KafkaTemplate bean,因为 Spring boot 在 KafkaAutoConfiguration 中为你做了这件事。

package org.springframework.boot.autoconfigure.kafka;

...

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener,
            ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }
}


< code > * *一些注意事项**:

>

  • 这个配置类已经用@ConditionalOnClass(KafkaTemplate.class)注释,这意味着:(来自spring文档---

    kafkaTemplate bean 方法用 @ConditionalOnMissingBean(KafkaTemplate.class) 注释,意思是:(来自 Spring docs ----

    重要的在纯java世界中,KafkaTemplate

    KafkaTemplate<?, ?> kf1 = ...;
    KafkaTemplate<String, RequestDTO> kf2 = kf1; // Compile time error
    

    因为java参数化类型是不变的,如有效Java第三版第31项所述。但是spring world可以注入到你自己的服务中吗?您只需要在kafkaTemplate属性上指定自己的泛型类型。例如:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class KafkaService {
        @Autowired
        private KafkaTemplate<Integer, String> kafkaTemplate1;
        @Autowired
        private KafkaTemplate<Integer, RequestDTO> KafkaTemplate2;
    }
    
    

    场景#2

    如果您需要限制kafka记录的值类型,那么您需要指定自己的kafka bean,如下所示:

    
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(KafkaTemplate.class)
    @EnableConfigurationProperties(CorridorTracingConfiguration.class)
    public class CorridorKafkaAutoConfiguration {
        
        @Bean
        @ConditionalOnMissingBean(KafkaTemplate.class)
        public KafkaTemplate<?, AbstractMessage> kafkaTemplate(ProducerFactory<Object, AbstractMessage> kafkaProducerFactory,
                                                                  ProducerListener<Object, AbstractMessage> kafkaProducerListener,
                                                                  ObjectProvider<RecordMessageConverter> messageConverter) {
            KafkaTemplate<Object, AbstractMessage> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
            messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
            kafkaTemplate.setProducerListener(kafkaProducerListener);
            kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
            return kafkaTemplate;
        }
    
    

    现在这只能注入到Kafka模板

    示例用法:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class KafkaService {
        @Autowired
        private KafkaTemplate<String, AbstractMessage> kafkaTemplate;
    
        public void makeTrx(TrxRequest trxRequest) {
            kafkaTemplate.send("fraud-request", trxRequest.fromAccountNumber(), new FraudRequest(trxRequest));
        }
    
    
    }
    
    @Accessors(chain = true)
    @Getter
    @Setter
    @EqualsAndHashCode(callSuper = true)
    @ToString(callSuper = true)
    public class FraudRequest extends AbstractMessage {
        private float amount;
        private String fromAccountNumber;
        private String toAccountNumber;
    
    ...
    
    }
    
    

    要限制Kafka消息的键,请遵循相同的(上述)方式

  • 宇文元明
    2023-03-14

    引用您的代码:

    • Value Serializer被正确定义为JsonSerializer,它将任何类型的对象转换为JSON
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }
    
    • 改变

    请记住,泛型只保留到编译时(类型擦除)。

    沈伟
    2023-03-14

    我认为,您可以指定一个通用的卡夫卡模板

    @Configuration
    public class KafkaConfiguration {
    
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
    
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return props;
        }
    
        @Bean
        public ProducerFactory<String, Object> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        @Bean
        public KafkaTemplate<String, Object> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    
    }
    

     类似资料:
    • 我实现了一个使用Spring Kafka的基本Spring Boot应用程序。我希望我的制作人在第一个<代码>之前连接到Kafka主题。send()被调用,但我找不到这样做的方法。这可能吗? 日志显示 KafkaTemplate 仅在我在 触发 方法后连接到 Kafka 主题:

    • 我一直收到一个错误,说这不是有效的json数据,并一直收到错误400。我认为我的实现没有正确格式化数据。目标:尝试使用rest模板进行post调用,将JSON数据作为主体传递。似乎从map转换为json数据时,它没有正确转换为json。

    • 我使用spring rest模板作为请求发送json数组。发送请求的源代码如下: 并接受请求: 问题是它给了我以下错误:无法写入请求:找不到适合请求类型[org.json.JSONArray]的HttpMessageConverter。任何建议都是可以接受的。

    • 我使用SpringKafka的KafkaTemplate以异步方式发送消息,并使用回调进行正确的错误处理。 此外,我已将 Kafka 生产者配置为具有最大重试次数 (MAX_INTEGER)。 然而,可能有一些与avro序列化相关的错误,但对于那些重试没有帮助。那么,我如何在不重试的情况下逃避这些错误,但对于其他与代理相关的问题,我想重试?

    • 我正在尝试使用spring应用程序的Thymeleaf模板发送邮件,我在这里引用https://github.com/Thymeleaf/thymeleafexamples-springmail/ 我没有得到任何错误,但它仍然不工作...我使用相同的代码给在github仍然没有运气...谁能建议如何做到这一点?? 下面是用来发送邮件的方法。 如果我删除使用thymeleaf创建html正文的行,并

    • 我正在测试Spring Kafka的示例代码。它适用于连接,但不适用于连接。 我已通过成功运行控制台使用者来验证密钥和证书对 kafka 代理有效: 但是我不能使用Spring Boot(2.0.1.RELEASE)和Spring Kafka,使用相同的密钥和证书发送消息。 应用程序.属性 有人成功用SSL配置Spring Boot 2.0 Spring Kafka吗?