在闭包外调用函数时出现奇怪行为:
任务不可序列化:java。伊奥。NotSerializableException:测试
问题是我需要我的代码在一个类中,而不是一个对象。知道为什么会发生这种情况吗?Scala对象是序列化的吗?)?
这是一个工作代码示例:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
这是一个不起作用的例子:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
完整的谈话充分解释了这个问题,它提出了一个伟大的范式转换方式来避免这些序列化问题:https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md
得票最多的答案基本上是建议放弃整个语言特性——不再使用方法,而只使用函数。的确,在函数式编程中,应该避免类中的方法,但将它们转化为函数并不能解决这里的设计问题(参见上面的链接)。
作为这种特殊情况下的快速修复方法,您可以使用@transient
注释告诉它不要尝试序列化有问题的值(这里,Spark.ctx
是一个自定义类,而不是Spark在OP的命名之后的类):
@transient
val rddList = Spark.ctx.parallelize(list)
您还可以重新构造代码,使rddList位于其他地方,但这也很糟糕。
在未来的Scala将包括这些被称为“孢子”的东西,这将使我们能够精确地控制关闭时会和不会被拉进来的东西。此外,这应该将所有意外引入不可序列化类型(或任何不需要的值)的错误变成编译错误,而不是现在,这是可怕的运行时异常/内存泄漏。
http://docs.scala-lang.org/sips/pending/spores.html
当使用kyro,使其注册是必要的,这将意味着你得到的错误,而不是内存泄漏:
“最后,我知道kryo有kryo.setRegistrationOptional(true),但我很难想出如何使用它。当这个选项打开时,如果我没有注册类,kryo似乎仍然会抛出异常。”
向kryo注册课程的策略
当然,这只给您类型级控制,而不是值级控制。
... 还有更多想法。
Grega的回答很好地解释了为什么原始代码不起作用,以及解决问题的两种方法。然而,这种解决方案不是很灵活;考虑一个情况,其中您的闭包包含一个方法调用,在非代码>可序列化的类中,您无法控制。您既不能将Serializable
标记添加到此类,也不能更改底层实现以将方法更改为函数。
Nilesh为此提供了一个很好的解决方案,但解决方案可以更加简洁和通用:
def genMapper[A, B](f: A => B): A => B = {
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
}
然后可以使用此函数序列化程序自动包装闭包和方法调用:
rdd map genMapper(someFunc)
这种技术的另一个好处是,访问KryoSerializationWrapper
不需要额外的Shark依赖项,因为Twitter的寒意已经被core Spark所吸引
RDD扩展了可序列化接口,因此这并不是导致任务失败的原因。这并不意味着可以使用Spark序列化RDD
,避免NotSerializableException
Spark是一个分布式计算引擎,它的主要抽象是一个弹性分布式数据集(RDD),它可以被视为一个分布式集合。基本上,RDD的元素在集群的节点上分区,但是Spark将其抽象为远离用户,让用户与RDD(集合)交互,就像它是本地的一样。
不想涉及太多细节,但当您在RDD上运行不同的转换时(map
,flatMap
,filter
等),您的转换代码(闭包)是:
当然,您可以在本地运行(如您的示例中所示),但所有这些阶段(除了通过网络传送)仍然会发生。[这可以让您在部署到生产环境之前就发现任何漏洞]
在第二种情况下,您将调用一个方法,该方法是在map函数内部的类测试
中定义的。Spark看到了这一点,因为方法本身无法序列化,Spark试图序列化整个测试
类,这样代码在另一个JVM中执行时仍然有效。你有两种可能:
要么让类测试可序列化,这样整个类就可以通过Spark序列化:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
或者使用someFunc
函数代替方法(函数是Scala中的对象),这样Spark就可以序列化它:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}
类似但不同的类序列化问题可能会引起您的兴趣,您可以在Spark Summit 2013演示中阅读。
顺便说一下,您可以将rddList.map(某些Func(_))
重写为rddList.map(某些Func)
,它们是完全相同的。通常,第二种是首选的,因为它不那么冗长,阅读起来更干净。
编辑(2015-03-15):SPARK-5307引入了SerializationDebugger,SPARK 1.3.0是第一个使用它的版本。它将序列化路径添加到NotSerializableException。当遇到NotSerializableException时,调试器会访问对象图以查找无法序列化的对象的路径,并构造信息以帮助用户查找对象。
在OP的例子中,这是打印到stdout的内容:
Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun$1, name: $outer, type: class testing)
- object (class testing$$anonfun$1, <function1>)
问题内容: 问题: 假设我的映射器可以是内部调用其他类并创建对象并在内部执行其他操作的函数(def)。(或者它们甚至可以是扩展(Foo)=> Bar并在其apply方法中进行处理的类-但现在让我们忽略这种情况) Spark仅支持Java序列化闭包。有什么办法吗?我们可以用某些东西代替闭包来做我想做的事吗?我们可以使用Hadoop轻松完成此类工作。这件事使Spark对我几乎无法使用。不能期望所有第三
看看这个问题:Scala Spark-任务不可序列化:java.io.NotSerializableExceptionon。当调用函数外部闭包只对类不是对象。 问题: 假设我的映射器可以是函数(def),在内部调用其他类,创建对象,并在内部执行不同的操作。(或者它们甚至可以是扩展的类(Foo)= Spark仅支持闭包的Java序列化。有办法解决这个问题吗?我们可以用某物代替闭包来做我想做的事情吗?
我有一个行的RDD,我想基于闭包进行过滤。最终,我想将闭包作为参数传递给正在进行过滤器的方法,但我已经简化了它,我可以用这样简单的东西重现错误。 我尝试将fn放入一个case对象中,这个对象扩展了一个可序列化的特性,在调用过滤器的方法的内部和外部定义了fn。我正在努力弄清楚我需要做什么,而不会出现这些错误。我知道在堆栈溢出上已经有很多关于这个的问题,我一直在寻找一个合适的答案,但我找不到。 更新:
问题内容: 我们在Spark上使用Redis来缓存键值对,这是代码: 但是编译器给了我这样的反馈: 有人可以告诉我如何序列化从Redis获得的数据。非常感谢。 问题答案: 在Spark中,s(如此处)上的函数被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该可序列化。 Redis连接不可序列化,因为它打开了到目标DB的TCP连接,该TCP连接已绑定到创建它的机器。 解决方案是
问题内容: 我正在尝试在此准备好的语句中绑定变量,但我一直收到错误: 调用该函数,并将变量传递给它。当我更改函数以仅回显变量时,该变量会在页面上正常打印,但是如果我尝试在此处绑定它,则会收到错误。有人可以帮忙吗? 我知道函数没有完全写在这里,但这不应该是一个问题。我不明白为什么我会收到此错误。 问题答案: 正如错误消息所说,这似乎不是一个对象。尝试在您的prepare- call之后使用来调试它。
问题内容: 我正在将CakePHP数组的返回值转换为JSON,目前它是这样的: 我希望它像这样: 我正在尝试 设置:: extract(’{n} .Model’,$ data) Hash :: extract(’{n} .Model’,$ data)根本没有运气。 完整代码: 问题答案: 为选项设置,而不是。An 表示可能需要序列化多个视图变量,这要求将它们打包到单独的对象属性中。 那应该给您期望