The project my team is currently working on mainly consists of PySpark scripts running in Azure Databricks to process data. Its unit testing is completely broken at the moment: several test cases that were already written cannot even run, because they load an avro schema file as the target schema for conversion and validation, but the processing logic has since changed and added columns while the avro file was never updated, so the tests fail. Editing the avro file directly did not work with any of the tools or online editors I tried, so I started looking into replacing the way the schema is loaded.
Below are several ways to load a schema that I found and put together from reading around online:
1. Declare the schema directly
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

self.src_schema = StructType([
    StructField("value", StringType()),
    StructField("publisher_id", IntegerType()),
    StructField("event_datetime", StringType()),
    StructField("process_datetime", StringType()),
    StructField("dt", StringType()),
])
mock_module.SchemaRegistry().pull_schema_from_schema_registry.return_value = self.src_schema
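For context, a minimal sketch of how this mocked registry might be wired into a unit test with unittest.mock; the patch target my_pipeline, its transform function and input_df are hypothetical placeholders, not names from the real project:

from unittest.mock import patch
import my_pipeline  # placeholder for the real module under test

with patch("my_pipeline.SchemaRegistry") as mock_registry:
    # the code under test now receives src_schema instead of reading the stale avro file
    mock_registry.return_value.pull_schema_from_schema_registry.return_value = self.src_schema
    result_df = my_pipeline.transform(self.spark, input_df)  # input_df: any fixture DataFrame
    self.assertEqual(result_df.schema, self.src_schema)

Another directly declared schema, this time used to parse a JSON string column with from_json: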
from pyspark.sql.functions import *
from pyspark.sql.types import *
aug_schema = StructType([
    StructField("country", ArrayType(StringType())),
    StructField("connection_type", StringType()),
    StructField("city", StringType()),
    StructField("latitude", StringType()),
    StructField("longitude", StringType()),
    StructField("domain", StringType()),
    StructField("postal", StringType()),
    StructField("device", StringType()),
    StructField("day_of_week", StringType()),
    StructField("time_of_day", StringType()),
    StructField("os_name", StringType()),
    StructField("os_version", StringType()),
])
df = spark.sql("SELECT * FROM impression WHERE process_time == '2022-03-18-00' limit 100")
df.withColumn("aug_targets", from_json("aug_targets", aug_schema)).select(["process_time", "aug_targets.*"]).show()
The code above takes the aug_targets column from the impression table and uses aug_schema to parse its JSON-formatted value.
2. Load from a JSON schema file
import json
from pyspark.sql.types import StructType
with open(THIS_DIR + "/data/conversion_schema.json") as f:
    conv_schema = StructType.fromJson(json.load(f))
The schema definition format in the JSON file looks like this:
{ "fields": [ { "metadata": {}, "name": "advertiser_info", "nullable": true, "type": { "fields": [ { "metadata": {}, "name": "advertiser_id", "nullable": true, "type": "integer" }, { "metadata": {}, "name": "bid", "nullable": true, "type": "double" } ], "type": "struct" } }, { "metadata": {}, "name": "interaction", "nullable": true, "type": { "fields": [ { "metadata": {}, "name": "flights", "nullable": true, "type": { "containsNull": true, "elementType": "string", "type": "array" } }, { "metadata": {}, "name": "house", "nullable": true, "type": "boolean" } ], "type": "struct" } } ], "type": "struct" }
3. Load the schema from an avro file
df = self.spark.read.format("avro").load("/data/conversion_schema.avro")
print(df.schema)
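On Databricks the avro format is available out of the box; in a plain local PySpark run (for example in CI) it comes from the external spark-avro module, so the test session may need a configuration roughly like the one below (the version string is only an example and should match your Spark/Scala build):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("schema-tests")
         # spark-avro is an external package, not bundled with a plain pyspark install
         .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.3.2")
         .getOrCreate())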
In fact, any data in a format Spark supports can be loaded, and once it is loaded we can get the schema from the DataFrame. When we only want to keep a fairly simple schema file around, we can either write the data out in one of these formats or just print the schema and save it.
# If we already have a df and want to save its schema
# 1. print the schema as JSON and save it to a json file
print(df.schema.json())
# 2. write the data out in avro format; under the output path you will find .avro files
df.write.format("avro").save("/tmp/output")
Schema updates
If we need to update the schema while running in production, for example adding a column, renaming a column, or splitting a column, how can we do that?
# parse the JSON value in the aug_targets column, then select its nested sub-columns
# select effectively drops the columns we no longer need
df.withColumn("aug_targets", from_json("aug_targets", aug_schema)).select(["aug_targets.*"])
# withColumn adds a new column or replaces an existing one (here casting process_datetime to a timestamp)
df.withColumn("process_datetime", col("process_datetime").cast(TimestampType()))
# withColumnRenamed renames a column; if the old name does not exist it is a no-op
df.withColumnRenamed("augment_targets", "aug_targets")
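Splitting a column, mentioned above, is not covered by the snippet; a sketch using the built-in split function on the process_time format seen in the earlier query ("2022-03-18-00"), with made-up output column names:

from pyspark.sql.functions import split, col, concat_ws

# break process_time ("2022-03-18-00") into a date column and an hour column
parts = split(col("process_time"), "-")
df = (df
      .withColumn("process_date", concat_ws("-", parts.getItem(0), parts.getItem(1), parts.getItem(2)))
      .withColumn("process_hour", parts.getItem(3)))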
The data source formats Spark supports are described in the official documentation:
https://spark.apache.org/docs/latest/sql-data-sources.html