当前位置: 首页 > 知识库问答 >
问题:

Pyspark数据框使用UDF加入

荣晨朗
2023-03-14

我正在尝试在PySpark中为两个数据框(df1和df2)创建自定义连接(类似于此),代码如下所示:

my_join_udf = udf(lambda x, y: isJoin(x, y), BooleanType())
my_join_df = df1.join(df2, my_join_udf(df1.col_a, df2.col_b))

我得到的错误消息是:

java.lang.RuntimeException: Invalid PythonUDF PythonUDF#<lambda>(col_a#17,col_b#0), requires attributes from more than one child

有没有办法编写一个可以处理来自两个单独数据帧的列的 PySpark UDF?

共有1个答案

后凯捷
2023-03-14

火花2.2

您必须使用交叉连接或在配置中启用交叉连接:

df1.crossJoin(df2).where(my_join_udf(df1.col_a, df2.col_b))

火花2.0、2.1

下面显示的方法在Spark 2. x中不再起作用。请参见SPARK-19728。

Spark 1.x

理论上你可以加入和过滤:

df1.join(df2).where(my_join_udf(df1.col_a, df2.col_b))

但总的来说,你不应该全部。任何类型的基于相等的连接都需要一个完整的笛卡尔积(与答案相同),这很少被接受(另请参阅为什么在SQL查询中使用UDF会导致笛卡尔积?)。

 类似资料:
  • 我是pyspark的新手,我来尝试做一些像下面这样的事情,为每个cookie调用一个函数Print细节,然后将结果写入文件。spark.sql查询返回正确的数据,我也可以将其序列化为文件。有人可以帮助每个cookie上的for语句。调用UDF的语法应该是什么,如何将输出写入文本文件? 任何帮助是值得赞赏的。谢谢

  • 有没有办法选择整行作为一列输入到Pyspark过滤器udf中? 我有一个复杂的过滤函数“my_filter”,我想应用于整个数据帧: 但是 引发错误,因为这不是有效的操作。 我知道我可以将数据帧转换为RDD,然后使用RDD的过滤方法,但我不想将其转换为RDD,然后再转换回数据帧。我的数据帧具有复杂的嵌套类型,因此当我尝试再次将 RDD 转换为数据帧时,架构推断将失败。

  • 我目前有一个pyspark数据帧,其中一列包含一些数字行,我想使用我编写的函数来查找这些数字行,以返回一个信息字符串。我知道简单的方法是使用withCoulmn并定义一个UDF来从旧列创建一个新列,但是我的函数的某些方式使它不能注册为UDF。我可以根据旧列的值用新列创建一个新的数据框架,而不创建UDF吗?

  • 问题内容: 我正在尝试过滤具有作为行值的PySpark数据框: 我可以使用字符串值正确过滤: 但这失败了: 但是每个类别上肯定都有价值。这是怎么回事? 问题答案: 您可以使用/ : 如果你想简单地丢弃值,您可以使用与参数: 基于等式的比较将无法正常工作,因为在SQL中未定义,因此任何将其与另一个值进行比较的尝试都将返回: 与值进行比较的唯一有效方法是/ ,它等效于/方法调用。

  • 问题内容: 我有一个看起来像这样: 两列都是String类型(StringType()),我想将其放入spark ml randomForest中。为此,我需要将要素列转换为包含浮点数的向量。有谁知道怎么做吗? 问题答案: 如果您使用的是 Spark 2.x ,我相信这就是您所需要的: 使用 Spark 1.6 并没有太大不同: 具有可以帮助您实现所要完成的功能的功能。

  • 问题内容: 我正在使用monotonically_increasing_id()使用以下语法将行号分配给pyspark数据帧: 现在df1有26,572,528条记录。因此,我期望idx值为0-26,572,527。 但是当我选择max(idx)时,它的值非常大:335,008,054,165。 这个功能是怎么回事?使用此功能与具有相似记录数量的另一个数据集合并是否可靠? 我有大约300个数据框,