当前位置: 首页 > 知识库问答 >
问题:

创建火花的嵌套DataFrame

马泰
2023-03-14
$ cat user.json
{"id":1,"name":"UserA"}
{"id":2,"name":"UserB"}

用户有订单。

$ cat order.json
{"id":1,"price":202.30,"userid":1}
{"id":2,"price":343.99,"userid":1}
{"id":3,"price":399.99,"userid":2}

我喜欢加入它来得到这样一个结构,其中订单是数组嵌套在用户中。

$ cat join.json
{"id":1, "name":"UserA", "orders":[{"id":1,"price":202.30,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}

我怎么能那么做?是否有任何类型的嵌套联接或类似的东西?

>>> user = sqlContext.read.json("user.json")
>>> user.printSchema();
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

>>> order =  sqlContext.read.json("order.json")
>>> order.printSchema();
root
 |-- id: long (nullable = true)
 |-- price: double (nullable = true)
 |-- userid: long (nullable = true)

>>> joined = sqlContext.read.json("join.json")
>>> joined.printSchema();
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)
def joinTable(tableLeft, tableRight, columnLeft, columnRight, columnNested, joinType = "left_outer"):
    tmpTable = sqlCtx.createDataFrame(tableRight.rdd.groupBy(lambda r: r.asDict()[columnRight]))
    tmpTable = tmpTable.select(tmpTable._1.alias("joinColumn"), tmpTable._2.data.alias(columnNested))
    return tableLeft.join(tmpTable, tableLeft[columnLeft] == tmpTable["joinColumn"], joinType).drop("joinColumn")
>>> lines =  sqlContext.read.json(path + "lines.json")
>>> lines.printSchema();
root
 |-- id: long (nullable = true)
 |-- orderid: long (nullable = true)
 |-- product: string (nullable = true)

orders = joinTable(order, lines, "id", "orderid", "lines")
joined = joinTable(user, orders, "id", "userid", "orders")
joined.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)
 |    |    |-- lines: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _1: long (nullable = true)
 |    |    |    |    |-- _2: long (nullable = true)
 |    |    |    |    |-- _3: string (nullable = true)
from pyspark.sql.types import *
fields = []
fields.append(StructField("_1", LongType(), True))
inner = ArrayType(lines.schema)
fields.append(StructField("_2", inner))
new_schema = StructType(fields)
print new_schema

grouped =  lines.rdd.groupBy(lambda r: r.orderid)
grouped =  grouped.map(lambda x: (x[0], list(x[1])))
g = sqlCtx.createDataFrame(grouped, new_schema)
TypeError: StructType(List(StructField(id,LongType,true),StructField(orderid,LongType,true),StructField(product,StringType,true))) can not accept object in type <class 'pyspark.sql.types.Row'>

共有1个答案

华誉
2023-03-14

这只适用于Spark2.0或更高版本

首先,我们需要几个进口货:

from pyspark.sql.functions import struct, collect_list

剩下的是一个简单的聚合和联接:

orders = spark.read.json("/path/to/order.json")
users = spark.read.json("/path/to/user.json")

combined = users.join(
    orders
        .groupBy("userId")
        .agg(collect_list(struct(*orders.columns)).alias("orders"))
        .withColumnRenamed("userId", "id"), ["id"])
combined.show(2, False)
+---+-----+---------------------------+
|id |name |orders                     |
+---+-----+---------------------------+
|1  |UserA|[[1,202.3,1], [2,343.99,1]]|
|2  |UserB|[[3,399.99,2]]             |
+---+-----+---------------------------+
combined.printSchema()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)
for x in combined.toJSON().collect():
    print(x)     
{"id":1,"name":"UserA","orders":[{"id":1,"price":202.3,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}
 类似资料:
  • 假设我有一张这样的桌子: 它以拼花地板的形式存储。我需要在spark中读取表,在“field1”上执行groupBy,然后我需要在ES中存储一个嵌套字段(例如,称为“agg\u字段”),其中包含一个字典列表,其中包含字段2和字段3的值,这样文档将如下所示: 我可以阅读表格并进行分组: 我可以做一些聚合并将结果发送给es: 但我不知道如何将聚合更改为嵌套的“agg\u fields”列,该列将被el

  • 问题内容: 我已经阅读了从平面csv创建嵌套JSON的内容,但对我而言没有帮助。 我有一个很大的电子表格,它是由Google文档创建的,包含11行和74列(某些列未占用)。 我在Google云端硬盘上创建了一个示例。导出为a时,它看起来像这样: 现在,我想要一个结构,如下所示: 以此类推。 我的理论方法是逐行遍历文件(这是第一个问题:现在每一行等于一行,但有时是几行,因此我需要计算逗号?)。每行等

  • 下面的代码是我的适配器 下面的代码是我的片段,带有虚拟arraylist数据

  • 问题内容: 说我要制作以下JSON 目前,这就是我实现的方式 似乎应该有一个更好的方法来做到这一点,而不是做更多的事情,然后将它们放置在主体中。有比我正在使用的方法更好的动态构建方法吗? 在此先感谢您的帮助! 问题答案: 如果您处理大量JSON数据,我真的建议您使用Gson或Jackson。这要方便得多,并且两个库都很好地支持与Java对象之间的相互转换(与通过JSONObjects手动构建JSO

  • 问题内容: 我正在尝试从深度嵌套的JSON字符串创建单个Pandas DataFrame对象。 JSON模式是: 期望的结果 我需要将其展平以产生一张桌子: 第一列是值,其余列是键的值并存储在列表中。 到目前为止,我已经 是一个列表,其中长度等于个人数量,即。df对象只是返回 如何遍历该列表以获取dict值并创建N个不同的列?我应该尝试为该列表创建一个DataFrame ,重塑它的形状,然后用角色

  • ============更新========== 我在我的JSON中添加了一些更多的细节(struct_c和array_d)以使它更清楚地知道我在哪里得到了异常。 =============================================== 我有一个带有Struct类型嵌套数组的Spark DataFrame。我想从该结构中选择一个列,但收到错误消息:“org.apache.sp

  • 谁能告诉我我错过了什么吗?我正在尝试使用spring boot mongodb创建一个Mongo集合。 我想创造这样的东西 但我要得到这个 我的领域类是 我的存储库是 控制器是 我的jsp表格是

  • 了解如何在 Dreamweaver 中创建用于控制共享设计元素的页面中的内容的嵌套模板。 嵌套模板是指其设计和可编辑区域都基于另一个模板的模板。嵌套模板对于控制共享许多设计元素的站点页面中的内容很有用,但在各页之间有些差异。例如,基本模板中可能包含更宽广的设计区域,并且可以由站点的许多内容提供者使用,而嵌套模板可能进一步定义站点内特定部分的页面中的可编辑区域。 基本模板中的可编辑区域被传递到嵌套模