当前位置: 首页 > 知识库问答 >
问题:

阿夫罗。io。AvroTypeException:数据不是模式{…}的示例

翁翰
2023-03-14

我们正在努力将Apache Storm与Kafka的Confluent框架集成在一起。我们正在使用名为“Pyleus”的storm python包装器

我们设置了一个监控数据库表的ConFluent-Kafka JDBC连接器,每当DB发生变化时,新记录将作为Avro格式的Kafka消息发送。

在Pyleus bolt中,我们能够获取Kafka消息,但是,我们无法将其反序列化为JSON。

我们正在使用两个名为“Avro\u json\u serializer”和“Avro”的python Avro模块。当我试图反序列化我放在一起的简单Avro文件时,它们就起作用了。

Kafka消息中Avro数据的Avro模式是使用HTTP GET从Confluent的模式注册表中获取的。我将Kafka消息中的模式和Avro数据放入两个文件中,下面是我的测试程序:

import avro
import avro_json_serializer as ajs

import json

# Avro schema from Confluent's schema registry using HTTP GET
schema_string = open("realAvroSchemaFromKK.avsc").read()

schema_dict = json.loads(schema_string)
avro_schema = avro.schema.make_avsc_object(schema_dict, avro.schema.Names())

serializer = ajs.AvroJsonSerializer(avro_schema)

# Avro data with in Kafka message - I wrote it into this file
avrofile = open("realAvroFromKK.avro", "r")
avro = avrofile.read()

jsonData = serializer.to_json(avro) # where the code error out #

print jsonData

我解释错误消息的方式是我的avro模式不适合我的avro数据:

avro.io.AvroTypeException: The datum �bankbankHoward �����THoward �����T� is not an example of the schema {
  "namespace": "example.avro",
  "type": "record",
  "connect.name": "TABLE_NAME",
  "fields": [
    {
      "type": "int",
      "name": "Column_1"
    },
    ... (omitting the rest of the schema)

我从这里读到ConFluent框架中Avro格式的Kafka消息在消息开头有4个额外的字节,指示模式ID。我试图去掉Avro数据的前4个字节,然后将其发送到“serializer.to_json()”,但仍然没有运气。

救命!

共有1个答案

彭骏
2023-03-14

我在通过Storm Kafka spout读取kafka汇合数据时遇到了完全类似的问题。这是为我工作的等效Java代码。

    ByteBuffer input = ByteBuffer.wrap(data);
    int id = input.getInt();
    int start = input.position() + 1;
    MyAvroObject obj = null;
    try {
        obj  = datum_reader.read(null, DecoderFactory.get().binaryDecoder(input.array(), start, input.limit(), null));

    } catch (IOException e) {
        e.printStackTrace();
    }
    return obj;

ByteBuffer上的getInt()和position方法将指针移动到架构Id之后。希望这有帮助。

 类似资料:
  • 正在尝试读取avro文件。 无法将运行到Avro架构的数据转换为Spark SQL StructType:[“null”,“string”] 尝试手动创建架构,但现在遇到以下情况: 通用域名格式。databricks。火花阿夫罗。SchemaConverters$CompatibleSchemaException:无法将Avro架构转换为catalyst类型,因为路径处的架构不兼容(avroTyp

  • 在Spring-Cloud-Stream中是否有支持或计划支持avro和/或来自汇流平台的模式注册表?我发现spring-integration-kafka 1.3.0版中存在对avro的依赖,而spring-cloud-stream-binder-kafka的主分支(2.0)和spring-kafka没有任何avro依赖。

  • 我有一个avro格式的数据流(json编码),需要存储为镶木地板文件。我只能这样做, 把df写成拼花地板。 这里的模式是从json中推断出来的。但是我已经有了avsc文件,我不希望spark从json中推断出模式。 以上述方式,parquet文件将模式信息存储为StructType,而不是avro.record.type。是否也有存储avro模式信息的方法。 火花 - 1.4.1

  • 使用此Kafka Connect连接器: https://www.confluent.io/hub/confluentinc/kafka-connect-s3 我手动将其安装到我的kafka Connect Docker映像的插件中。我的目的是使用Kafka Connect将来自Kafka主题的Avro记录写入S3。 在运行时,使用Kafka Connect,我会得到以下错误: 在ConFluen

  • 现在Spark 2.4已经内置了对Avro格式的支持,我正在考虑将数据湖中某些数据集的格式从Parquet更改为Avro,这些数据集通常是针对整行而不是特定列聚合进行查询/联接的。 然而,数据之上的大部分工作都是通过Spark完成的,据我所知,Spark的内存缓存和计算是在列格式的数据上完成的。在这方面,Parquet是否提供了性能提升,而Avro是否会招致某种数据“转换”损失?在这方面,我还需要

  • 在将Java对象转换为JSON字符串时,我遇到了JsonMappingException。下面是完整的异常消息。 下面是我用来将Java对象转换为JSON字符串的java代码。我尝试启用ACCEPT_SINGLE_VALUE_AS_ARRAY,ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT但仍然面临问题。不确定是什么导致了JsonMappingException:不是数组问题。