给定数据集
如果其中一个或两个都为空,则如何将foo
bar
同时替换为其他值,如(5,6)?
这适用于地理数据集,当纬度/lng 未知且应在其他地方获得时。所以udf很耗时,我想确保它只为必要的行调用(其中foo和bar都是空的)
下面的代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, udf
spark = (SparkSession.builder.master("local")
.appName("SimpleApp")
.getOrCreate()
)
def my_udf(): return 0, 0
df = spark.createDataFrame([[1, 2], [None, None], [3, 4]], schema=['foo', 'bar'])
df = df.withColumn("result", when(df['foo'].isNull() | df['bar'].isNull(), udf(my_udf)()))
df.show()
是不好的尝试
因此,有必要以某种方式将数组解压缩到列中。
考虑到这一点,它不能一步到位地完成 Apache Spark - 将UDF的结果分配给多个数据帧列
但是,即使我将返回结构并将其解压,如何离开不受影响的列?
我尝试过的另一种方法(考虑到我需要进一步处理foo
bar
):
def some(baz): return 'some'
def something_else(foo, bar): return 'something'
def my_udf(_):
foo, bar, baz = _
return some(baz) if foo is None and bar is None else something_else(foo, bar)
df = spark.createDataFrame([[1, 2, 3], [None, None, 4], [3, 4, 5]], schema=['foo', 'bar', 'baz'])
df = df.withColumn("result", udf(my_udf)(array('foo', 'bar', 'baz')))
df.show()
但是我觉得这不是最理想的,即使我们不需要< code>baz用于大多数行,我们仍然把它传递给udf,我认为这将阻止请求的最佳化。
当然,我可以一个接一个地为不同的列应用不同的udfs,但这似乎也不是最佳的。
那么有没有办法同时替换两列中的值呢?
最后得出了以下使用中间struct列的解决方案。不确定它是最佳的和可由spark优化的,但它似乎是。
任务是将 foo 栏 null 替换为基于 baz 的耗时内容(在本例中只是 baz/2,但它可以是 rest 请求 i.g)
original
+----+----+---+
| foo| bar|baz|
+----+----+---+
| 1.0| 2.0|3.0|
|null|null|4.0|
| 3.0| 4.0|5.0|
+----+----+---+
transformed
+---+---+---+
|foo|bar|baz|
+---+---+---+
|1.0|2.0|3.0|
|2.0|2.0|4.0|
|3.0|4.0|5.0|
+---+---+---+
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, struct, when
from pyspark.sql.types import StructType, StructField, FloatType
spark = (SparkSession.builder.master("local")
.appName("SimpleApp")
.getOrCreate()
)
df = spark.createDataFrame([[1.0, 2.0, 3.0], [None, None, 4.0], [3.0, 4.0, 5.0]], schema=['foo', 'bar', 'baz'])
df.show()
schema = StructType([
StructField("foo", FloatType(), True),
StructField("bar", FloatType(), True)
])
def f(foo, bar, baz):
# prevent time consuming op if optimizer call this udf on unecessary rows
# https://stackoverflow.com/questions/49634651/using-udf-ignores-condition-in-when
if foo is not None and bar is not None: return foo, bar
return baz / 2, baz / 2 # imagine this is time consuming, i.g. rest get
udf_f = udf(f, schema)
df = df.withColumn(
"result",
when(
df.foo.isNull() | df.bar.isNull(),
udf_f(df.foo, df.bar, df.baz)
).otherwise(
struct(df.foo, df.bar)
)
).select("result.*", df.baz)
df.show()
我会坚持你的第一个想法,然后使用合并来填充udf结果中的空行:
from pyspark.sql import functions as F
from pyspark.sql import types as T
@F.udf(returnType=T.StructType([T.StructField("foo",T.DoubleType(), True),
T.StructField("bar",T.DoubleType(), True)]))
def my_udf(foo, bar):
return (0.0, 1.0)
df = spark.createDataFrame([[1, 2], [None, None], [3, 4]], schema=['foo', 'bar'])
df.withColumn("result", F.when(df['foo'].isNull() | df['bar'].isNull(),
my_udf("foo", "bar"))) \
.withColumn("foo", F.coalesce("foo", "result.foo")) \
.withColumn("bar", F.coalesce("bar", "result.bar")) \
.show()
输出:
+---+---+----------+
|foo|bar| result|
+---+---+----------+
|1.0|2.0| null|
|0.0|1.0|{0.0, 1.0}|
|3.0|4.0| null|
+---+---+----------+
使用<code>coalesce</code>不会造成性能问题,因为该函数是一个狭窄的转换,因此不会导致混洗。
我想将数据帧列中的一个值替换为另一个值,我必须对许多列执行此操作(假设30/100列) 我已经经历过这个和这个了。 我可以在y列和z列中分别用Null替换“baz”。但我想对所有列都这样做——类似于下面的列表理解方式
我有一个列中所有值都不同的列表,我需要用1替换该列表中不存在的所有值 这是我在python中所做的工作:
问题内容: 我正在使用Runnable每秒自动从玩家的冷却时间中减去20,但是我不知道如何在迭代过程中替换值。如何更新每个键的值? 问题答案: 使用Java 8: 使用Java 7或更旧版本: 您可以迭代条目并更新值,如下所示:
问题内容: 我想用相邻列中的值替换一列中的空值,例如,如果我有 我希望它是: 尝试过 但是没用,它说值应该是浮点数,整数,长整数,字符串或字典 有任何想法吗? 问题答案: 最后找到一个替代方案:
谢谢,纳瓦兹
我想用相邻列中的值替换一列中的空值,例如,如果我 我希望它是: 尝试过 但没有工作,它说值应该是浮点数、int、长、字符串或判决 有什么想法吗?