当前位置: 首页 > 知识库问答 >
问题:

Pyspark:将多个数组列拆分为行

关飞翔
2023-03-14

我有一个数据框架,它有一行和几列。有些列是单个值,有些是列表。所有列表列的长度相同。我想将每个列表列拆分为单独的行,同时保留任何非列表列。

样本DF:

from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlc = SQLContext(sc)

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# |  a|        b|        c|  d|
# +---+---------+---------+---+
# |  1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+

我想要什么:

+---+---+----+------+
|  a|  b|  c |    d |
+---+---+----+------+
|  1|  1|  7 |  foo |
|  1|  2|  8 |  foo |
|  1|  3|  9 |  foo |
+---+---+----+------+

如果我只有一个列表列,那么只需执行< code>explode就很容易了:

df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# |  a|  b|        c|  d|
# +---+---+---------+---+
# |  1|  1|[7, 8, 9]|foo|
# |  1|  2|[7, 8, 9]|foo|
# |  1|  3|[7, 8, 9]|foo|
# +---+---+---------+---+

然而,如果我尝试分解<code>c</code>列,我得到的数据帧长度为我想要的平方:

df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# |  a|  b|  c|  d|
# +---+---+---+---+
# |  1|  1|  7|foo|
# |  1|  1|  8|foo|
# |  1|  1|  9|foo|
# |  1|  2|  7|foo|
# |  1|  2|  8|foo|
# |  1|  2|  9|foo|
# |  1|  3|  7|foo|
# |  1|  3|  8|foo|
# |  1|  3|  9|foo|
# +---+---+---+---+

我想要的是-对于每一列,取该列中数组的第n个元素,并将其添加到新行。我尝试过将爆炸映射到数据帧中的所有列,但似乎也不起作用:

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()

共有3个答案

戴靖
2023-03-14
df.withColumn("bc", arrays_zip("b","c"))
  .select("a", explode("bc").alias("tbc"))
  .select("a", col"tbc.b", "tbc.c").show()

< code >从pyspark.sql.functions导入arrays_zip

  1. 创建一列 bc,它是 b 列和 c 列的array_zip
  2. 分解 bc 以获得结构 tbc
  3. 选择所需的列 abc(全部根据需要分解)。
> df.withColumn("bc", arrays_zip("b","c")).select("a", explode("bc").alias("tbc")).select("a", "tbc.b", col("tbc.c")).show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  1|  7|
|  1|  2|  8|
|  1|  3|  9|
+---+---+---+
隆飞宇
2023-03-14

您需要使用 flatMap,而不是映射,因为您希望从每个输入行中生成多个输出行。

from pyspark.sql import Row
def dualExplode(r):
    rowDict = r.asDict()
    bList = rowDict.pop('b')
    cList = rowDict.pop('c')
    for b,c in zip(bList, cList):
        newDict = dict(rowDict)
        newDict['b'] = b
        newDict['c'] = c
        yield Row(**newDict)

df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))
东方宜
2023-03-14

火花

您可以将zip_ udf 替换为arrays_zip函数

from pyspark.sql.functions import arrays_zip, col, explode

(df
    .withColumn("tmp", arrays_zip("b", "c"))
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.b"), col("tmp.c"), "d"))

火花

使用< code >数据帧和UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode

zip_ = udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      # Adjust types to reflect data types
      StructField("first", IntegerType()),
      StructField("second", IntegerType())
  ]))
)

(df
    .withColumn("tmp", zip_("b", "c"))
    # UDF output cannot be directly passed to explode
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))

使用RDD

(df
    .rdd
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
    .toDF(["a", "b", "c", "d"]))

由于 Python 通信开销,这两种解决方案都效率低下。如果数据大小是固定的,您可以执行如下操作:

from functools import reduce
from pyspark.sql import DataFrame

# Length of array
n = 3

# For legacy Python you'll need a separate function
# in place of method accessor 
reduce(
    DataFrame.unionAll, 
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
        for i in range(n))
).toDF("a", "b", "c", "d")

甚至:

from pyspark.sql.functions import array, struct

# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
    for i in range(n)
]))

(df
    .withColumn("tmp", tmp)
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))

与UDF或RDD相比,这应该要快得多。泛化以支持任意数量的列:

# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
    return explode(array(*[
        struct(*[col(c).getItem(i).alias(c) for c in colnames])
        for i in range(n)
    ]))

df.withColumn("tmp", zip_and_explode("b", "c", n=3))
 类似资料:
  • 我正在Spark 3.0.0上执行Spark结构流的示例,为此,我使用了twitter数据。我在Kafka中推送了twitter数据,单个记录如下所示 2020-07-21 10:48:19|1265200268284588034|RT@narendramodi:与@IBM首席执行官@ArvindKrishna先生进行了广泛的互动。我们讨论了几个与技术相关的主题,…|印度海得拉巴 在这里,每个字段

  • 我的问题是如何将一列拆分为多个列。我不知道为什么 不起作用。 例如,我想将“df_test”更改为“df_test2”。我看到了很多使用熊猫模块的例子。还有别的办法吗?提前感谢您。 df_test2

  • 我有: 我想要: 似乎在scala中我可以写:< code>df.select($"value。_1 ",$ "值。_2 ",$ "值。_3"),但这在python中是不可能的。 那么有没有好的办法呢?

  • 我有一个pyspark dataframe,其中一列的格式如下: [{key1:value1},{key2:value2},{key3:value3},{key4:value4}] 让我们把它说成下面的专栏: 我希望将其转换为dataframe的列,其中列名为keyX,其内容为valueX,其中x=[1,4],如下所示: 我试过一些解决办法,但都不起作用。请求你分享任何想法或解决方案,如果你有。提

  • 问题内容: 我有一个具有这种结构的表。 我无法弄清楚我将使用哪种SQL查询来获得这样的结果集: 我正在尝试将三列分为三个单独的行。这可能吗? 问题答案: SELECT Y.UserID, Y.UserName, QuestionName = ‘AnswerToQuestion’ + X.Which, Response = CASE X.Which WHEN ‘1’ THEN AnswerToQue

  • 我有一个火花数据框如下,并希望分裂成3的空间列。 下面是预期的结果。第一项留在text1列,第二项转到text2,其余的都转到text3(如果有)。原始列值可能包含空记录或带有任意数量分隔符的值,分隔符是空格。 提前感谢!