当前位置: 首页 > 知识库问答 >
问题:

spark StructType的Avro架构

戎桐
2023-03-14

这实际上与我之前的问题相同,但使用Avro而不是JSON作为数据格式

我正在使用一个Spark数据框架,它可以从几个不同的模式版本之一加载数据:

// Version One
{"namespace": "com.example.avro",
 "type": "record",
 "name": "MeObject",
 "fields": [
     {"name": "A", "type": ["null", "int"], "default": null}
 ]
}

// Version Two
{"namespace": "com.example.avro",
 "type": "record",
 "name": "MeObject",
 "fields": [
     {"name": "A", "type": ["null", "int"], "default": null},
     {"name": "B", "type": ["null", "int"], "default": null}
 ]
}

我正在使用Spark Avro加载数据。

DataFrame df = context.read()
  .format("com.databricks.spark.avro")
  .load("path/to/avro/file");

它可能是版本一文件或版本二文件。但是我希望能够以相同的方式处理它,将未知值设置为“null”。我之前的问题中的建议是设置模式,但是我不想重复自己在. avro文件中编写模式,也不想重复自己在结构类型和朋友中编写模式。如何将avro模式(文本文件或生成的MeObject.getClassSchema())转换为火花结构类型

Spark Avro有一个SchemaConverters,但它都是私有的,并返回一些奇怪的内部对象。

共有2个答案

滕无尘
2023-03-14

请看看这是否有帮助,尽管为时已晚。我一直在为我目前的工作努力。我使用了Databricks中的schemaconverter。我想,您正在尝试使用给定的模式读取avro文件。

 val schemaObj = new Schema.Parser().parse(new File(avscfilepath));
 var sparkSchema : StructType = new StructType
 import scala.collection.JavaConversions._     
 for(field <- schemaObj.getFields()){
  sparkSchema = sparkSchema.add(field.name, SchemaConverters.toSqlType(field.schema).dataType)
 }
 sparkSchema
慕意致
2023-03-14

免责声明:这是一种肮脏的黑客行为。这取决于以下几点:

  • Python提供了一个轻量级的Avro处理库,由于其动态性,它不需要类型化编写器
  • 一个空的Avro文件仍然是一个有效的文档
  • Spark模式可以与JSON转换

以下代码读取Avro模式文件,使用给定模式创建一个空的Avro文件,使用spack-csv读取它并将Spark模式输出为JSON文件。

import argparse
import tempfile

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

from pyspark import SparkContext
from pyspark.sql import SQLContext

def parse_schema(schema):
    with open(schema) as fr:
        return avro.schema.parse(open(schema).read())

def write_dummy(schema):
    tmp = tempfile.mktemp(suffix='.avro')
    with open(tmp, "w") as fw:
        writer = DataFileWriter(fw, DatumWriter(), schema)
        writer.close()
    return tmp

def write_spark_schema(path, schema):
    with open(path, 'w') as fw:
        fw.write(schema.json())


def main():
    parser = argparse.ArgumentParser(description='Avro schema converter')
    parser.add_argument('--schema')
    parser.add_argument('--output')
    args = parser.parse_args()

    sc = SparkContext('local[1]', 'Avro schema converter')
    sqlContext = SQLContext(sc)

    df = (sqlContext.read.format('com.databricks.spark.avro')
            .load(write_dummy(parse_schema(args.schema))))

    write_spark_schema(args.output, df.schema)
    sc.stop()


if __name__ == '__main__':
    main()

用法:

bin/spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 \ 
   avro_to_spark_schema.py \
   --schema path_to_avro_schema.avsc \
   --output path_to_spark_schema.json

读取架构:

import scala.io.Source
import org.apache.spark.sql.types.{DataType, StructType}

val json: String = Source.fromFile("schema.json").getLines.toList.head
val schema: StructType = DataType.fromJson(json).asInstanceOf[StructType]
 类似资料:
  • 我有以下对象: Bu 有时我只是我正在尝试为此提出一个架构。但它似乎:(不起作用。 我尝试了以下两种: 但它失败了,线程“main”org.apache.avro中出现<code>异常。SchemaParseException:无类型: 我也尝试了同样的错误: 我真的不明白问题出在哪里,两者有什么区别。

  • 嘿,我想将ConFluent模式注册表与Avro Serializers一起使用:留档现在基本上是说:不要为多个不同的主题使用相同的模式 谁能解释一下原因吗?我重新搜索了源代码,它基本上将模式存储在Kafka主题中,如下所示(topicname,magicbytes,version- 因此,除了冗余之外,我看不到多次使用模式的问题?

  • 我得到了一个定义字段的警告: 我试图将其更改为: 现在它根本不起作用,因为模式无效。 错误: 知道怎么解决吗?

  • 我使用Avro模式动态地从C#应用程序生成消息到Kafka集群,使用汇合的Kafka客户机。在编译时不知道数据类型,因此我使用命名空间中的GenericRecord类,如下所述:https://www.confluent.io/blog/decoupling-systems-with-apache-kafka-schema-registry-and-avro/。 但是,我有一个问题--如果模式有一

  • 我有一个问题,我的记录json可以为null。如何处理avro模式中的空记录?给出的文档是针对我想要为null记录获取的null属性的。

  • 在此avro模式中 当我试着给名字分配多个值,比如多个记录... 它总是为模式中的NAME字段指定Null...除了我放入的第一条记录...有没有方法可以将default用作字符串...如果没有字符串,那么它只使用模式中提到的Null 如上所述,联合是使用JSON数组表示的。例如,[“string”,“null”]声明一个模式,该模式既可以是字符串,也可以是null。 联合不能包含多个具有相同类型