Question:

Adding an element to an array of structs and merging arrays of structs in Spark 2.3

宣望
2023-03-14

I have a DataFrame with the schema shown below as df.current, and I need to get to the schema shown as df.expected. Is there a way to achieve this in Spark 2.3?

df.current schema:

 |-- enqueuedTime: timestamp (nullable = true)
 |-- VIN: string (nullable = true)
 |-- TT: long (nullable = true)
 |-- MSG_TYPE: string (nullable = true)
 |-- ADA: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)
 |-- ADW: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)

df.expected schema:

 |-- enqueuedTime: timestamp (nullable = true)
 |-- VIN: string (nullable = true)
 |-- TT: long (nullable = true)
 |-- MSG_TYPE: string (nullable = true)
 |-- SIGNAL: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- SN: string (nullable = true)
 |    |    |-- E:  long (nullable = true)
 |    |    |-- V:  double (nullable = true)
 |    |    |-- SN: string (nullable = true) 
 |    |    |-- E:  long (nullable = true)
 |    |    |-- V:  double (nullable = true)
 |    |    |-- SN: string (nullable = true)
 |    |    |-- E: long (nullable = true)
 |    |    |-- V: double (nullable = true)

Sample data:

+-----------------+---+---------+------------------------------------------------+--------------------------+
|vin              |tt |msg_type |ada                                             |adw                       |
+-----------------+---+---------+------------------------------------------------+--------------------------+
|FU7XXXXXXXXXXXXXX|0  |SIGNAL   |[{"E":15XXXXXXXX,"V":2}, {"E":15XXXXXXXX,"V":1}]|null                      |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL   |null                                            |[{"E":15XXXXXXXX,"V":3}]  |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL   |null                                            |[{"E":15XXXXXXXX,"V":4.1}]|
+-----------------+---+---------+------------------------------------------------+--------------------------+

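Note: two things need to be achieved here:

  1. Create a new field SN for each E, V pair in the elements, whose value is the name of the array. For example, for the first array column (ADA), SN = ADA.
  2. Merge the arrays (ADA, ADW) into a single outer array (SIGNAL).

1 answer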

姜鹏程
2023-03-14

The schema you are looking for is not correct. This is the schema produced below:

scala> newDF.printSchema
root
 |-- ADA: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: string (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ADW: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: string (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- MSG_TYPE: string (nullable = true)
 |-- number: long (nullable = true)
 |-- tt: long (nullable = true)
 |-- vin: string (nullable = true)
 |-- sig: struct (nullable = false)
 |    |-- SN: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- E: string (nullable = true)
 |    |    |    |-- V: long (nullable = true)
 |    |-- SN: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- E: string (nullable = true)
 |    |    |    |-- V: long (nullable = true)

If you are happy with this schema, read on for how to produce it.

Create dummy data to replicate your schema (this step can be skipped):

scala> val vas = """{"df":[ { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }, { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }, { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW":[{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }] }"""
vas: String = {"df":[ { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }, { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }, { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW":[{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }] }

scala> val df = spark.read.json(Seq(vas).toDS).toDF.withColumn("arr", explode($"df")).select("arr.*")
df: org.apache.spark.sql.DataFrame = [ADA: array<struct<E:string,V:bigint>>, ADW: array<struct<E:string,V:bigint>> ... 4 more fields]

I assume your data looks like this:

scala> df.show(false)
+--------------------------------+--------------------------------+--------+------+---+-----------------+
|ADA                             |ADW                             |MSG_TYPE|number|tt |vin              |
+--------------------------------+--------------------------------+--------+------+---+-----------------+
|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|SIGNAL  |123   |0  |FU7XXXXXXXXXXXXXX|
|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|SIGNAL  |123   |0  |FU7XXXXXXXXXXXXXX|
|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|SIGNAL  |123   |0  |FU7XXXXXXXXXXXXXX|
+--------------------------------+--------------------------------+--------+------+---+-----------------+

Steps to produce the desired output:

scala> val newDF = df.withColumn("sig", struct($"ADA".as("SN"), $"ADW".as("SN")))
newDF: org.apache.spark.sql.DataFrame = [ADA: array<struct<E:string,V:bigint>>, ADW: array<struct<E:string,V:bigint>> ... 5 more fields]

scala> newDF.printSchema
root
 |-- ADA: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: string (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- ADW: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: string (nullable = true)
 |    |    |-- V: long (nullable = true)
 |-- MSG_TYPE: string (nullable = true)
 |-- number: long (nullable = true)
 |-- tt: long (nullable = true)
 |-- vin: string (nullable = true)
 |-- sig: struct (nullable = false)
 |    |-- SN: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- E: string (nullable = true)
 |    |    |    |-- V: long (nullable = true)
 |    |-- SN: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- E: string (nullable = true)
 |    |    |    |-- V: long (nullable = true)

I then tried writing this DataFrame out:

newDF.write.mode("overwrite").parquet(path + "newDF.parquet")
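
For the two requirements stated in the question (tag every E, V pair with SN, then merge ADA and ADW into one SIGNAL array), a UDF is one possible route on Spark 2.3, since array `concat` and the `transform` higher-order function only arrived in Spark 2.4. The following is a minimal sketch rather than a definitive implementation. The case classes `Reading`, `Msg`, and `Signal`, the name `mergeSignals`, and the sample values are all assumptions made for illustration; the E: long and V: double types follow the question's schema, not the dummy data above.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical input types mirroring the question's schema (E: long, V: double).
case class Reading(E: Long, V: Double)
case class Msg(VIN: String, TT: Long, MSG_TYPE: String,
               ADA: Option[Seq[Reading]], ADW: Option[Seq[Reading]])

// Hypothetical output element: a reading tagged with the name of its source array.
// Option keeps E and V nullable in the resulting struct.
case class Signal(SN: String, E: Option[Long], V: Option[Double])

val spark = SparkSession.builder().master("local[*]").appName("merge-signals").getOrCreate()
import spark.implicits._

// Made-up sample rows: one with only ADA populated, one with only ADW.
val df = Seq(
  Msg("FU7XXXXXXXXXXXXXX", 0L, "SIGNAL", Some(Seq(Reading(15L, 2.0), Reading(16L, 1.0))), None),
  Msg("FU7XXXXXXXXXXXXXX", 0L, "SIGNAL", None, Some(Seq(Reading(17L, 4.1))))
).toDF()

// Tag each struct with its array name and concatenate both arrays.
// A null array contributes no elements; null elements are skipped.
val mergeSignals = udf { (ada: Seq[Row], adw: Seq[Row]) =>
  Seq("ADA" -> ada, "ADW" -> adw).flatMap { case (name, rows) =>
    Option(rows).getOrElse(Seq.empty[Row]).filter(_ != null).map { r =>
      val (e, v) = (r.fieldIndex("E"), r.fieldIndex("V"))
      Signal(name,
        if (r.isNullAt(e)) None else Some(r.getLong(e)),
        if (r.isNullAt(v)) None else Some(r.getDouble(v)))
    }
  }
}

// Replace the two source arrays with the single merged SIGNAL column.
val result = df
  .withColumn("SIGNAL", mergeSignals(col("ADA"), col("ADW")))
  .drop("ADA", "ADW")

result.printSchema()
result.show(false)
```

The block can be pasted into spark-shell with `:paste`. Note that a row where both arrays are null ends up with an empty SIGNAL array rather than null; if null is preferred, the UDF would need to return `null` in that case. With more than two array columns, the pair list inside the UDF would have to be extended accordingly.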