当前位置: 首页 > 知识库问答 >
问题:

在Spring中向Apache Kafka主题发送json有效负载

卫彭亮
2023-03-14
public class User {
    private String firstname;
    private String email;


    public User() {}
    public User(String firstname, String email) {
        super();
        this.firstname = firstname;
        this.email = email;
    }
    public String getFirstname() {
        return firstname;
    }
    public void setFirstname(String firstname) {
        this.firstname = firstname;
    }
    public String getEmail() {
        return email;
    }
    public void setEmail(String email) {
        this.email = email;
    }
    @Override
    public String toString() {
        return "UserModel [firstname=" + firstname + ", email=" + email + "]";
    }


}
 package config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.xyz.User;

@Configuration
public class KafkaConfiguration {

    @Bean
    public ProducerFactory<String, User> producerFactory(){
        Map<String, Object> config = new HashMap<>();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);

    }
     @Bean
        public KafkaTemplate<String, User> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }

}

控制器

@RestController
@RequestMapping("/kafka")
public class UserController {
    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    private static String TOPIC = "kafka-producer";
    @PostMapping("/publish")
    public void getUserId(@RequestBody User user) {

        kafkaTemplate.send(TOPIC, user);

Json有效负载从邮递员发送

{
    "firstname" : "xyz",
    "email" : "xyz@gmail.com.com"

}

共有1个答案

慕阳平
2023-03-14

能够在我的本地设置中重现您的错误。

Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.example.demo.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: com.example.demo.User cannot be cast to java.lang.String

如果我们查看log,那么它显示value.serializervalue仍然引用默认的StringSerializer而不是预期的JSONSerializer,这反过来表示您的生产者配置没有生效。简而言之,您的kafkaconfiguration类没有被引用。

您的KafkaConfiguration类位于某个Config包中,而User类位于某个com.xyz包中。因此,解决方案将是确保它得到拾取您的配置。最有可能的是,该包可能没有得到配置/beans定义的扫描。如果您将KafkaConfiguration移动到应用程序的根包中,那么您的原始代码应该可以正常工作。

 类似资料:
  • 因此,我使用Quarkus和Microprofile Reactive Messaging framework(带有SmallRye Kafka连接器)以及RxJava2 Flowable streams对象来进行Reactive消息接收/发送。我有一个微服务,它使用@incoming和@outgoing注释来正确地使用通道来拉回主题并将消息推送到主题。 但是,现在我想修改它,这样我仍然可以从Ka

  • 我正在尝试通过AWS SNS向GCM发送移动推送通知。根据最新的GCM 3.0留档,其中可能包括“通知”有效负载或“数据”有效负载(或两者兼而有之)。如果您发送通知有效负载,那么GCM将负责为您在最终用户设备上显示通知。 使用Amazon SNS控制台,我尝试发送仅通知有效负载,但遇到以下错误: 无效参数:消息原因:协议GCM的通知无效:json消息中需要数据密钥(服务:AmazonSNS状态码:

  • 我正在工作的谷歌Chrome推送通知,我正在尝试发送有效负载到谷歌Chrome工作者,但我不知道我如何接收这个有效负载。 我有一个API可以在数据库中创建和保存通知,我需要通过发送值,并在worker.js上接收 这是我的工作人员。JS 我就是这么叫GCM的 我试图获取,但这是未定义的。 有人有什么想法或建议吗?

  • 下面是我的代码: 当我使用上面写的JSON消息通过SNS发送自定义通知时,消息被成功接收。但是,当我试图处理JSON时,一旦它到达就会失败。我收到的错误告诉我JSON消息中没有包含正文。 我的问题是,当将有效负载从AWS SNS发送到FCM时,正确的JSON消息是什么。

  • 我有这个邮戳方法 我使用下面的JSON负载来提出我的帖子请求: 这将返回以下内容: “消息”:“JSON解析错误:无法反序列化超出起始\u数组标记的实例;嵌套异常为com.fasterxml.jackson.databind.exc.MismatchedInputException:无法反序列化超出起始\u数组标记的实例\n位于[源:(PushbackInputStream);行:1,列:1]“,