$ cat user.json
{"id":1,"name":"UserA"}
{"id":2,"name":"UserB"}
用户有订单。
$ cat order.json
{"id":1,"price":202.30,"userid":1}
{"id":2,"price":343.99,"userid":1}
{"id":3,"price":399.99,"userid":2}
我喜欢加入它来得到这样一个结构,其中订单是数组嵌套在用户中。
$ cat join.json
{"id":1, "name":"UserA", "orders":[{"id":1,"price":202.30,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}
我怎么能那么做?是否有任何类型的嵌套联接或类似的东西?
>>> user = sqlContext.read.json("user.json")
>>> user.printSchema();
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
>>> order = sqlContext.read.json("order.json")
>>> order.printSchema();
root
|-- id: long (nullable = true)
|-- price: double (nullable = true)
|-- userid: long (nullable = true)
>>> joined = sqlContext.read.json("join.json")
>>> joined.printSchema();
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- orders: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- price: double (nullable = true)
| | |-- userid: long (nullable = true)
def joinTable(tableLeft, tableRight, columnLeft, columnRight, columnNested, joinType = "left_outer"):
tmpTable = sqlCtx.createDataFrame(tableRight.rdd.groupBy(lambda r: r.asDict()[columnRight]))
tmpTable = tmpTable.select(tmpTable._1.alias("joinColumn"), tmpTable._2.data.alias(columnNested))
return tableLeft.join(tmpTable, tableLeft[columnLeft] == tmpTable["joinColumn"], joinType).drop("joinColumn")
>>> lines = sqlContext.read.json(path + "lines.json")
>>> lines.printSchema();
root
|-- id: long (nullable = true)
|-- orderid: long (nullable = true)
|-- product: string (nullable = true)
orders = joinTable(order, lines, "id", "orderid", "lines")
joined = joinTable(user, orders, "id", "userid", "orders")
joined.printSchema()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- orders: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- price: double (nullable = true)
| | |-- userid: long (nullable = true)
| | |-- lines: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- _1: long (nullable = true)
| | | | |-- _2: long (nullable = true)
| | | | |-- _3: string (nullable = true)
from pyspark.sql.types import *
fields = []
fields.append(StructField("_1", LongType(), True))
inner = ArrayType(lines.schema)
fields.append(StructField("_2", inner))
new_schema = StructType(fields)
print new_schema
grouped = lines.rdd.groupBy(lambda r: r.orderid)
grouped = grouped.map(lambda x: (x[0], list(x[1])))
g = sqlCtx.createDataFrame(grouped, new_schema)
TypeError: StructType(List(StructField(id,LongType,true),StructField(orderid,LongType,true),StructField(product,StringType,true))) can not accept object in type <class 'pyspark.sql.types.Row'>
这只适用于Spark2.0或更高版本
首先,我们需要几个进口货:
from pyspark.sql.functions import struct, collect_list
剩下的是一个简单的聚合和联接:
orders = spark.read.json("/path/to/order.json")
users = spark.read.json("/path/to/user.json")
combined = users.join(
orders
.groupBy("userId")
.agg(collect_list(struct(*orders.columns)).alias("orders"))
.withColumnRenamed("userId", "id"), ["id"])
combined.show(2, False)
+---+-----+---------------------------+
|id |name |orders |
+---+-----+---------------------------+
|1 |UserA|[[1,202.3,1], [2,343.99,1]]|
|2 |UserB|[[3,399.99,2]] |
+---+-----+---------------------------+
combined.printSchema()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- orders: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- price: double (nullable = true)
| | |-- userid: long (nullable = true)
for x in combined.toJSON().collect():
print(x)
{"id":1,"name":"UserA","orders":[{"id":1,"price":202.3,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}
假设我有一张这样的桌子: 它以拼花地板的形式存储。我需要在spark中读取表,在“field1”上执行groupBy,然后我需要在ES中存储一个嵌套字段(例如,称为“agg\u字段”),其中包含一个字典列表,其中包含字段2和字段3的值,这样文档将如下所示: 我可以阅读表格并进行分组: 我可以做一些聚合并将结果发送给es: 但我不知道如何将聚合更改为嵌套的“agg\u fields”列,该列将被el
问题内容: 我已经阅读了从平面csv创建嵌套JSON的内容,但对我而言没有帮助。 我有一个很大的电子表格,它是由Google文档创建的,包含11行和74列(某些列未占用)。 我在Google云端硬盘上创建了一个示例。导出为a时,它看起来像这样: 现在,我想要一个结构,如下所示: 以此类推。 我的理论方法是逐行遍历文件(这是第一个问题:现在每一行等于一行,但有时是几行,因此我需要计算逗号?)。每行等
下面的代码是我的适配器 下面的代码是我的片段,带有虚拟arraylist数据
问题内容: 说我要制作以下JSON 目前,这就是我实现的方式 似乎应该有一个更好的方法来做到这一点,而不是做更多的事情,然后将它们放置在主体中。有比我正在使用的方法更好的动态构建方法吗? 在此先感谢您的帮助! 问题答案: 如果您处理大量JSON数据,我真的建议您使用Gson或Jackson。这要方便得多,并且两个库都很好地支持与Java对象之间的相互转换(与通过JSONObjects手动构建JSO
问题内容: 我正在尝试从深度嵌套的JSON字符串创建单个Pandas DataFrame对象。 JSON模式是: 期望的结果 我需要将其展平以产生一张桌子: 第一列是值,其余列是键的值并存储在列表中。 到目前为止,我已经 是一个列表,其中长度等于个人数量,即。df对象只是返回 如何遍历该列表以获取dict值并创建N个不同的列?我应该尝试为该列表创建一个DataFrame ,重塑它的形状,然后用角色
============更新========== 我在我的JSON中添加了一些更多的细节(struct_c和array_d)以使它更清楚地知道我在哪里得到了异常。 =============================================== 我有一个带有Struct类型嵌套数组的Spark DataFrame。我想从该结构中选择一个列,但收到错误消息:“org.apache.sp
谁能告诉我我错过了什么吗?我正在尝试使用spring boot mongodb创建一个Mongo集合。 我想创造这样的东西 但我要得到这个 我的领域类是 我的存储库是 控制器是 我的jsp表格是
了解如何在 Dreamweaver 中创建用于控制共享设计元素的页面中的内容的嵌套模板。 嵌套模板是指其设计和可编辑区域都基于另一个模板的模板。嵌套模板对于控制共享许多设计元素的站点页面中的内容很有用,但在各页之间有些差异。例如,基本模板中可能包含更宽广的设计区域,并且可以由站点的许多内容提供者使用,而嵌套模板可能进一步定义站点内特定部分的页面中的可编辑区域。 基本模板中的可编辑区域被传递到嵌套模