当前位置: 首页 > 知识库问答 >
问题:

PySpark中DataFrame的逐行操作

刘野
2023-03-14

如果有一个数据帧,并希望根据行的值对函数中的数据进行一些操作。

my_udf(row):
    threshold = 10
        if row.val_x > threshold
        row.val_x = another_function(row.val_x)
        row.val_y = another_function(row.val_y)
        return row
    else:
        return row

有谁知道如何将我的udf应用于数据帧?

共有2个答案

瞿博学
2023-03-14

如果您可以使用pyspark函数,最好不要使用UDF,如果您不能将<code>另一个_function

from pyspark.sql.types import *
import pyspark.sql.functions as psf

def another_function(val):
    ...

another_function_udf = psf.udf(another_function, [outputType()])

其中< code>outputType()是与< code > other _ function (< code > integer type(),< code>StringType()的输出相对应的pyspark类型...)

def apply_another_function(val):
    return psf.when(df.val_x > threshold, another_function_udf(val)).otherwise(val)

df = df.withColumn('val_y', apply_another_function(df.val_y))\
       .withColumn('val_x', apply_another_function(df.val_x))
赵骏奇
2023-03-14

根据我的理解,udf参数是列名。您的示例可能会重写为:

from pyspark.sql.functions import udf, array
from pyspark.sql.types import IntegerType

def change_val_x(val_x):
    threshold = 10
    if val_x > threshold:
        return another_function(val_x)
    else:
        return val_x

def change_val_y(arr):
    threshold = 10
    # arr[0] -> val_x, arr[0] -> val_y 
    if arr[0] > threshold:
        return another_function(arr[1])
    else:
        return val_y

change_val_x_udf = udf(change_val_x, IntegerType())
change_val_y_udf = udf(change_val_y, IntegerType())

# apply these functions to your dataframe
df = df.withColumn('val_y', change_val_y_udf(array('val_x', 'val_y')))\
       .withColumn('val_x', change_val_x_udf('val_x'))

要修改val_x列,一个简单的 udf 就足够了,但对于需要val_y和val_x列值val_y,解决方案是使用数组。请注意,此代码未经测试...

参见此问题对多列应用udf。

 类似资料:
  • 运行时得到错误meesagge 你知道什么变通办法或解决办法吗? 注意:传递值第一行匹配,在本例中匹配到所有行。我想逐行传递rlike值。像 null 预期成果:

  • 与数据帧相比,我对使用< code>data.table的< code>tapply类操作的速度提升印象极为深刻。 例如: 然而,我并没有完全设法让它比类操作中的数据帧工作得更快(即,需要将函数应用于每一行的情况)。 这真的只是意料之中还是有一些技巧?

  • 还有其他关于如何重命名PySpark DataFrame中的列的线程,请参见这里、这里和这里。我不认为现有的解决方案具有足够的性能或通用性(我有一个应该更好的解决方案,但我被一个边缘情况bug所困扰)。让我们从回顾当前解决方案中的问题开始: 重复调用可能会遇到与多次调用相同的性能问题,如本博客文章所述。请参见此答案中的选项2。 toDF方法依赖于模式推断,不一定保留列的nullable属性(在生产

  • 我目前有一个数据表,其中一列类型为“a b c d e...”。将此列称为“COL4” 我想通过拆分col4的元素来将单行拆分为多行,同时保留所有其他列的值。 COL1[0]COL2[0]COL3[0]a b c 我希望输出为: COL1[0]COL2[0]COL3[0]a COL1[0]COL2[0]COL3[0]a b c 这不是我想要的。

  • 有两个,我想删除一个。我该怎么办?

  • 我正在尝试使用以下代码将数据帧“df2”保存到文本文件中 代码:df2。写格式(“文本”)。模式(“覆盖”)。保存(“/tmp/hive/save\u text”) 错误: Py4JJavaError Traceback(最近一次调用) /databricks/spark/python/pyspark/sql/utils.py在deco(*a,**kw)62 try:--- /databricks