当前位置: 首页 > 知识库问答 >
问题:

使用Kafka主题消息时的反序列化问题

仰英发
2023-03-14

我使用Kafka Consumer API来构建Consumer。为了构建反序列化器,我实现了Deserializer类并提供了必要的实现。我收到此错误“Exception Raisedorg.apache.kafka.Common.Errors.SerializationException:错误反序列化分区staging.DataFeeds.PartnerHotel-0的键/值,偏移量为19205124”

#POJO classes

    public class Change {
    private  Schema schema;
    private  Payload payload;
    //Getters and constructor
    }
    public class Details {
    private List<String> effectedAttributes;
    private List<PartnerHotel> cluster;
    //Getters and contructor
    }
    public class Field {
    private String type;
    private Boolean optional;
    private String field;
    //Getters and constructor
    }
    public class Fields {
    private String type;
    private List<Field> fields;
    private Boolean optional;
    private String name;
    //Getters and contructor
    }
    public class Geom{
    private int srid;
    private String wkb;
    //Getters and contructor
    }
    public class PartnerHotel{
    private int id;
    private int shopId;
    private String partnerHotelId;
    private boolean isOnline;
    private boolean isRemovedByUser;
    private int mappingPriority;
    private int hotelId;
    private String statusHotelId;
    private String name;
    private String street;
    private String zipCode;
    private String city;
    private String sourceCityId;
    private String state;
    private String stateAlpha2;
    private String country;
    private String alpha2;
    private String alpha3;
    private double latitude;
    private double longitude;
    private Geom geomPoint;
    private int countryIdShop;
    private int selectedGeoname;
    private String propertyType;
    private List<String> tags;
    private int stars;
    private String url;
    private int nrRatings;
    private double recommendation;
    private long dateHotelId;
    private long timeStamp;
    private long lastImport;
    //Getters and contructor
    }
    public class Payload {
    private PartnerHotel before;
    private PartnerHotel after;
    private Source source;
    private String op;
    private String ts_ms;
    //Getters and contructor
    }
    public class Schema {
    private String type;
    private Boolean optional;
    private String name;
    private List<Fields> fields;
    //Getters and contructor
    }
    public class Source {
    private String version;
    private String name;
    private String ts_usec;
    private String txId;
    private String lxn;
    private Boolean snapshot;
    private Object lastSnapshotRecord;
    //Getters and contructor
    }

#Deserializer

    public class ChangeDeserializer implements Deserializer<Change> {

    public ChangeDeserializer(){ }

    public void configure(Map<String, ?> map, boolean b) {}

    public Change deserialize(String topic, byte[] data) {
        if(data == null){
            return null;
        }
        try{
            ObjectMapper objectMapper = new ObjectMapper();
            Change change = objectMapper.readValue(data,Change.class);
            return change;
        }
        catch(IOException exception){
            throw new DeserializationException("Unable to deserialize               Change", exception);
        }}

    public void close() {}
    }

#Consumer
    public class KafkaAcnowledger {
        public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "someUrl");
        props.put("group.id", "test131");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("max.poll.records",1);
        props.put("auto.offset.reset","earliest");
        props.put("key.deserializer",    "org.apache.kafka.common.serialization.LongDeserializer");
        props.put("value.deserializer",    "deserializer.ChangeDeserializer");
        KafkaConsumer<Long, Change> consumer = new KafkaConsumer(props);
        consumer.subscribe(Arrays.asList("staging.datafeeds.PartnerHotel"));
        while (true) {
            try{
            ConsumerRecords<Long, Change> records = consumer.poll(100);
            for (ConsumerRecord<Long, Change> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        catch(Exception exception){
                System.out.println("Exception raised" + exception);
        }
        }


    }
    }

