当前位置: 首页 > 知识库问答 >
问题:

Pyspark:将数据帧作为数组类型列连接到另一个数据帧

陆弘光
2023-03-14

我正在尝试在 pyspark 中连接两个数据帧,但将一个表作为数组列连接在另一个表上。

例如,对于这些表:

from pyspark.sql import Row

df1 = spark.createDataFrame([
    Row(a = 1, b = 'C', c = 26, d = 'abc'),
    Row(a = 1, b = 'C', c = 27, d = 'def'),
    Row(a = 1, b = 'D', c = 51, d = 'ghi'),
    Row(a = 2, b = 'C', c = 40, d = 'abc'),
    Row(a = 2, b = 'D', c = 45, d = 'abc'),
    Row(a = 2, b = 'D', c = 38, d = 'def')
])

df2 = spark.createDataFrame([
    Row(a = 1, b = 'C', e = 2, f = 'cba'),
    Row(a = 1, b = 'D', e = 3, f = 'ihg'),
    Row(a = 2, b = 'C', e = 7, f = 'cba'),
    Row(a = 2, b = 'D', e = 9, f = 'cba')
])

我想在列ab上将df1连接到df2,但df1. cdf1. d应该是单个数组类型列。此外,应保留所有名称。新数据框的输出应该能够转换为此json结构(例如前两行):

{
    "a": 1,
    "b": "C",
    "e": 2,
    "f": "cba",
    "df1": [
            {
                "c": 26,
                "d": "abc"
            },
            {
                "c": 27,
                "d": "def"
            } 
           ]
}

任何关于如何实现这一目标的想法都将不胜感激!

谢谢,

卡罗莱纳州

共有1个答案

傅和璧
2023-03-14

根据您输入的样本数据:

from pyspark.sql import functions as F


df1 = df1.groupBy("a", "b").agg(
    F.collect_list(F.struct(F.col("c"), F.col("d"))).alias("df1")
)

df1.show()
+---+---+--------------------+
|  a|  b|                 df1|
+---+---+--------------------+
|  1|  C|[[26, abc], [27, ...|
|  1|  D|         [[51, ghi]]|
|  2|  D|[[45, abc], [38, ...|
|  2|  C|         [[40, abc]]|
+---+---+--------------------+
df3 = df1.join(df2, on=["a", "b"])

df3.show()
+---+---+--------------------+---+---+
|  a|  b|                 df1|  e|  f|
+---+---+--------------------+---+---+
|  1|  C|[[26, abc], [27, ...|  2|cba|
|  1|  D|         [[51, ghi]]|  3|ihg|
|  2|  D|[[45, abc], [38, ...|  9|cba|
|  2|  C|         [[40, abc]]|  7|cba|
+---+---+--------------------+---+---+
 类似资料:
  • 假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:

  • 我有两个数据帧df1和df2 df1如下 df2就像 我想根据df2中与df1中的列名匹配的单元格值将值从df1复制到df2,所以我的df3应该看起来像 df3 基本上,我想根据df2的单元格值(df1中的列名)从df1复制df2中的列 如果它仍然令人困惑,请告诉我

  • 我有两个包含GB数据的大型pyspark数据框df1和df2。第一个数据框中的列是id1,col1。第二个数据框中的列是id2,col2。数据框的行数相等。id1和id2的所有值都是唯一的。id1的所有值也正好对应一个值id2。 因为。前几个条目与df1和df2区域相同,如下所示 DF1: df2: 所以我需要连接键 id1 和 id2 上的两个数据帧。df = df1.join(df2, df1

  • 我有两个数据帧,我需要连接一列,如果id包含在第二个数据帧的同一列中,则只从第一个数据帧中获取行: df1: 断续器: 期望输出: 我已经用df1.join(df2("id ")," left ")试过了,但是给我错误:“Dataframe”对象是不可调用的。

  • 我有两个数据帧df1和df2。df1就像一个具有以下值的字典 df2具有以下值: 我想基于df1数据帧中的,将df2拆分为3个新的数据帧。 日期,TLRA_权益栏应位于数据框 预期产出: > 数据帧 消费者,非周期性数据帧 请让我知道如何有效地做。我想做的是连接列名,例如,然后根据列名的前半部分分割数据帧。 代码: 但这很复杂。需要更好的解决方案。

  • 情况: 两个数据帧(df1和df2)具有相同的三个索引,例如“A”、“B”、“C”。df1和df2的列数不同。df1和df2中的所有单元格都填充了float类型的数据。 DF1: DF2: 目标: 从df2中选择的列(例如“BBB”)与df1的每列相加后,结果应存储在新的数据帧(df_new)中。df_new的格式应为df1(列数和行数),并具有与df1相同的列名和索引。 new_df: 我的做法