我必须附加这个由“stroint”方法生成的列,它被证明是不可序列化的。
def strToInt(colVal : String) : Int = {
var str = new Array[String](3)
str(0) = "icmp"; str(1) = "tcp"; str(2) = "udp"
var i = 0
for (i <- 0 to str.length-1) {
if (str(i) == colVal) { return i }
}
throw new IllegalStateException("This never happens")
}
val strtoint = udf(strToInt(_:String)).apply(col("Atr 1"))
val newDF = df.withColumn("newCol", strtoint)
我已经尝试过这样把函数放在辅助类中,
object Helper extends Serializable {
def strToInt ...
}
但这没用。
这个问题似乎与我在Java中遇到的问题类似。我的udf函数使用密码库对某些内容进行加密,引发的异常是:
原因:java。伊奥。NotSerializableException:javax。加密。密码序列化堆栈:-对象不可序列化(类:javax.crypto.Cipher,值:javax.crypto)。Cipher@625d02ce)
我不能添加'实现序列化'到Cipher类,因为它是由Java提供的库。
我在这个链接中使用了以下解决方案:spark如何在java中通过dataset调用udf
private static UDF1 toUpper = new UDF1<String, String>() {
public String call(final String str) throws Exception {
return str.toUpperCase();
}
};
注册UDF,就可以使用callUDF函数了。
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
sqlContext.udf().register("toUpper", toUpper, DataTypes.StringType);
peopleDF.select(col("name"),callUDF("toUpper", col("name"))).show();
在这里,我没有调用str.toUpperCase();而是调用了我的Cipher实例。
理解这里发生的事情的关键是,尽管Scala是一种函数式编程语言,但它运行在不支持函数类型的JVM上。在运行时,任何分配了“anonymous”或“lambda”函数的val
实际上都是带有apply
方法的匿名类的实例。假设你有以下几点:
object helper {
val isNegative: (Int => Boolean) = (n: Int) => n < 0
}
这编译成与此相同的东西:
object helper {
val isNegative: Function1[Int, Boolean] = {
def apply(n: Int): Boolean = n < 0
}
}
isNegative
实际上是一个匿名类实例,扩展了trait函数1
。当你这样做的时候:
object helper {
def isNegative(n: Int): Boolean = n < 0
}
现在is负值
是对象helper
的方法。当涉及到处理Spark时,如果你要做这样的事情:
// ds is a Dataset[Int]
ds.filter(isNegative)
在第一种情况下,Spark必须序列化分配给isGregative
的匿名类并失败,因为它是不可序列化的。在第二种情况下,它必须序列化helper
,这是有效的,因为如果对象的所有状态都是可序列化的,则它是可序列化的。
要将此应用于您的问题,请执行以下操作:
val strtoint = udf(strToInt(_:String)).apply(col("Atr 1"))
在运行时,
strtoint
是一个匿名类实例,其特征是Funtion1[String, User定义函数]
,这是一个方法,当它是一个调用时,它会生成一个用户定义函数。填充下划线后,它与此相同:
val strtoInt: Function1[String, UserDefinedFunction] = new Function1[String, UserDefinedFunction] = {
def apply(t1: String) = udf(strToInt(t1 :String)).apply(col("Atr 1"))
}
要最小限度地更改代码,只需将
val
更改为def
:
def sti = udf(strToInt(_:String)).apply(col("Atr 1"))
现在,
sti
是它的封闭类的一个成员函数,如果它是可序列化的,那么就Spark而言应该是好的。这里需要记住的另一件事是,stroint
也需要是可序列化的类
或对象
另一种解决方法是将
val stroint
更改为UserDefinedFunction
,这是一个case类
,因此是可序列化的,但是您仍然需要确保stroint
是可序列化的类
或对象
的成员。
当函数执行在withColumn
级别时(定义UDF时除外),将代码更改为如下所示。
// define a UDF
val strtoint = udf(strToInt _)
// use it (aka execute)
val newDF = df.withColumn("newCol", strtoint(col("Atr 1")))
这些看似微小的改变改变了你创造的东西,以及你后来如何执行它。
正如您可能已经注意到的,udf创建了Spark SQL能够理解(可以执行)的用户定义函数:
udf[RT,A1](f:(A1)⇒ 定义为用户定义的函数(用户定义的UDF-1):定义为用户定义的函数。
(为了便于理解,我删除了隐含的参数)
引用用户定义函数的caladoc:
用户定义的函数。要创建一个,请使用函数中的udf
函数。
我不太同意,但“协议”是先注册一个UDF,然后才能在查询中执行它,比如withColumn
或select
操作符。
我还会将strToInt
更改为更加Scala惯用(希望也更容易理解)。
def strToInt(colVal : String) : Int = {
val strs = Array("icmp", "tcp", "udp")
strs.indexOf(colVal)
}
我创建了一个名为的函数,该函数采用了如图所示的这3个参数,结果是新参数。我想将此函数应用于一个数据帧,其中函数参数是数据帧中的某些列,并希望将函数的输出参数添加为数据帧中的新参数,在数据帧中为每行计算函数。
我正在处理一个大型数据集,其中大部分数据被输入两次。这意味着许多变量由成对的列表示:,其中数据由一个人输入;,其中相同的数据由另一个人输入。我想创建一个名为的“主”列,该列首先从提取,然后如果是,则从提取。 下面是我试图用虚构数据做的一个示例: 例如,下面的代码显示了我希望对单个列对执行的操作的示例。 但是,我希望自动化这个过程,而不是手动完成每个过程。下面是我尝试使用要为其创建新的“主”列的列对
以下代码出现“任务不可序列化”错误? 错误 代码: 更新: 我将调用更改为以下内容, 现在我得到了错误的答案
我有以下函数(一个以列作为输入的热编码函数)。我基本上想把它应用到我的数据框中的一列,但似乎不明白出了什么问题。 猜我怎么称呼它有问题?
我从这个URL刮取了这个表: "https://www.patriotsoftware.com/blog/accounting/average-cost-living-by-state/" 看起来像这样: 然后我编写了这个函数来帮助我将字符串转换成整数: 当我只将函数应用于一列时,它就会工作。我在这里找到了关于在多个列上使用的答案:如何将函数应用于多个列 但我下面的代码不起作用,也不会产生错误:
我在Scala/Spark(1.5)和齐柏林飞艇上遇到了一个奇怪的问题: 如果我运行以下Scala/Spark代码,它将正常运行: 但是,在声明了此处建议的自定义数据帧类型之后 使用它的例子如下: 这次运行成功。 现在如果我再次运行下面的代码(同上) 我收到了错误信息: rdd:org。阿帕奇。火花rdd。RDD[Int]=ParallelCollectionRDD[8]位于parallelize