我有两个火花数据帧。我想基于第二个更新第一个。
数据流1
[
{"sku": "abc-abc-abc",
"prod_id": "ss-23235",
"salePrice": "2312"
}, {
"sku": "xyz-xyz-xyz",
"prod_id": "ss-13265",
"salePrice": "8312"
}
]
df2
[{
"sku": "abc-abc-abc",
"min_price": "678"
},{
"sku": "xyz-xyz-xyz",
"min_price": "7655"
}
]
我想更新第一个数据帧(df1),如下所示
[
{"sku": "abc-abc-abc",
"prod_id": "ss-23235",
"price": {
"salePrice": "2312",
"min_price": "678"
},
},
{"sku": "xyz-xyz-xyz",
"prod_id": "ss-13265",
"price": {
"salePrice": "8312",
"min_price": "7655"
}
}
]
我不知道如何加入嵌套结构中的数据
import json
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
from pyspark.sql import functions as F
Source1 = [
{"sku": "abc-abc-abc",
"prod_id": "ss-23235",
"salePrice": "2312"
}, {
"sku": "xyz-xyz-xyz",
"prod_id": "ss-13265",
"salePrice": "8312"
}
]
Source2 = [{
"sku": "abc-abc-abc",
"min_price": "678"
},{
"sku": "xyz-xyz-xyz",
"min_price": "7655"
}
]
df1 = sqlContext.read.json(sc.parallelize(Source1))
df1.show()
df2 = sqlContext.read.json(sc.parallelize(Source2))
df2.show()
df3 = df1.join(df2,'sku',how='inner').select(df1.sku,df1.prod_id,df1.salePrice,df2.min_price).withColumn('price',F.to_json(F.struct(df1.salePrice,df2.min_price))).drop('salePrice').drop('min_price')
df4 = df3.select('sku','prod_id','price').withColumn('Output',F.to_json(F.struct('sku','prod_id','price'))).drop('sku').drop('prod_id').drop('price').show(truncate=False)
---------------------------------------------------------------------------------------------------- |输出 | ---------------------------------------------------------------------------------------------------- |{"s ku":"abc-abc-abc","prod_id":"s-23235","价格":"{"销售价格":"2312","min_price":"678"}"}||{"sku":"xyz-xyz-xyz","prod_id":"ss-13265","价格":"{"salePrice":"8312","min_price":"7655"}"}| ----------------------------------------------------------------------------------------------------
假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:
我有两个包含GB数据的大型pyspark数据框df1和df2。第一个数据框中的列是id1,col1。第二个数据框中的列是id2,col2。数据框的行数相等。id1和id2的所有值都是唯一的。id1的所有值也正好对应一个值id2。 因为。前几个条目与df1和df2区域相同,如下所示 DF1: df2: 所以我需要连接键 id1 和 id2 上的两个数据帧。df = df1.join(df2, df1
我有两个数据帧,DF1和DF2,DF1是存储来自DF2的任何附加信息的主机。 假设DF1是以下格式, DF2包含DF1中已经存在的2个条目和两个新条目。(itemId和item被视为一个组,可以被视为连接的键) 我需要合并两个数据框,以便增加现有项目计数并插入新项目。 结果应该是这样的: 我有一种方法可以做到这一点,但不确定这种方法是否有效或正确
我有两个数据帧需要以我正在努力的特定方式连接。 数据帧 1: 数据框2: 期望结果: 基本上-它应该在上连接df1和df2,但是如果df2中不存在,那么生成的应该是df1中的。 我尝试了 ),但这显然在列中留下了 以供 ,因为它在 df2 中没有匹配的域。对于此特定情况,我如何将它们添加到列中?
我有两个数据帧和包含IP地址,我正在尝试将IP地址映射到地理位置信息,如经度和纬度,它们是中的列。 我运行它作为一个火花提交作业,但操作花了很长时间,即使只有不到2500行。 我的代码: 有没有其他方法可以加入这两张桌子?还是我做错了?
我有一个PySpark数据帧,df1,看起来像: 我有第二个PySpark数据帧,df2 我想将df1的所有列(我有两列以上)与客户ID上的df2连接值相乘