当前位置: 首页 > 知识库问答 >
问题:

Kafka使用者无法为JsonDeserializer使用905字节的消息

窦哲彦
2023-03-14

我们正在使用与Kafka消费者和生产者Spring。我们正在生成大小为905字节的消息。我们正在序列化消息,并试图为下一个使用者反序列化它。

消息有效负载类示例:

{  
        "FILE_LIST":[  
            {
                "KEY1": "Large String Value",
                "KEY2": "Large String Value",
                "Key3": "Large String Value",
                "Key4": "Large String Value"
            }
        ]
}
import com.fasterxml.jackson.annotation.JsonProperty;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@NoArgsConstructor
@AllArgsConstructor
public class InnerModel {

    @JsonProperty("KEY1")
    @Getter
    @Setter
    private String key1;

    @JsonProperty("KEY2")
    @Getter
    @Setter
    private String key2;

    @JsonProperty("KEY3")
    @Getter
    @Setter
    private String key3;

    @JsonProperty("KEY4")
    @Getter
    @Setter
    private String key4;
}
 package com.consumer.model; 


public class CustomModel {
    public CustomModel(List<InnerModel> filesList) {
        super();
        this.filesList = filesList;
    }

    @JsonProperty("FILE_LIST")
    @NonNull
    @Getter
    @Setter
    List<InnerModel> filesList;
}
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@KafkaListener(topics = "customtopic", groupId = "customgroup")
public class CustomModelConsumer {
    @KafkaHandler(isDefault = true)
    private void consumeCustomModel(CustomModel model) {
        System.out.println("Model Consumer");
        System.out.println(model);
    }
}

Application.Properties

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.compression-type=gzip 
spring.kafka.consumer.properties.spring.json.trusted.packages=com.consumer.model

当我们接受字符串格式的消息负载时,Consumer工作得很好,但当我们将Consumer中的负载反序列化为对象时,我们面临着问题。下面的错误被抛出相同

共有1个答案

司徒鸿文
2023-03-14

正如@Naveen Kumar提到的,我缺少默认或all-args构造函数。更改了如下所示的代码,以使其工作

package com.consumer.model; 

@NoArgsConstructor
@AllArgsConstructor
public class CustomModel {   
  @JsonProperty("FILE_LIST")
  @NonNull
  @Getter
  @Setter
  List<InnerModel> filesList;
}
 类似资料:
  • 我正在编写一个REST代理,就像合流REST代理一样。它接受JSON负载、模式主题和id,然后将JSON负载作为Avro对象写入流中。当我使用kafka avro控制台消费者阅读消息时,我收到了“未知魔法字节”错误。 这是我的Kafka制作人配置: 这就是REST控制器如何将传入的JSON转换为Avro 这是toAvro方法的实现: 然后将此对象传递给我使用上面给出的属性配置的SchemaVali

  • 因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:

  • null 产生一些消息: 使用者无法使用消息,如果我给: 当给出服务器而不是时,能够使用消息的使用者:

  • 我已经开发了一个使用apache storm使用kafka消息的应用程序,当我在eclipse中运行topology using in LocalCluster时,它可以正常工作,消息也可以正常使用,但是当我使用storm命令(bin\storm jar..\kafka-storm-0.0.1-SNAPSHOT.jar com.kafka_storm.util.topology storm kaf

  • 我已经建立了一个由3个节点组成的AWS集群。我修改了节点的/etc/hosts文件,看起来像这样 当我从其中一个节点运行命令时 bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic first_topic --from-start 它可以工作,但是当我用ip替换主机名并用下面的命令运行它时 bin/kafka-co