当前位置: 首页 > 知识库问答 >
问题:

Spark"包. TreeNodeException"错误python"java.lang.RuntimeException:找不到pythonUDF"

林建本
2023-03-14

我在数据库上使用pySpark 2.1。

我编写了一个UDF来为pyspark数据帧的每一行生成唯一的uuid。我使用的数据帧相对较小

我知道有内置函数spark functions和zipWithinIndex()生成行索引,但我被特别要求在这个特定项目中使用uuid。

UDF\u insert\u uuid在小数据集上运行良好,但似乎与内置的spark函数subtract冲突。

导致此错误的原因:

包裹TreeNodeException:绑定属性,树:pythonUDF0#104830

在驱动程序堆栈错误的深处,它还说:

原因:java。lang.RuntimeException:找不到pythonUDF0#104830

下面是我正在运行的代码:

import pandas
from pyspark.sql.functions import *
from pyspark.sql.types import *

import uuid

#define a python function
def insert_uuid():
  user_created_uuid = str( uuid.uuid1() )
  return user_created_uuid

#register the python function for use in dataframes
udf_insert_uuid = udf(insert_uuid, StringType())
import pandas
from pyspark.sql.functions import *
from pyspark.sql.types import *

list_of_numbers = range(1000,1050)

temp_pandasDF = pandas.DataFrame(list_of_numbers, index=None)

sparkDF = (
  spark
  .createDataFrame(temp_pandasDF, ["data_points"])
  .withColumn("labels", when( col("data_points") < 1025, "a" ).otherwise("b"))    #if "values" < 25, then "labels" = "a", else "labels" = "b"
  .repartition("labels")
)

sparkDF.createOrReplaceTempView("temp_spark_table")

#add a unique id for each row
#udf works fine in the line of code here
sparkDF = sparkDF.withColumn("id", lit( udf_insert_uuid() ))

sparkDF.show(20, False)
+-----------+------+------------------------------------+
|data_points|labels|id |
+-----------+------+------------------------------------+ 
|1029 |b |d3bb91e0-9cc8-11e7-9b70-00163e9986ba|
|1030 |b |d3bb95e6-9cc8-11e7-9b70-00163e9986ba|
|1035 |b |d3bb982a-9cc8-11e7-9b70-00163e9986ba|
|1036 |b |d3bb9a50-9cc8-11e7-9b70-00163e9986ba|
|1042 |b |d3bb9c6c-9cc8-11e7-9b70-00163e9986ba|
+-----------+------+------------------------------------+
only showing top 5 rows
list_of_numbers = range(1025,1075)

temp_pandasDF = pandas.DataFrame(list_of_numbers, index=None)

new_DF = (
  spark
  .createDataFrame(temp_pandasDF, ["data_points"])
  .withColumn("labels", when( col("data_points") < 1025, "a" ).otherwise("b"))    #if "values" < 25, then "labels" = "a", else "labels" = "b"
  .repartition("labels"))

new_DF.show(5, False)
+-----------+------+
|data_points|labels|
+-----------+------+
|1029 |b |
|1030 |b |
|1035 |b |
|1036 |b |
|1042 |b | 
+-----------+------+
only showing top 5 rows
values_not_in_new_DF = (new_DF.subtract(sparkDF.drop("id")))
display(values_not_in_new_DF
       .withColumn("id", lit( udf_insert_uuid()))   #add a column of unique uuid's
       )

共有1个答案

禹智渊
2023-03-14

运行脚本时,我遇到了与您相同的错误。我发现使其工作的唯一方法是传递UDF的列,而不是不传递参数:

def insert_uuid(col):
    user_created_uuid = str( uuid.uuid1() )
    return user_created_uuid
udf_insert_uuid = udf(insert_uuid, StringType())

然后在标签上调用它,例如:

values_not_in_new_DF\
    .withColumn("id", udf_insert_uuid("labels"))\
    .show()

无需使用照明

 类似资料:
  • Lenovo@Lenovo-ThinkCentre-mini:~/documents/apache-jmeter-4.0/bin$jmeter-n-t test.jmx-l my-test-plan-result.jtl-j my-test-plan-result.log-dthreads=500-dramp_up=60-dduration=300 NonGUIDriver java.lang.r

  • 问题内容: 我试图安装Python软件包: 但是我收到了一个神秘的错误消息: 如果我尝试手动安装软件包,也会发生相同的情况: 问题答案: 对于Windows安装: 在运行进行软件包安装时,Python 2.7搜索已安装的Visual Studio2008。你可以通过在调用之前在环境变量中设置正确的路径来欺骗Python使用更新的。 根据安装的Visual Studio版本执行以下命令: Visua

  • 我有一个ASP.NET核心docker映像。我最后一次尝试建造它是两个月前。现在,我得到了一个错误建筑。 有什么想法吗?Microsoft docker映像上有什么坏了吗?在试图发布和运行一个Elasticbeanstalk实例时,这也是一个问题。 DockerFile

  • 我对Spark和Python是新手。我已经在Windows上安装了python 3.5.1和Spark-1.6.0-bin-Hadoop2.4。 当我从python shell执行sc=SparkContext(“local”,“simple app”)时,我得到了以下错误。 文件“”,第1行,在 文件“C:\spark-1.6.0-bin-hadoop2.4\python\pyspark\con

  • 问题内容: 我添加了在“ docker-compose”期间安装软件包。但是,当我运行时发生了以下错误。我发现程序包保存在中。 运行docker-compose并进行构建 docker-compose.yml Docker文件 main.go 更新1 我注意到以下目录之间的巨大差异。 更新2 正如@aerokite所说,“卷”正在覆盖下载的软件包。我像以下内容进行了更改,并且有效。 Docker文

  • 在这里输入图像描述我正在尝试运行一个ejs文件,并得到错误:无法找到包含文件“partials/head”。 我已经检查了stackoverflow和github的大部分文章,但是无法解决它... 错误:在getIncludePath(C:\users\Junia\Desktop\Node\Node_Desktop\EJS\lib\EJS.js:162:13)中找不到包含文件“partials/h