当前位置: 首页 > 知识库问答 >
问题:

使用Spark和Scala扁平化json文件

司马德水
2023-03-14

我有一个这样的json文件:

{    
  "Item Version" : 1.0,    
  "Item Creation Time" : "2019-04-14 14:15:09",        
  "Trade Dictionary" : {    
    "Country" : "India",    
    "TradeNumber" : "1",    
    "action" : {    
      "Action1" : false,    
      "Action2" : true,    
      "Action3" : false    
    },    
    "Value" : "XXXXXXXXXXXXXXX",    
    "TradeRegion" : "Global"    
  },    
  "Prod" : {    
    "Type" : "Driver",    
    "Product Dic" : { },    
    "FX Legs" : [ {    
      "Spot Date" : "2019-04-16",        
      "Value" : true    
    } ]    
  },    
  "Payments" : {    
    "Payment Details" : [ {    
      "Payment Date" : "2019-04-11",    
      "Payment Type" : "Rej"
    } ]
  }
}

我需要以下格式的表格:

Version|Item Creation Time|Country|TradeNumber|Action1|Action2|Action3|Value |TradeRegion|Type|Product Dic|Spot Date |Value|Payment Date|Payment Type |
1 |2019-04-14 14:15 | India| 1 | false| true | false |xxxxxx|Global |Driver|{} |2019-04-16 |True |2019-11-14 |Rej

所以它只需迭代每个键值对,将键作为列名,并将其值放入表值。

我当前的代码:

val data2 = data.withColumn("vars",explode(array($"Product")))
  .withColumn("subs", explode($"vars.FX Legs"))
  .select($"vars.*",$"subs.*")

这里的问题是我必须自己提供列名。有什么方法可以让这个更通用吗?

共有3个答案

刘凡
2023-03-14

这个解决方案可以很容易地使用一个名为JFlot-https://github.com/opendevl/Json2Flat.的库来实现

String str = new String(Files.readAllBytes(Paths.get("/path/to/source/file.json")));

JFlat flatMe = new JFlat(str);

//get the 2D representation of JSON document
List<Object[]> json2csv = flatMe.json2Sheet().getJsonAsSheet();

//write the 2D representation in csv format
flatMe.write2csv("/path/to/destination/file.json");
魏君博
2023-03-14

使用分解函数用数组展平数据帧。下面是一个示例:

val df = spark.read.json(Seq(json).toDS.rdd)
df.show(10, false)
df.printSchema

df: org.apache.spark.sql.DataFrame = [Item Creation Time: string, Item Version: double ... 3 more fields]
+-------------------+------------+--------------------------------+----------------------------------------+---------------------------------------------------+
|Item Creation Time |Item Version|Payments                        |Prod                                    |Trade Dictionary                                   |
+-------------------+------------+--------------------------------+----------------------------------------+---------------------------------------------------+
|2019-04-14 14:15:09|1.0         |[WrappedArray([2019-04-11,Rej])]|[WrappedArray([2019-04-16,true]),Driver]|[India,1,Global,XXXXXXXXXXXXXXX,[false,true,false]]|
+-------------------+------------+--------------------------------+----------------------------------------+---------------------------------------------------+
root
 |-- Item Creation Time: string (nullable = true)
 |-- Item Version: double (nullable = true)
 |-- Payments: struct (nullable = true)
 |    |-- Payment Details: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Payment Date: string (nullable = true)
 |    |    |    |-- Payment Type: string (nullable = true)
 |-- Prod: struct (nullable = true)
 |    |-- FX Legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Spot Date: string (nullable = true)
 |    |    |    |-- Value: boolean (nullable = true)
 |    |-- Type: string (nullable = true)
 |-- Trade Dictionary: struct (nullable = true)
 |    |-- Country: string (nullable = true)
 |    |-- TradeNumber: string (nullable = true)
 |    |-- TradeRegion: string (nullable = true)
 |    |-- Value: string (nullable = true)
 |    |-- action: struct (nullable = true)
 |    |    |-- Action1: boolean (nullable = true)
 |    |    |-- Action2: boolean (nullable = true)
 |    |    |-- Action3: boolean (nullable = true)


val flat = df
    .select($"Item Creation Time", $"Item Version", explode($"Payments.Payment Details") as "row")
    .select($"Item Creation Time", $"Item Version", $"row.*")
flat.show

flat: org.apache.spark.sql.DataFrame = [Item Creation Time: string, Item Version: double ... 2 more fields]
+-------------------+------------+------------+------------+
| Item Creation Time|Item Version|Payment Date|Payment Type|
+-------------------+------------+------------+------------+
|2019-04-14 14:15:09|         1.0|  2019-04-11|         Rej|
+-------------------+------------+------------+------------+
路伟
2023-03-14