使用者中的poll()看起来很好,Enter code here异常我得到了一个序列化异常。我通过kafka-consumer-groups.sh检查了消费者组,这个消费者组就在列表中。

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int16","optional":false,"field":"shopId"},{"type":"string","optional":false,"field":"partnerHotelId"},{"type":"boolean","optional":false,"field":"isOnline"},{"type":"boolean","optional":false,"field":"isRemovedByUser"},{"type":"int32","optional":false,"field":"mappingPriority"},{"type":"int32","optional":true,"field":"hotelId"},{"type":"string","optional":true,"field":"statusHotelId"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"dateHotelId"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"lastImport"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"street"},{"type":"string","optional":true,"field":"zipcode"},{"type":"string","optional":true,"field":"city"},{"type":"string","optional":true,"field":"sourceCityId"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"stateAlpha2"},{"type":"string","optional":true,"field":"country"},{"type":"string","optional":true,"field":"alpha2"},{"type":"string","optional":true,"field":"alpha3"},{"type":"double","optional":true,"field":"latitude"},{"type":"double","optional":true,"field":"longitude"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"geomPoint"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedGeonames"},{"type":"int32","optional":true,"field":"countryIdShop"},{"type":"int32","optional":true,"field":"selectedGeoname"},{"type":"string","optional":true,"field":"propertyType"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"tags"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"chains"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"creditCards"},{"type":"int32","optional":true,"field":"stars"},{"type":"string","optional":true,"field":"url"},{"type":"int32","optional":true,"field":"nrRatings"},{"type":"double","optional":true,"field":"recommendation"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedPartnerHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"removedFromHotelIds"}],"optional":true,"name":"staging.datafeeds.PartnerHotel.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int16","optional":false,"field":"shopId"},{"type":"string","optional":false,"field":"partnerHotelId"},{"type":"boolean","optional":false,"field":"isOnline"},{"type":"boolean","optional":false,"field":"isRemovedByUser"},{"type":"int32","optional":false,"field":"mappingPriority"},{"type":"int32","optional":true,"field":"hotelId"},{"type":"string","optional":true,"field":"statusHotelId"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"dateHotelId"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"lastImport"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"street"},{"type":"string","optional":true,"field":"zipcode"},{"type":"string","optional":true,"field":"city"},{"type":"string","optional":true,"field":"sourceCityId"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"stateAlpha2"},{"type":"string","optional":true,"field":"country"},{"type":"string","optional":true,"field":"alpha2"},{"type":"string","optional":true,"field":"alpha3"},{"type":"double","optional":true,"field":"latitude"},{"type":"double","optional":true,"field":"longitude"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"geomPoint"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedGeonames"},{"type":"int32","optional":true,"field":"countryIdShop"},{"type":"int32","optional":true,"field":"selectedGeoname"},{"type":"string","optional":true,"field":"propertyType"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"tags"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"chains"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"creditCards"},{"type":"int32","optional":true,"field":"stars"},{"type":"string","optional":true,"field":"url"},{"type":"int32","optional":true,"field":"nrRatings"},{"type":"double","optional":true,"field":"recommendation"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedPartnerHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"removedFromHotelIds"}],"optional":true,"name":"staging.datafeeds.PartnerHotel.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"db"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"string","optional":true,"field":"schema"},{"type":"string","optional":true,"field":"table"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"staging.datafeeds.PartnerHotel.Envelope"},"payload":{"before":null,"after":{"id":13893497,"shopId":135,"partnerHotelId":"6-42036","isOnline":false,"isRemovedByUser":false,"mappingPriority":0,"hotelId":null,"statusHotelId":"AUTO","dateHotelId":null,"timestamp":1529334013938327,"lastImport":1503491984188866,"name":"Ferienvermietung Wiedemann","street":"Chausseeberg 3","zipcode":"17429","city":"Mellenthin","sourceCityId":null,"state":null,"stateAlpha2":null,"country":"Deutschland","alpha2":"DE","alpha3":null,"latitude":53.920278,"longitude":14.013333,"geomPoint":{"wkb":"AQEAACDmEAAARuo9ldMGLEA5nWSry/VKQA==","srid":4326},"proposedGeonames":[2872064],"countryIdShop":83,"selectedGeoname":2872064,"propertyType":null,"tags":["77","36","33","34","38","43","41","123","26","29","1","7","6","70","9","1000","58","17","18","15","13","14","20","65","63","46","10","52"],"chains":[],"creditCards":[],"stars":null,"url":"http://www.buchen.travel/onepage-idealo-booking/index.php?room=6-42036","nrRatings":null,"recommendation":null,"proposedHotels":[],"proposedPartnerHotels":[],"removedFromHotelIds":[]},"source":{"version":"0.8.3.Final","name":"staging","db":"geo","ts_usec":1554391067119000,"txId":4757138,"lsn":1139303143104,"schema":"datafeeds","table":"PartnerHotel","snapshot":true,"last_snapshot_record":false},"op":"r","ts_ms":1554391067119}}

共有1个答案

丌官凯康
2023-03-14

您的POJO与您的消息不兼容,jackson无法解析它。至少缺少几个字段,可以发现跟随错误。

Unrecognized field "timestamp" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "zipcode" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "proposedGeonames" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "chains" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "creditCards" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "proposedHotels" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "proposedPartnerHotels" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "removedFromHotelIds" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "db" (class  com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "lsn" (class  com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "schema" (class  com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "table" (class  com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "last_snapshot_record" (class  com.example.kafka.Change$Source), not marked as ignorable

要修复它,您必须将这些字段添加到POJO中,或者禁用fail on Unknown:objectmapper.disable(deserializationfeature.fail_on_unknown_properties);。更多关于jackson反序列化错误的信息可以在这里找到:jackson Unrecognized field

 类似资料:
  • 我试图通过Spring KafkaListener在单独的消费者应用程序中使用这些消息 集装箱工厂配置 在这种配置下,使用者不接收消息(字节)。如果我将Kafka侦听器更改为接受字符串,则会出现以下异常: 集装箱工厂配置 制片人-

  • JSON消息正在被字符串使用者消费。My Products发送两种类型的消息字符串和序列化JSON 消费者方面我有两个消费者1。正在侦听字符串消息%2。监听json并反序列化为对象 即使是json消息也由String Listener使用。

  • 我试图阅读和打印从Kafka使用Apache Flink的原型消息。 我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/ Flink消费者代码是: 反序列化器代码是:

  • 主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示

  • 我计划使用kafka作为事件来源的持久日志,目前正在研究不同的序列化选项。我目前的重点是使用thrift对我将存储在Kafka中的消息进行序列化和反序列化。