我有一个数据框架,它有一行和几列。有些列是单个值,有些是列表。所有列表列的长度相同。我想将每个列表列拆分为单独的行,同时保留任何非列表列。
样本DF:
from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode
sqlc = SQLContext(sc)
df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# | a| b| c| d|
# +---+---------+---------+---+
# | 1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+
我想要什么:
+---+---+----+------+
| a| b| c | d |
+---+---+----+------+
| 1| 1| 7 | foo |
| 1| 2| 8 | foo |
| 1| 3| 9 | foo |
+---+---+----+------+
如果我只有一个列表列,那么只需执行< code>explode就很容易了:
df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# | a| b| c| d|
# +---+---+---------+---+
# | 1| 1|[7, 8, 9]|foo|
# | 1| 2|[7, 8, 9]|foo|
# | 1| 3|[7, 8, 9]|foo|
# +---+---+---------+---+
然而,如果我尝试分解<code>c</code>列,我得到的数据帧长度为我想要的平方:
df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# | a| b| c| d|
# +---+---+---+---+
# | 1| 1| 7|foo|
# | 1| 1| 8|foo|
# | 1| 1| 9|foo|
# | 1| 2| 7|foo|
# | 1| 2| 8|foo|
# | 1| 2| 9|foo|
# | 1| 3| 7|foo|
# | 1| 3| 8|foo|
# | 1| 3| 9|foo|
# +---+---+---+---+
我想要的是-对于每一列,取该列中数组的第n个元素,并将其添加到新行。我尝试过将爆炸映射到数据帧中的所有列,但似乎也不起作用:
df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()
df.withColumn("bc", arrays_zip("b","c"))
.select("a", explode("bc").alias("tbc"))
.select("a", col"tbc.b", "tbc.c").show()
< code >从pyspark.sql.functions导入arrays_zip
b
列和 c
列的array_zip
bc
以获得结构 tbc
列 a
、b
和 c
(全部根据需要分解)。> df.withColumn("bc", arrays_zip("b","c")).select("a", explode("bc").alias("tbc")).select("a", "tbc.b", col("tbc.c")).show()
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 1| 7|
| 1| 2| 8|
| 1| 3| 9|
+---+---+---+
您需要使用 flatMap
,而不是映射
,因为您希望从每个输入行中生成多个输出行。
from pyspark.sql import Row
def dualExplode(r):
rowDict = r.asDict()
bList = rowDict.pop('b')
cList = rowDict.pop('c')
for b,c in zip(bList, cList):
newDict = dict(rowDict)
newDict['b'] = b
newDict['c'] = c
yield Row(**newDict)
df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))
火花
您可以将zip_
udf
替换为arrays_zip
函数
from pyspark.sql.functions import arrays_zip, col, explode
(df
.withColumn("tmp", arrays_zip("b", "c"))
.withColumn("tmp", explode("tmp"))
.select("a", col("tmp.b"), col("tmp.c"), "d"))
火花
使用< code >数据帧和UDF:
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode
zip_ = udf(
lambda x, y: list(zip(x, y)),
ArrayType(StructType([
# Adjust types to reflect data types
StructField("first", IntegerType()),
StructField("second", IntegerType())
]))
)
(df
.withColumn("tmp", zip_("b", "c"))
# UDF output cannot be directly passed to explode
.withColumn("tmp", explode("tmp"))
.select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))
使用RDD
:
(df
.rdd
.flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
.toDF(["a", "b", "c", "d"]))
由于 Python 通信开销,这两种解决方案都效率低下。如果数据大小是固定的,您可以执行如下操作:
from functools import reduce
from pyspark.sql import DataFrame
# Length of array
n = 3
# For legacy Python you'll need a separate function
# in place of method accessor
reduce(
DataFrame.unionAll,
(df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
for i in range(n))
).toDF("a", "b", "c", "d")
甚至:
from pyspark.sql.functions import array, struct
# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
for i in range(n)
]))
(df
.withColumn("tmp", tmp)
.select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))
与UDF或RDD相比,这应该要快得多。泛化以支持任意数量的列:
# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
return explode(array(*[
struct(*[col(c).getItem(i).alias(c) for c in colnames])
for i in range(n)
]))
df.withColumn("tmp", zip_and_explode("b", "c", n=3))
我正在Spark 3.0.0上执行Spark结构流的示例,为此,我使用了twitter数据。我在Kafka中推送了twitter数据,单个记录如下所示 2020-07-21 10:48:19|1265200268284588034|RT@narendramodi:与@IBM首席执行官@ArvindKrishna先生进行了广泛的互动。我们讨论了几个与技术相关的主题,…|印度海得拉巴 在这里,每个字段
我的问题是如何将一列拆分为多个列。我不知道为什么 不起作用。 例如,我想将“df_test”更改为“df_test2”。我看到了很多使用熊猫模块的例子。还有别的办法吗?提前感谢您。 df_test2
我有: 我想要: 似乎在scala中我可以写:< code>df.select($"value。_1 ",$ "值。_2 ",$ "值。_3"),但这在python中是不可能的。 那么有没有好的办法呢?
我有一个pyspark dataframe,其中一列的格式如下: [{key1:value1},{key2:value2},{key3:value3},{key4:value4}] 让我们把它说成下面的专栏: 我希望将其转换为dataframe的列,其中列名为keyX,其内容为valueX,其中x=[1,4],如下所示: 我试过一些解决办法,但都不起作用。请求你分享任何想法或解决方案,如果你有。提
问题内容: 我有一个具有这种结构的表。 我无法弄清楚我将使用哪种SQL查询来获得这样的结果集: 我正在尝试将三列分为三个单独的行。这可能吗? 问题答案: SELECT Y.UserID, Y.UserName, QuestionName = ‘AnswerToQuestion’ + X.Which, Response = CASE X.Which WHEN ‘1’ THEN AnswerToQue
我有一个火花数据框如下,并希望分裂成3的空间列。 下面是预期的结果。第一项留在text1列,第二项转到text2,其余的都转到text3(如果有)。原始列值可能包含空记录或带有任意数量分隔符的值,分隔符是空格。 提前感谢!