由于数组列和结构列在多个级别混合在一起,因此创建通用解决方案并不是那么简单。主要问题是,必须在作为动作的所有数组列上执行分解函数。

我能想到的最简单的解决方案使用递归来检查任何结构或数组列。如果有,那么这些将被展平,然后我们再次检查(展平后会有额外的列,可以是数组或结构,因此很复杂)。flattenstruct部分来自这里。

代码:

def flattenStruct(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)   
    f.dataType match {
      case st: StructType => flattenStruct(st, colName)
      case _ => Array(col(colName))
    }
  })
}

def flattenSchema(df: DataFrame): DataFrame = {
    val structExists = df.schema.fields.filter(_.dataType.typeName == "struct").size > 0
    val arrayCols = df.schema.fields.filter(_.dataType.typeName == "array").map(_.name)

    if(structExists){
        flattenSchema(df.select(flattenStruct(df.schema):_*))
    } else if(arrayCols.size > 0) {
        val newDF = arrayCols.foldLeft(df){
          (tempDf, colName) => tempDf.withColumn(colName, explode(col(colName)))
        }
        flattenSchema(newDF)
    } else {
        df
    }
}

在输入数据帧上运行上述方法:

flattenSchema(data)

将给出一个具有以下模式的数据框:

root
 |-- Item Creation Time: string (nullable = true)
 |-- Item Version: double (nullable = true)
 |-- Payment Date: string (nullable = true)
 |-- Payment Type: string (nullable = true)
 |-- Spot Date: string (nullable = true)
 |-- Value: boolean (nullable = true)
 |-- Product Dic: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- TradeNumber: string (nullable = true)
 |-- TradeRegion: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Action1: boolean (nullable = true)
 |-- Action2: boolean (nullable = true)
 |-- Action3: boolean (nullable = true)

要将结构列的前缀保留在新列的名称中,只需在flattstruct函数中调整最后一个大小写:

case _ => Array(col(colName).as(colName.replace(".", "_")))
 类似资料:
  • 如何将一个简单的(即没有嵌套结构的)数据表扁平化为列表?我的习题集是检测从节点对表中更改/添加/删除的所有节点对。 这意味着我有一个“before”和“after”表要比较。将before和after dataframe组合在一起生成的行描述了一对数据在一个dataframe中出现而在另一个dataframe中不出现的位置。 单独且不同地合并所有列 平面地图和不同的 映射和展平 由于结构是众所周知

  • 问题内容: 我在这里发现了同样的问题… …但是没有正确的答案。 最好的建议之一是将嵌套对象包装到新类中,但是这种方法引入了另一个问题:乐高名称。 在我的示例中,此类的最逻辑名称是与父类相同的名称,当然这是不可能的。我的示例很简单,我只想消除父类中的“语言”属性。有人可以帮我做吗? json的示例: 问题答案: 如果JSON属性名称与c#命名约定冲突,则可以在序列化期间使用或批注替换其他名称。 例如

  • 问题内容: 此问题特定于从[GitHub Repo使用:flatten](https://github.com/amirziai/flatten) 该软件包位于pypi flatten-json 0.1.7上,可以与 此问题特定于软件包的以下组件: 使用递归展平嵌套 用Python递归思考 在Python中展平JSON对象 嵌套如何?: 已用于解压缩最终超过100000列的文件 展平的JSON是否

  • {“IFAM”:“EQR”,“KTM”:1430006400000,“COL”:21,“Data”:[{“MLRATE”:“30”,“NROUT”:“0”,“UP”:NULL,“板条箱”:“2”},{“MLRATE”:“31”,“NROUT”:“0”,“UP”:NULL,“板条箱”:“2”},{“MLRATE”:“30”,“NROUT”:“5”,“UP”:“NULL”:“2”},{“MLRATE”

  • 我想在指定年份之间的一年做一个季度的笛卡尔积 年度(2105、2016)应返回季度(2015、Q1)、季度(2015、Q2)...季度(2016年第四季度) 我想出的代码如下所示 上面的代码返回,我需要将其扁平化为。

  • 问题内容: 我有一个这样的清单: 此列表中的每个项目可能包含一个数据对或一个元组,我想将此列表更改为 然后这样做: 我不知道如何更改列表结构,或者如何基于原始列表进行相同的计算? 问题答案: 如果您只想整理列表,请使用:http : //docs.python.org/library/itertools.html#itertools.chain.from_iterable