当前位置: 首页 > 知识库问答 >
问题:

PySpark一次替换几列中的值

督烨赫
2023-03-14

我想将数据帧列中的一个值替换为另一个值,我必须对许多列执行此操作(假设30/100列)

我已经经历过这个和这个了。

from pyspark.sql.functions import when, lit, col

df = sc.parallelize([(1, "foo", "val"), (2, "bar", "baz"), (3, "baz", "buz")]).toDF(["x", "y", "z"])
df.show()

# I can replace "baz" with Null separaely in column y and z
def replace(column, value):
    return when(column != value, column).otherwise(lit(None))

df = df.withColumn("y", replace(col("y"), "baz"))\
    .withColumn("z", replace(col("z"), "baz"))
df.show()    

我可以在y列和z列中分别用Null替换“baz”。但我想对所有列都这样做——类似于下面的列表理解方式

[replace(df[col], "baz") for col in df.columns]

共有3个答案

施英哲
2023-03-14

您可以使用<code>选择<code>和列表理解:

df = df.select([replace(f.col(column), 'baz').alias(column) if column!='x' else f.col(column)
                for column in df.columns])
df.show()
艾浩穰
2023-03-14

使用减少()函数:

from functools import reduce

reduce(lambda d, c: d.withColumn(c, replace(col(c), "baz")), [df, 'y', 'z']).show()
#+---+----+----+
#|  x|   y|   z|
#+---+----+----+
#|  1| foo| val|
#|  2| bar|null|
#|  3|null| buz|
#+---+----+----+
丰辰沛
2023-03-14

因为有大约30/100列,所以让我们在< code>DataFrame中再添加几列,以便更好地概括它。

# Loading the requisite packages
from pyspark.sql.functions import col, when
df = sc.parallelize([(1,"foo","val","baz","gun","can","baz","buz","oof"), 
                     (2,"bar","baz","baz","baz","got","pet","stu","got"), 
                     (3,"baz","buz","pun","iam","you","omg","sic","baz")]).toDF(["x","y","z","a","b","c","d","e","f"])
df.show()
+---+---+---+---+---+---+---+---+---+ 
|  x|  y|  z|  a|  b|  c|  d|  e|  f| 
+---+---+---+---+---+---+---+---+---+ 
|  1|foo|val|baz|gun|can|baz|buz|oof| 
|  2|bar|baz|baz|baz|got|pet|stu|got| 
|  3|baz|buz|pun|iam|you|omg|sic|baz| 
+---+---+---+---+---+---+---+---+---+

假设我们想在除列xa之外的所有列中用Null替换baz。使用列表理解选择必须执行替换的那些列。

# This contains the list of columns where we apply replace() function
all_column_names = df.columns
print(all_column_names)
    ['x', 'y', 'z', 'a', 'b', 'c', 'd', 'e', 'f']
columns_to_remove = ['x','a']
columns_for_replacement = [i for i in all_column_names if i not in columns_to_remove]
print(columns_for_replacement)
    ['y', 'z', 'b', 'c', 'd', 'e', 'f']

最后,使用 when() 进行替换,这实际上是 if 子句的假名。

# Doing the replacement on all the requisite columns
for i in columns_for_replacement:
    df = df.withColumn(i,when((col(i)=='baz'),None).otherwise(col(i)))
df.show()
+---+----+----+---+----+---+----+---+----+ 
|  x|   y|   z|  a|   b|  c|   d|  e|   f| 
+---+----+----+---+----+---+----+---+----+ 
|  1| foo| val|baz| gun|can|null|buz| oof| 
|  2| bar|null|baz|null|got| pet|stu| got| 
|  3|null| buz|pun| iam|you| omg|sic|null| 
+---+----+----+---+----+---+----+---+----+

如果可以用普通的< code>if-else子句完成替换,则无需创建< code>UDF和定义函数来完成替换。< code>UDF通常是一种成本高昂的操作,应尽可能避免。

 类似资料:
  • 问题内容: 我想用相邻列中的值替换一列中的空值,例如,如果我有 我希望它是: 尝试过 但是没用,它说值应该是浮点数,整数,长整数,字符串或字典 有任何想法吗? 问题答案: 最后找到一个替代方案:

  • 给定数据集 如果其中一个或两个都为空,则如何将同时替换为其他值,如(5,6)? 这适用于地理数据集,当纬度/lng 未知且应在其他地方获得时。所以udf很耗时,我想确保它只为必要的行调用(其中foo和bar都是空的) 下面的代码 是不好的尝试 因此,有必要以某种方式将数组解压缩到列中。 考虑到这一点,它不能一步到位地完成 Apache Spark - 将UDF的结果分配给多个数据帧列 但是,即使我

  • 我想用相邻列中的值替换一列中的空值,例如,如果我 我希望它是: 尝试过 但没有工作,它说值应该是浮点数、int、长、字符串或判决 有什么想法吗?

  • 问题内容: 我想通过替换子字符串对Spark Dataframe列执行一些基本的处理。最快的方法是什么? 在当前的用例中,我有一个要规范化的地址列表。例如,此数据框: 会成为 问题答案: 对于Spark 1.5或更高版本,可以使用功能包: 快速说明: 调用该函数可在数据框中添加(或替换,如果名称存在)列。 该函数将通过替换所有与模式匹配的子字符串来生成新列。

  • 我正在运行一个AWS Glue作业,使用从Glue自动生成的PySpark脚本,将S3上的管道分隔文件加载到RDS Postgres实例中。 最初,它抱怨某些列中的空值: http://spark.apache.org/docs/latest/api/python/pyspark.sql.sql.html#pyspark.sql.dataframe.fillna 现在,当我运行作业时,它会抛出以下

  • 编写此自定义项是为了用变量替换列的值。Python 2.7;Spark 2.2.0 变量L_1到L_3更新了每行的列。我这样称呼它: 错误是: