当前位置: 首页 > 知识库问答 >
问题:

KSQL中的JSON到AVRO反序列化错误:由于反序列化错误而跳过记录

欧阳智志
2023-03-14

我在AWS上建立了一个汇流平台。我的源是MySql,我已经使用debezium连接器将它连接到Kafka connect。源的数据格式是JSON。现在在KSQL中,我创建了一个派生主题,并将JSON主题转换为AVRO以使数据能够使用JDBC连接器下沉到MYSQL。我使用了以下查询:

CREATE STREAM json_stream (userId int, auth_id varchar, email varchar) WITH (KAFKA_TOPIC='test', VALUE_FORMAT='JSON');

派生主题:

create TABLE avro_stream WITH (VALUE_FORMAT='AVRO') AS select * from json_stream;

我曾尝试使用JSON消息直接下沉到mysql,但失败了,因为连接器需要模式,所以带模式的JSON或Avro消息都可以帮助我下沉数据。

 [2019-07-09 13:27:30,239] WARN task [0_3] Skipping record due to
 deserialization error. topic=[avro_stream] partition=[3] offset=[144]
 (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
 org.apache.kafka.connect.errors.DataException: avro_stream     at
 io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
    at
 io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:44)
    at
 io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:26)
    at
 org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
    at
 org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
    at
 org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at
 org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at
 org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
    at
 org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at
 org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
    at
 org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:936)
    at
 org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
    at
 org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at
 org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
 Caused by: org.apache.kafka.common.errors.SerializationException:
 Error deserializing Avro message for id -1 Caused by:
 org.apache.kafka.common.errors.SerializationException: Unknown magic
 byte!
{
"name": "debezium-connector",
"config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.user": "XXXXX",
    "auto.create.topics.enable": "true",
    "database.server.id": "1",
    "tasks.max": "1",
    "database.history.kafka.bootstrap.servers": "X.X.X.X:9092",,
    "database.history.kafka.topic": "XXXXXXX",
    "transforms": "unwrap",
    "database.server.name": "XX-server",
    "database.port": "3306",
    "include.schema.changes": "true",
    "table.whitelist": "XXXX.XXXX",
    "key.converter.schemas.enable": "false",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "database.hostname": "X.X.X.X",
    "database.password": "xxxxxxx",
    "value.converter.schemas.enable": "false",
    "name": "debezium-connector",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.whitelist": "XXXXX",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter"
},
"tasks": [
    {
        "connector": "debezium-connector",
        "task": 0
    }
],
"type": "source"

共有1个答案

景哲
2023-03-14

KSQL以string的形式编写键,因此当您使用Avro进行值序列化时,键不是。因此,您的接收器工作器需要这样配置:

"key.converter": "org.apache.kafka.connect.storage.StringConverter"
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<url to schema registry>",

如果您已经将worker本身配置为使用Avro,那么您可以只覆盖连接器配置的key.converter

 类似资料:
  • 这是包含列表的POJO JSON具有以下结构: 运行时,web服务的结果运行良好,但反序列化会打印此错误:

  • 问题内容: 我试图反序列化以DateTime作为修饰符的类: 但是,当我尝试tro反序列化时,却遇到以下异常: 我用它来反序列化: 还有我的jsonData的示例: 问题答案: 期望使用无参数构造函数。的最新版本没有这样的构造函数。 如果您已固定格式,即。应该只是一个时间戳,那么你可以简单地注册与。它将在内部用于字段。您可以摆脱注释。 您需要添加库。

  • 我正在尝试使用网络库kryonet创建一个多人游戏,我得到了连接和发送字符串的工作,但现在我正在尝试发送对象。我想做的是发送一个数组列表,但它给了我这个错误。我也尝试只发送一个对象,它给了我同样的错误。 我尝试在Metor类中创建一个没有参数的构造函数,但这也不起作用 编辑:所以我得出的结论是,Kryonet在序列化slick2d图像类时有问题,或者与Kryonet和slick2d有不同的冲突。

  • 我有以下课 以及以下测试: 我收到以下错误: com.fasterxml.jackson.databind.exc.MismatchedInputException:无法构造com.store.domain.model.Cart实例(尽管至少存在一个Creator):无法从[Source:(String)"{"id":"56c7b5f7-115b-4cb9-9658-acb7b849d5d5"}"

  • 我尝试调用一个url谁接受列表。 发送的数据为 “{”时间戳“:1445958336633,”状态“:400,”错误“:”错误请求“,”异常“:”org.springframework.http.converter.httpmessagenotreadableException“,”消息“:”无法读取文档:无法反序列化START_OBJECT令牌之外的java.util.arraylist实例\n

  • 我有2个模式: 模式1(旧模式): 我用一个布尔字段更新了模式: 方案2(新方案): kafka主题包含属于旧模式(schema1)的消息。更新使用者模式后,即使更新字段中存在默认值,使用者也无法反序列化旧模式消息。 根据Avro文档: 阿夫罗博士 我得到以下错误,而反序列化: 当记录缺少字段时,为什么默认值未应用于使用者?任何帮助都非常感谢。提前致谢!