当前位置: 首页 > 知识库问答 >
问题:

PySpark 连接两个数据帧并更新嵌套结构

衡高寒
2023-03-14

我有两个火花数据帧。我想基于第二个更新第一个。

数据流1

[
  {"sku": "abc-abc-abc", 
   "prod_id": "ss-23235", 
   "salePrice": "2312"
  }, {
   "sku": "xyz-xyz-xyz", 
   "prod_id": "ss-13265", 
   "salePrice": "8312"
  }
]

df2

[{
  "sku": "abc-abc-abc", 
  "min_price": "678"
 },{
  "sku": "xyz-xyz-xyz", 
  "min_price": "7655"
 }
]

我想更新第一个数据帧(df1),如下所示

[
 {"sku": "abc-abc-abc", 
  "prod_id": "ss-23235", 
  "price": {
     "salePrice": "2312",
     "min_price": "678"
   },
 },
 {"sku": "xyz-xyz-xyz", 
  "prod_id": "ss-13265", 
  "price": {
     "salePrice": "8312",
      "min_price": "7655"
   }
 }
]

我不知道如何加入嵌套结构中的数据

共有1个答案

江永安
2023-03-14
import json
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
from pyspark.sql import functions as F

Source1 = [
  {"sku": "abc-abc-abc", 
   "prod_id": "ss-23235", 
   "salePrice": "2312"
  }, {
   "sku": "xyz-xyz-xyz", 
   "prod_id": "ss-13265", 
   "salePrice": "8312"
  }
]
Source2 = [{
  "sku": "abc-abc-abc", 
  "min_price": "678"
 },{
  "sku": "xyz-xyz-xyz", 
  "min_price": "7655"
 }
]

df1 = sqlContext.read.json(sc.parallelize(Source1))
df1.show()
df2 = sqlContext.read.json(sc.parallelize(Source2))
df2.show()
df3 = df1.join(df2,'sku',how='inner').select(df1.sku,df1.prod_id,df1.salePrice,df2.min_price).withColumn('price',F.to_json(F.struct(df1.salePrice,df2.min_price))).drop('salePrice').drop('min_price')
df4 = df3.select('sku','prod_id','price').withColumn('Output',F.to_json(F.struct('sku','prod_id','price'))).drop('sku').drop('prod_id').drop('price').show(truncate=False)

---------------------------------------------------------------------------------------------------- |输出 | ---------------------------------------------------------------------------------------------------- |{"s ku":"abc-abc-abc","prod_id":"s-23235","价格":"{"销售价格":"2312","min_price":"678"}"}||{"sku":"xyz-xyz-xyz","prod_id":"ss-13265","价格":"{"salePrice":"8312","min_price":"7655"}"}| ----------------------------------------------------------------------------------------------------

 类似资料:
  • 假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:

  • 我有两个数据帧,DF1和DF2,DF1是存储来自DF2的任何附加信息的主机。 假设DF1是以下格式, DF2包含DF1中已经存在的2个条目和两个新条目。(itemId和item被视为一个组,可以被视为连接的键) 我需要合并两个数据框,以便增加现有项目计数并插入新项目。 结果应该是这样的: 我有一种方法可以做到这一点,但不确定这种方法是否有效或正确

  • 我有两个包含GB数据的大型pyspark数据框df1和df2。第一个数据框中的列是id1,col1。第二个数据框中的列是id2,col2。数据框的行数相等。id1和id2的所有值都是唯一的。id1的所有值也正好对应一个值id2。 因为。前几个条目与df1和df2区域相同,如下所示 DF1: df2: 所以我需要连接键 id1 和 id2 上的两个数据帧。df = df1.join(df2, df1

  • 我有两个数据帧需要以我正在努力的特定方式连接。 数据帧 1: 数据框2: 期望结果: 基本上-它应该在上连接df1和df2,但是如果df2中不存在,那么生成的应该是df1中的。 我尝试了 ),但这显然在列中留下了 以供 ,因为它在 df2 中没有匹配的域。对于此特定情况,我如何将它们添加到列中?

  • 我有两个数据帧和包含IP地址,我正在尝试将IP地址映射到地理位置信息,如经度和纬度,它们是中的列。 我运行它作为一个火花提交作业,但操作花了很长时间,即使只有不到2500行。 我的代码: 有没有其他方法可以加入这两张桌子?还是我做错了?

  • 我有一个PySpark数据帧,df1,看起来像: 我有第二个PySpark数据帧,df2 我想将df1的所有列(我有两列以上)与客户ID上的df2连接值相乘