当前位置: 首页 > 知识库问答 >
问题:

Spring kafka消息反序列化失败,因为content-type设置为application/json

谢叶五
2023-03-14

我向Kafka传达的信息如下:

private KafkaTemplate<String, MyMessage > kafkaTemplate;

public void sendMessage(MyMessage data) {

    Message<MyMessage > message = MessageBuilder
            .withPayload(data)
            .setHeader(KafkaHeaders.TOPIC,topic)
            .setHeader(KafkaHeaders.MESSAGE_KEY,data.getKey())
            .build();
    kafkaTemplate.send(message);

我有自己的MyMessage自定义解/序列化程序,关键字是一个简单的字符串

@StreamListener
    public void process(@Input("input") KStream<String,MyMessage> myStream){
            final Serde<String> stringSerde = Serdes.String();
            final MyMessageSerde myMessageSerde = new MyMessageSerde();
            myStream
                    .groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
                    .aggregate(ArrayList::new,
                            (newKey,val,agg) -> {
                                agg.add(val);
                                return agg;
                            },
                            Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
                                    .withKeySerde(stringSerde)
                                    .withValueSerde(new ArrayListSerde(myMessageSerde)));

    ...
    interface MyStreamProcessor {
        @Input("input")
        KStream<?, ?> input();
    }
@StreamListener
public void process(@Input("input") KTable<String,MyMessage> myTable){
        final Serde<String> stringSerde = Serdes.String();
        final MyMessageSerde myMessageSerde = new MyMessageSerde();
        KStream<String,MyMessage> myStream = myTable.toStream();
        myStream
                .groupBy((key,value)-> value.getObjectKey(), Serialized.with(stringSerde,myMessageSerde))
                .aggregate(ArrayList::new,
                        (newKey,val,agg) -> {
                            agg.add(val);
                            return agg;
                        },
                        Materialized.<String, ArrayList<MyMessage>, KeyValueStore<Bytes, byte[]>>as("object-keys")
                                .withKeySerde(stringSerde)
                                .withValueSerde(new ArrayListSerde(myMessageSerde)));

...
interface MyStreamProcessor {
    @Input("input")
    KTable<?, ?> input();
}

共有1个答案

邹高懿
2023-03-14

这是因为您在.group方法中指定了自定义(de)序列化。看看文档的这部分和这部分,看看如何指定自定义值SERDE。

 类似资料:
  • 我正在尝试让Tomcat在关机时坚持我的会话。我已经解决了这样一个要求,即我在会话中存储的所有内容都是可序列化的,当我在Eclipse下运行Tomcat时,这很好:当代码更改迫使Eclipse重新加载webapp时,会话现在仍然存在。 然而,当我试图在一个独立的Tomcat中做同样的事情时,我遇到了一个问题。 我取消了${catalina.base}/conf/context.xml中的Manag

  • 我试图使用javascript中的Fetch API向spring应用程序发送一些表单数据。我有以下代码来发送表单数据: 但是我得到一个415状态错误“不支持的媒体类型”。即使我将标头“Content-Type”专门设置为“Application/json”,它也会像“text/平原”一样发送 这是我从服务器得到的响应: 以下是在Spring中接受请求的方法: 我不知道为什么请求是以“text/p

  • Jackson可以在2.6.5中为以下类反序列化json,但在2.8.8中失败。 型号: JSON: 例外情况是: 我发现lombok创建的子构造函数导致了这个错误。当我去掉lombok注释或手动创建构造函数时,这种情况就会停止。无论如何,它都应该使用no args Child()构造函数。是什么导致了这个问题?

  • 问题内容: 嗨,大家好,我是AngularJS的新手,我需要您的帮助。 我需要做的就是将json发布到API并获得正确的响应。 这是我不知道在哪里编写此代码的JSON。 JSON格式 如果我做的正确,则不确定。 控制器 服务.JS INDEX.HTML 希望你能帮助我。 我不断收到诸如“不受支持的媒体类型”之类的回复…我不知道该怎么办… 任何建议或评论都将很棒。。谢谢! 问题答案: 在Angula

  • 我正在尝试使用apache camel jackson xml对xml进行反序列化,然后发生了一些奇怪的事情,我无法解释。这是我正在尝试反序列化的xml(它是简单的xmltd v xml文件): 这是我的POJO: 电视。Java语言 程序Java语言 Credits.java 其他POJO并不重要。下面是路由配置: 使用此设置和我在开始时提供的xml,会发生以下异常: 通用域名格式。faster

  • 当我尝试将content-type设置为时,我会丢失边界,并在响应中得到一个错误: 添加UTF-8字符集的正确方法是什么?