当前位置: 首页 > 知识库问答 >
问题:

如何将用户定义的函数应用于列(添加列时给出“任务不可序列化”)?

秦鸿羽
2023-03-14

我必须附加这个由“stroint”方法生成的列,它被证明是不可序列化的。

def strToInt(colVal : String) : Int = {
  var str = new Array[String](3)
  str(0) = "icmp"; str(1) = "tcp"; str(2) = "udp"
  var i = 0
  for (i <- 0 to str.length-1) {
    if (str(i) == colVal) { return i }
  }
  throw new IllegalStateException("This never happens")
}
val strtoint = udf(strToInt(_:String)).apply(col("Atr 1"))
val newDF = df.withColumn("newCol", strtoint)

我已经尝试过这样把函数放在辅助类中,

object Helper extends Serializable {
    def strToInt ...     
                                    }

但这没用。

共有3个答案

高山
2023-03-14

这个问题似乎与我在Java中遇到的问题类似。我的udf函数使用密码库对某些内容进行加密,引发的异常是:

原因:java。伊奥。NotSerializableException:javax。加密。密码序列化堆栈:-对象不可序列化(类:javax.crypto.Cipher,值:javax.crypto)。Cipher@625d02ce)

我不能添加'实现序列化'到Cipher类,因为它是由Java提供的库。

我在这个链接中使用了以下解决方案:spark如何在java中通过dataset调用udf

private static UDF1 toUpper = new UDF1<String, String>() {
    public String call(final String str) throws Exception {
        return str.toUpperCase();
    }
};

注册UDF,就可以使用callUDF函数了。

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

sqlContext.udf().register("toUpper", toUpper, DataTypes.StringType);
peopleDF.select(col("name"),callUDF("toUpper", col("name"))).show();

在这里,我没有调用str.toUpperCase();而是调用了我的Cipher实例。

甄成弘
2023-03-14

理解这里发生的事情的关键是,尽管Scala是一种函数式编程语言,但它运行在不支持函数类型的JVM上。在运行时,任何分配了“anonymous”或“lambda”函数的val实际上都是带有apply方法的匿名类的实例。假设你有以下几点:

object helper {
  val isNegative: (Int => Boolean) = (n: Int) => n < 0
}

这编译成与此相同的东西:

object helper {
  val isNegative: Function1[Int, Boolean] = {
    def apply(n: Int): Boolean = n < 0
  }
}

isNegative实际上是一个匿名类实例,扩展了trait函数1。当你这样做的时候:

object helper {
  def isNegative(n: Int): Boolean = n < 0
}

现在is负值是对象helper的方法。当涉及到处理Spark时,如果你要做这样的事情:

// ds is a Dataset[Int]
ds.filter(isNegative)

在第一种情况下,Spark必须序列化分配给isGregative的匿名类并失败,因为它是不可序列化的。在第二种情况下,它必须序列化helper,这是有效的,因为如果对象的所有状态都是可序列化的,则它是可序列化的。

要将此应用于您的问题,请执行以下操作:

val strtoint = udf(strToInt(_:String)).apply(col("Atr 1"))

在运行时,strtoint是一个匿名类实例,其特征是Funtion1[String, User定义函数],这是一个方法,当它是一个调用时,它会生成一个用户定义函数。填充下划线后,它与此相同:

val strtoInt: Function1[String, UserDefinedFunction] = new Function1[String, UserDefinedFunction] = {
  def apply(t1: String) = udf(strToInt(t1 :String)).apply(col("Atr 1"))
}

要最小限度地更改代码,只需将val更改为def

def sti = udf(strToInt(_:String)).apply(col("Atr 1"))

现在,sti是它的封闭类的一个成员函数,如果它是可序列化的,那么就Spark而言应该是好的。这里需要记住的另一件事是,stroint也需要是可序列化的对象

另一种解决方法是将val stroint更改为UserDefinedFunction,这是一个case类,因此是可序列化的,但是您仍然需要确保stroint是可序列化的对象的成员。

祁彬
2023-03-14

当函数执行在withColumn级别时(定义UDF时除外),将代码更改为如下所示。

// define a UDF
val strtoint = udf(strToInt _)
// use it (aka execute)
val newDF = df.withColumn("newCol", strtoint(col("Atr 1")))

这些看似微小的改变改变了你创造的东西,以及你后来如何执行它。

正如您可能已经注意到的,udf创建了Spark SQL能够理解(可以执行)的用户定义函数:

udf[RT,A1](f:(A1)⇒ 定义为用户定义的函数(用户定义的UDF-1):定义为用户定义的函数。

(为了便于理解,我删除了隐含的参数)

引用用户定义函数的caladoc:

用户定义的函数。要创建一个,请使用函数中的udf函数。

我不太同意,但“协议”是先注册一个UDF,然后才能在查询中执行它,比如withColumnselect操作符。

我还会将strToInt更改为更加Scala惯用(希望也更容易理解)。

def strToInt(colVal : String) : Int = {
  val strs = Array("icmp", "tcp", "udp")
  strs.indexOf(colVal)
}
 类似资料:
  • 我创建了一个名为的函数,该函数采用了如图所示的这3个参数,结果是新参数。我想将此函数应用于一个数据帧,其中函数参数是数据帧中的某些列,并希望将函数的输出参数添加为数据帧中的新参数,在数据帧中为每行计算函数。

  • 我正在处理一个大型数据集,其中大部分数据被输入两次。这意味着许多变量由成对的列表示:,其中数据由一个人输入;,其中相同的数据由另一个人输入。我想创建一个名为的“主”列,该列首先从提取,然后如果是,则从提取。 下面是我试图用虚构数据做的一个示例: 例如,下面的代码显示了我希望对单个列对执行的操作的示例。 但是,我希望自动化这个过程,而不是手动完成每个过程。下面是我尝试使用要为其创建新的“主”列的列对

  • 以下代码出现“任务不可序列化”错误? 错误 代码: 更新: 我将调用更改为以下内容, 现在我得到了错误的答案

  • 我有以下函数(一个以列作为输入的热编码函数)。我基本上想把它应用到我的数据框中的一列,但似乎不明白出了什么问题。 猜我怎么称呼它有问题?

  • 我从这个URL刮取了这个表: "https://www.patriotsoftware.com/blog/accounting/average-cost-living-by-state/" 看起来像这样: 然后我编写了这个函数来帮助我将字符串转换成整数: 当我只将函数应用于一列时,它就会工作。我在这里找到了关于在多个列上使用的答案:如何将函数应用于多个列 但我下面的代码不起作用,也不会产生错误:

  • 我在Scala/Spark(1.5)和齐柏林飞艇上遇到了一个奇怪的问题: 如果我运行以下Scala/Spark代码,它将正常运行: 但是,在声明了此处建议的自定义数据帧类型之后 使用它的例子如下: 这次运行成功。 现在如果我再次运行下面的代码(同上) 我收到了错误信息: rdd:org。阿帕奇。火花rdd。RDD[Int]=ParallelCollectionRDD[8]位于parallelize