
Several ways to load a schema in PySpark

堵泽宇
2023-12-01

The project my team is currently working on mainly uses PySpark scripts in Azure Databricks to process data. Its unit testing is completely broken: several existing test cases cannot even run, because they load an Avro schema file as the target schema for conversion and validation, but changes to the processing logic have since added columns while the Avro file was never updated, so the tests keep failing. Editing the Avro file directly did not work either, no matter which tools or online editors I tried, which led me to look into replacing the way the schema is loaded.

Based on what I found and collected online, here are several ways to load a schema:

1. Declaring the schema directly

        # In a unit test: declare the expected schema inline and have the
        # mocked schema registry return it instead of reading an Avro file
        self.src_schema = StructType([
            StructField("value", StringType()),
            StructField("publisher_id", IntegerType()),
            StructField("event_datetime", StringType()),
            StructField("process_datetime", StringType()),
            StructField("dt", StringType())])
        mock_module.SchemaRegistry().pull_schema_from_schema_registry.return_value = self.src_schema
# In a notebook or script: declare a schema, then use it to parse a JSON column
from pyspark.sql.functions import *
from pyspark.sql.types import *
aug_schema = StructType([
  StructField("country", ArrayType(StringType())),
  StructField("connection_type", StringType()),
  StructField("city", StringType()),
  StructField("latitude", StringType()),
  StructField("longitude", StringType()),
  StructField("domain", StringType()),
  StructField("postal", StringType()),
  StructField("device", StringType()),
  StructField("day_of_week", StringType()),
  StructField("time_of_day", StringType()),
  StructField("os_name", StringType()),
  StructField("os_version", StringType())
])
df = spark.sql("SELECT * FROM impression WHERE process_time == '2022-03-18-00' limit 100")
df.withColumn("aug_targets", from_json("aug_targets", aug_schema)).select(["process_time", "aug_targets.*"]).show()

The code above takes the aug_targets column from the impression table and uses aug_schema to parse its JSON-formatted value.
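As a side note, from_json also accepts a schema written as a DDL-style string, which can be more compact than building the full StructType by hand. A minimal sketch of the same parsing step, with a shortened, hypothetical schema string:

# Same parsing as above, but with the schema expressed as a DDL string
ddl_schema = "country ARRAY<STRING>, connection_type STRING, city STRING"
df.withColumn("aug_targets", from_json("aug_targets", ddl_schema)) \
  .select("process_time", "aug_targets.*") \
  .show()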

2. Loading from a JSON schema file

        # THIS_DIR points at the test module's directory; json is the standard library module
        with open(THIS_DIR + "/data/conversion_schema.json") as f:
            conv_schema = StructType.fromJson(json.load(f))

The schema definition format in the JSON file looks like this (a short round-trip sketch follows the example):

{
  "fields": [
    {
      "metadata": {},
      "name": "advertiser_info",
      "nullable": true,
      "type": {
        "fields": [
          {
            "metadata": {},
            "name": "advertiser_id",
            "nullable": true,
            "type": "integer"
          },
          {
            "metadata": {},
            "name": "bid",
            "nullable": true,
            "type": "double"
          }
        ],
        "type": "struct"
      }
    },
    {
      "metadata": {},
      "name": "interaction",
      "nullable": true,
      "type": {
        "fields": [
          {
            "metadata": {},
            "name": "flights",
            "nullable": true,
            "type": {
              "containsNull": true,
              "elementType": "string",
              "type": "array"
            }
          },
          {
            "metadata": {},
            "name": "house",
            "nullable": true,
            "type": "boolean"
          }
        ],
        "type": "struct"
      }
    }
  ],
  "type": "struct"
}
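This JSON is the same format that df.schema.json() produces, so a schema exported that way can be loaded back unchanged. A minimal round-trip sketch, assuming df is an existing DataFrame, spark is the active SparkSession, and the data path is hypothetical:

import json
from pyspark.sql.types import StructType

# Serialize an existing DataFrame's schema, then rebuild a StructType from it
schema_json = df.schema.json()
restored_schema = StructType.fromJson(json.loads(schema_json))

# The restored schema can be used anywhere a declared StructType would be,
# e.g. when reading raw JSON data (the path is hypothetical)
conv_df = spark.read.schema(restored_schema).json("/data/conversions/")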

3. Loading from an Avro schema file

# Read the Avro file with Spark and take the schema it carries
df = self.spark.read.format("avro").load("/data/conversion_schema.avro")
print(df.schema)

In fact, data in any format supported by Spark can be loaded this way, and once it is loaded you can read the corresponding schema off the resulting DataFrame. When the schema we want to keep is fairly simple, we can either write the data out in a schema-carrying format or simply print the schema (a small sketch for saving the printed JSON to a file follows the example below).
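For example, the same trick works with any other supported format; a minimal sketch, assuming spark is the active SparkSession and the paths are hypothetical:

# Grab the schema from existing Parquet data
parquet_schema = spark.read.format("parquet").load("/data/impressions.parquet").schema

# Or let Spark infer a schema from a small JSON sample
json_schema = spark.read.format("json").load("/data/sample.json").schema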

# If we already have df and want to save its schema

# 1. Print the schema as JSON, then save the output to a .json file
print(df.schema.json())

# 2. Write the data out as Avro; the .avro files under the output path carry the schema
df.write.format("avro").save("/tmp/output")
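To keep the printed JSON without copying it from the console, a minimal sketch (the output path is hypothetical):

# Persist the schema JSON so tests can later load it back with StructType.fromJson
with open("/tmp/conversion_schema.json", "w") as f:
    f.write(df.schema.json())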

Schema updates

What can we do if we need to update the schema at runtime, for example adding columns, renaming columns, or splitting columns? (A sketch for splitting a column follows the snippet below.)

# Parse the JSON value in the aug_targets column, then select its nested sub-columns
# select effectively drops the columns we no longer need
df.withColumn("aug_targets", from_json("aug_targets", aug_schema)).select(["aug_targets.*"])

# withColumn adds (or replaces) a column
df.withColumn("process_datetime", col("process_datetime").cast(TimestampType()))

# withColumnRenamed renames a column; if the column does not exist it is a no-op
df.withColumnRenamed("augment_targets", "aug_targets")
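The snippet above covers parsing, casting, and renaming; splitting a column can be done with withColumn as well. A minimal sketch, assuming a dt column formatted like '2022-03-18-00':

from pyspark.sql.functions import split, col, concat_ws

# Split "dt" (e.g. "2022-03-18-00") into a date part and an hour part
parts = split(col("dt"), "-")
df = (df
      .withColumn("dt_date", concat_ws("-", parts.getItem(0), parts.getItem(1), parts.getItem(2)))
      .withColumn("dt_hour", parts.getItem(3)))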

The data formats supported by Spark, as described in the official documentation, are:

https://spark.apache.org/docs/latest/sql-data-sources.html

  • Parquet
  • ORC
  • JSON
  • CSV
  • Text
  • Hive Tables
  • JDBC
  • Avro
  • Whole Binary Files