当前位置: 首页 > 知识库问答 >
问题:

如何在PySpark中一次替换多列中的值,如果它们都为空?

空英逸
2023-03-14

给定数据集

如果其中一个或两个都为空,则如何将foobar同时替换为其他值,如(5,6)?

这适用于地理数据集,当纬度/lng 未知且应在其他地方获得时。所以udf很耗时,我想确保它只为必要的行调用(其中foo和bar都是空的)

下面的代码

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, udf

spark = (SparkSession.builder.master("local")
         .appName("SimpleApp")
         .getOrCreate()
         )


def my_udf(): return 0, 0


df = spark.createDataFrame([[1, 2], [None, None], [3, 4]], schema=['foo', 'bar'])
df = df.withColumn("result", when(df['foo'].isNull() | df['bar'].isNull(), udf(my_udf)()))

df.show()

是不好的尝试

因此,有必要以某种方式将数组解压缩到列中。

考虑到这一点,它不能一步到位地完成 Apache Spark - 将UDF的结果分配给多个数据帧列

但是,即使我将返回结构并将其解压,如何离开不受影响的列?

我尝试过的另一种方法(考虑到我需要进一步处理foobar):

def some(baz): return 'some'
def something_else(foo, bar): return 'something'
def my_udf(_):
    foo, bar, baz = _
    return some(baz) if foo is None and bar is None else something_else(foo, bar)


df = spark.createDataFrame([[1, 2, 3], [None, None, 4], [3, 4, 5]], schema=['foo', 'bar', 'baz'])
df = df.withColumn("result", udf(my_udf)(array('foo', 'bar', 'baz')))

df.show()

但是我觉得这不是最理想的,即使我们不需要< code>baz用于大多数行,我们仍然把它传递给udf,我认为这将阻止请求的最佳化。

当然,我可以一个接一个地为不同的列应用不同的udfs,但这似乎也不是最佳的。

那么有没有办法同时替换两列中的值呢?

共有2个答案

沈博达
2023-03-14

最后得出了以下使用中间struct列的解决方案。不确定它是最佳的和可由spark优化的,但它似乎是。

任务是将 foo 栏 null 替换为基于 baz 的耗时内容(在本例中只是 baz/2,但它可以是 rest 请求 i.g)

original
+----+----+---+
| foo| bar|baz|
+----+----+---+
| 1.0| 2.0|3.0|
|null|null|4.0|
| 3.0| 4.0|5.0|
+----+----+---+

transformed
+---+---+---+
|foo|bar|baz|
+---+---+---+
|1.0|2.0|3.0|
|2.0|2.0|4.0|
|3.0|4.0|5.0|
+---+---+---+

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, struct, when
from pyspark.sql.types import StructType, StructField, FloatType

spark = (SparkSession.builder.master("local")
         .appName("SimpleApp")
         .getOrCreate()
         )

df = spark.createDataFrame([[1.0, 2.0, 3.0], [None, None, 4.0], [3.0, 4.0, 5.0]], schema=['foo', 'bar', 'baz'])

df.show()

schema = StructType([
    StructField("foo", FloatType(), True),
    StructField("bar", FloatType(), True)
])


def f(foo, bar, baz):
    # prevent time consuming op if optimizer call this udf on unecessary rows
    # https://stackoverflow.com/questions/49634651/using-udf-ignores-condition-in-when
    if foo is not None and bar is not None: return foo, bar

    return baz / 2, baz / 2  # imagine this is time consuming, i.g. rest get


udf_f = udf(f, schema)

df = df.withColumn(
    "result",
    when(
        df.foo.isNull() | df.bar.isNull(),
        udf_f(df.foo, df.bar, df.baz)
    ).otherwise(
        struct(df.foo, df.bar)
    )
).select("result.*", df.baz)

df.show()


雍兴修
2023-03-14

我会坚持你的第一个想法,然后使用合并来填充udf结果中的空行:

from pyspark.sql import functions as F
from pyspark.sql import types as T

@F.udf(returnType=T.StructType([T.StructField("foo",T.DoubleType(), True), 
                  T.StructField("bar",T.DoubleType(), True)]))
def my_udf(foo, bar): 
    return (0.0, 1.0)


df = spark.createDataFrame([[1, 2], [None, None], [3, 4]], schema=['foo', 'bar'])
df.withColumn("result", F.when(df['foo'].isNull() | df['bar'].isNull(), 
                                                    my_udf("foo", "bar"))) \
    .withColumn("foo", F.coalesce("foo", "result.foo")) \
    .withColumn("bar", F.coalesce("bar", "result.bar")) \
    .show()

输出:

+---+---+----------+                                                            
|foo|bar|    result|
+---+---+----------+
|1.0|2.0|      null|
|0.0|1.0|{0.0, 1.0}|
|3.0|4.0|      null|
+---+---+----------+

使用<code>coalesce</code>不会造成性能问题,因为该函数是一个狭窄的转换,因此不会导致混洗。

 类似资料:
  • 我想将数据帧列中的一个值替换为另一个值,我必须对许多列执行此操作(假设30/100列) 我已经经历过这个和这个了。 我可以在y列和z列中分别用Null替换“baz”。但我想对所有列都这样做——类似于下面的列表理解方式

  • 我有一个列中所有值都不同的列表,我需要用1替换该列表中不存在的所有值 这是我在python中所做的工作:

  • 问题内容: 我正在使用Runnable每秒自动从玩家的冷却时间中减去20,但是我不知道如何在迭代过程中替换值。如何更新每个键的值? 问题答案: 使用Java 8: 使用Java 7或更旧版本: 您可以迭代条目并更新值,如下所示:

  • 问题内容: 我想用相邻列中的值替换一列中的空值,例如,如果我有 我希望它是: 尝试过 但是没用,它说值应该是浮点数,整数,长整数,字符串或字典 有任何想法吗? 问题答案: 最后找到一个替代方案:

  • 我想用相邻列中的值替换一列中的空值,例如,如果我 我希望它是: 尝试过 但没有工作,它说值应该是浮点数、int、长、字符串或判决 有什么想法吗?