当前位置: 首页 > 知识库问答 >
问题:

从UDF返回StructType的ArrayType时出错(并在多个UDF中使用单个函数)

唐钊
2023-03-14

(编辑)更改了字段名称(从foo、bar……改为name和city),因为旧的名称令人困惑

我需要在多个UDF中使用单个函数,并根据输入返回不同的结构。

我的实现的这个简化版本基本上完成了我正在寻找的内容:

from pyspark.sql.types import IntegerType, StructType, StringType
from pyspark.sql.functions import when, col

df = spark.createDataFrame([1, 2, 3], IntegerType()).toDF('id')

struct_one = StructType().add('name', StringType(), True)
struct_not_one = StructType().add('city', StringType(), True)

def select(id):
  if id == 1:
    return {'name': 'Alice'}
  else:
    return {'city': 'Seattle'}

one_udf = udf(select, struct_one)
not_one_udf = udf(select, struct_not_one)

df = df.withColumn('one', when((col('id') == 1), one_udf(col('id'))))\
       .withColumn('not_one', when((col('id') != 1), not_one_udf(col('id'))))

display(df)   

(编辑)输出:

id  one               not_one
1   {"name":"Alice"}  null
2   null              {"city":"Seattle"}
3   null              {"city":"Seattle"}

但是,返回StructType的ArrayType的代码不幸失败:

from pyspark.sql.types import IntegerType, StructType, StringType, ArrayType
from pyspark.sql.functions import when, col

df = spark.createDataFrame([1, 2, 3], IntegerType()).toDF('id')

struct_one = StructType().add('name', StringType(), True)
struct_not_one = ArrayType(StructType().add('city', StringType(), True))

def select(id):
  if id == 1:
    return {'name': 'Alice'}
  else:
    return [{'city': 'Seattle'}, {'city': 'Milan'}]

one_udf = udf(select, struct_one)
not_one_udf = udf(select, struct_not_one)

df = df.withColumn('one', when((col('id') == 1), one_udf(col('id'))))\
       .withColumn('not_one', when((col('id') != 1), not_one_udf(col('id'))))

display(df)      

错误消息是:

ValueError:结构类型的意外元组“名称”

(编辑)所需输出为:

id  one                 not_one
1   {"name":"Alice"}    null
2   null                [{"city":"Seattle"},{"city":"Milan"}]
3   null                [{"city":"Seattle"},{"city":"Milan"}]

但是,其他类型(StringType、ENTgerType、...)的返回和ArrayType可以使用。

在多个UDF中不使用单个函数时,也可以返回StructType数组:

from pyspark.sql.types import IntegerType, StructType, StringType, ArrayType
from pyspark.sql.functions import when, col

df = spark.createDataFrame([1, 2, 3], IntegerType()).toDF('id')

struct_not_one = ArrayType(StructType().add('city', StringType(), True))

def select(id):
    return [{'city': 'Seattle'}, {'city': 'Milan'}]

not_one_udf = udf(select, struct_not_one)

df = df.withColumn('not_one', when((col('id') != 1), not_one_udf(col('id'))))

display(df)   

(编辑)输出:

id  not_one
1   null
2   [{"city":"Seattle"},{"city":"Milan"}]
3   [{"city":"Seattle"},{"city":"Milan"}]

为什么返回结构类型的数组类型并使用多个UDF和一个函数不起作用?

谢谢

共有1个答案

皇甫智明
2023-03-14

“Spark SQL(包括SQL以及DataFrame和Dataset API)不保证子表达式的求值顺序。。。因此,依赖布尔表达式的副作用或求值顺序以及WHERE和HAVING子句的顺序是危险的,因为这些表达式和子句可以在查询优化和规划期间重新排序。具体来说,如果UDF依赖SQL中的短路语义进行空检查,则不能保证在调用UDF之前会进行空检查。"

请参阅评估顺序和空值检查

为了保持自定义项的通用性,您可以将“when filter”推入自定义项中:

from pyspark.sql.types import IntegerType, StructType, StringType, ArrayType
from pyspark.sql.functions import when, col, lit

df = spark.createDataFrame([1, 2, 3], IntegerType()).toDF('id')

struct_one = StructType().add('name', StringType(), True)
struct_not_one = ArrayType(StructType().add('city', StringType(), True))

def select(id, test):

  if eval(test.format(id)) is False:
    return None

  if id == 1:
    return {'name': 'Alice'}
  else:
    return [{'city': 'Seattle'}, {'city': 'Milan'}]

one_udf = udf(select, struct_one)
not_one_udf = udf(select, struct_not_one)

df = df.withColumn('one', one_udf(col('id'), lit('{} == 1')))\
       .withColumn('not_one', not_one_udf(col('id'), lit('{} != 1')))

display(df)    
 类似资料:
  • 火花UDF是否可能返回多个值?如果是这样,如何在数据框架API中访问各个项目。

  • 如何创建一个带一个字符串返回多个字符串的UDF?到目前为止,我所看到的UDF只能给出一个输出。如何从一个UDF中获得多个FEILD作为输出? 最简单的方法是实现name->FirstName,lastname。不是寻找拆分名称的替代解决方案,而是寻找有助于实现此类需求的API/UDF。 假设nameSplitteris是我的UDF

  • 我有一个具有以下模式的数据帧: 我想使用一个UDF,它将user_loans_arr和new_loan作为输入,并将new_loan结构添加到现有的user_loans_arr中。然后,从user_loans_arr中删除loan_date超过12个月的所有元素。 提前谢谢。

  • 我有一个PySpark Dataframe,它有两列(,,其类型为),其值为或。我正在尝试添加一个新列,这是这两个列的总和。我遵循Pyspark中的示例:在UDF中传递多列 这显示了一系列的< code>NULL,而不是我期望的结果。 我尝试了以下任何一种方法,以查看数据类型是否存在问题 仍然得到空值。 我试着移除阵列: 这可以正常工作并显示 我试着移除UDF,但是离开了阵列: 这可以正常工作并显

  • 我使用的是sparkSql 1.6.2(Java API),我必须处理下面的DataFrame,其中包含两列中的值列表: 所需的表为: 我想我必须使用爆炸函数和自定义UDF函数的组合。 null register(“combineUDF”,combineUDF,retSchema); 任何帮助都将非常感谢。 更新:我试图首先实现zip(AttributeName,AttributeValue),所

  • 失败:执行错误,从org.apache.hadoop.hive.ql.exec.FunctionTask返回代码1。配置单元仓库是非本地的,但是/home/hduser/hadoop-tutorial/hadoop-book-master/ch17-hive/src/main/java/com/hadoopbook/Hive/strip.jar指定本地文件系统中的文件。非本地仓库上的资源应指定非本