当前位置: 首页 > 知识库问答 >
问题:

json 文件演变为三角洲湖的动态模式

卫琛
2023-03-14

我正在建立一个Azure Databricks delta-lake,并且正在努力将我的json数据加载到delta-lake中。json中有100多种不同的文件格式。全部储存在数据湖中。

现在,我试图避免编写100个不同的python笔记本,而是构建一个元数据驱动的笔记本,它应该能够处理所有不同的json格式。

我能够得到进入三角洲湖的第一批数据,到目前为止一切顺利。问题是当我加载到特定delta-lake表中的第一个json文件在一列中包含NULL时。然后,对delta-lake的写入会自动将该列创建为string。下一个文件在同一列中包含一个嵌套的json数组,结果是我得到了这个错误消息:

html" target="_blank">分析异常:无法合并字段“payment_info”和“payment_info”。无法合并不兼容的数据类型StringType和structType(structField(@type, StringType, true), structField(bank_name, StringType, true), structField(bic, StringType, true), structField(iban, StringType, true), structField(所有者, StringType, true))

这是我的数据。首先json看起来像:

{
"payment_info": null,
"shop_id": 1,
"shop_name": "Testshop",
"shop_state": "OPEN"
}

然后第二个json文件包含以下信息:

{
"payment_info": {
    "@type": "IBAN",
    "bank_name": "bankName",
    "bic": "ABCD12345",
    "owner": "baName"
},
"shop_id": 2,
"shop_name": "Another TestShop",
"shop_state": "OPEN"

}

我猜这是代码的相关部分:

jsondf = spark.read.option("multiline","true") \
  .format("json") \
  .option("mergeSchema", "true") \
  .load(load_path)

jsondf.write \
  .format("delta") \
  .mode("append") \
  .option("mergeSchema", "true") \
  .saveAsTable(table_name)

如果我能只创建初始增量表,而不创建所有记录的 NULL 列,我会非常高兴。这样,架构演化将在稍后填充时起作用。我没有任何情况,列首先作为int,然后作为字符串,或者首先作为字符串,然后作为复杂结构。

我还试着把它放在一个临时表中:

jsondf.createOrReplaceTempView("tempview")
sourcedf = spark.sql("select * from tempview")

并改用合并语句:

spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
spark.sql("MERGE INTO " + table_name + " as t " + \
  "USING " + batch_table_name + " as s " + \
  "ON " + joinsql + " " + \
  "WHEN MATCHED THEN " + \
     "UPDATE SET * " + \
  "WHEN NOT MATCHED THEN " + \
     "INSERT *")

不幸的是,这一切都以相同的结果告终。

那么,有没有什么简单的方法可以删除所有为空的列?或者有别的方法吗?

也许我可以使用一个填充了所有字段的伪json文件来获得表的模式定义。那是可行的。而手动定义模式将非常耗时。

干杯

共有1个答案

宦飞
2023-03-14

这是两个月前的事了,但我是这样处理这个问题的。

读取目录中的所有 json 文件。将架构保存到一个文件中,以便在读取目录和写入增量的单独作业中引用。

from pyspark.sql.types import StructType    
import json    

file_schema = spark.read.format("json").load("s3a://dir/*.json")

file_schema.printSchema()

with open("/dbfs/FileStore/schemas/schema.json", "w") as f:
    json.dump(file_schema.schema.jsonValue(), f)

现在在新作业中,我加载模式文件并在读取时使用它

with open("/dbfs/FileStore/schemas/schema.json") as f:
    the_schema = StructType.fromJson(json.load(f))

然后,您可以在模式选项中引用它

file_reader = spark.readStream.format('json') \
    .schema(gds_schema) \
    .load(your_path_to_files)

这是一个经过清理的版本,但是它为您提供了一个正确的方向,并且有一个可以引用的托管模式。如果您的文件具有相同的字段名但不同的值类型,您会希望它保存为字符串。

但是,我建议只使用自动加载程序作为用例,让自动加载程序将字段存储为字符串,并在下游应用您的转换。我每天将其用于250k新的json文件,其中包含超过350个字段的架构。

 类似资料:
  • 我需要在数据库中创建一个现有的三角洲湖表上的抽象。是否可以在Spark中基于Delta Lake表创建SQL Server类型的SQL视图?

  • 我是火花三角洲湖的新手。我正在创建三角洲表顶部的配置单元表。我有必要的jars delta-core-shaded-assembly2.11-0.1.0.jar,hive-delta2.11-0.1.0.jar;在配置单元类路径中。设置以下属性。 但是在创建表时 两个表的架构匹配。堆栈详细信息:Spark:2.4.4Hive:1.2.1 任何帮助都是非常感谢的。提前谢了。

  • 我正在使用Azure数据块,并在ADLS Gen2上创建了一个delta表。 我已经创建了4个版本的三角洲湖。 我试图用下面的命令恢复到版本2。 有人能告诉我为什么我不能恢复到旧版本吗?现在发生如下错误。

  • 我正在使用开源版本将大量数据写入Databricks Delta lake,该版本在AWS EMR上运行,S3作为存储层。我正在使用EMRFS。 为了提高性能,我每隔一段时间就会压缩和清空表: 我已经阅读了这篇文章火花:作业之间的长延迟,这似乎表明它可能与镶木地板有关?但是我在增量端没有看到任何选项来调整任何参数。

  • 我正在使用java中的WireMock来存根POST请求。该请求返回一个json主体文件,该文件存储在my local中。存根如下所示: 响应主体文件的一部分,“stubThree”如下所示: 请求url有许多参数,如下所示: stubing工作得很好,但我的目标是使用响应模板提供动态响应。我只想使用请求url中的“subscription_proration_date”值更新json文件的“st

  • 我试图创建一个正弦和余弦的计算器,技术上只在0-pi/2的范围内运行。现在这可能看起来很傻,但以后它将被使用,这样我就可以使用泰勒级数。 我有一个大部分工作的实现,但是当θ是x*(PI/2)的形式时,我有一个严重的问题,其中x是一个任意的整数。看起来,在这些值上,有时它们被推入附近的象限,它们不属于。也有一些偶尔的彻头彻尾的错误,我无法解释。 下面是这样做的代码。