当前位置: 首页 > 知识库问答 >
问题:

Spark-任务不可序列化:如何处理调用外部类/对象的复杂映射闭包?

司空朝
2023-03-14

看看这个问题:Scala Spark-任务不可序列化:java.io.NotSerializableExceptionon。当调用函数外部闭包只对类不是对象。

问题:

假设我的映射器可以是函数(def),在内部调用其他类,创建对象,并在内部执行不同的操作。(或者它们甚至可以是扩展的类(Foo)=

Spark仅支持闭包的Java序列化。有办法解决这个问题吗?我们可以用某物代替闭包来做我想做的事情吗?我们可以很容易地用Hadoop做这类事情。这件事让火花对我来说几乎无法使用。我们不能期望所有的第三方库都具有可扩展的所有类。

可能的解决方案

像这样的东西有什么用处吗:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

当然,答案似乎是包装纸,但我看不出确切的答案。

共有2个答案

后安民
2023-03-14

在使用JavaAPI的情况下,当传递到映射函数闭包时,应该避免匿名类。您需要一个类来扩展您的函数并将其传递给map(..),而不是执行map(新函数)见:https://yanago.wordpress.com/2015/03/21/apache-spark/

滑乐逸
2023-03-14

我自己想出了办法!

您只需在通过闭包之前序列化对象,然后反序列化。即使你的类不是可序列化的,这种方法也是有效的,因为它在幕后使用了Kryo。你只需要一些咖喱。;)

这里有一个我是如何做到的例子:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

您可以随意使Blah变得复杂,比如类、伴随对象、嵌套类、对多个第三方lib的引用。

KryoSerializationWrapper指:https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

 类似资料:
  • 问题内容: 问题: 假设我的映射器可以是内部调用其他类并创建对象并在内部执行其他操作的函数(def)。(或者它们甚至可以是扩展(Foo)=> Bar并在其apply方法中进行处理的类-但现在让我们忽略这种情况) Spark仅支持Java序列化闭包。有什么办法吗?我们可以用某些东西代替闭包来做我想做的事吗?我们可以使用Hadoop轻松完成此类工作。这件事使Spark对我几乎无法使用。不能期望所有第三

  • 在闭包外调用函数时出现奇怪行为: 当函数在一个对象中时,一切都在运行 当函数位于类get中时: 任务不可序列化:java。伊奥。NotSerializableException:测试 问题是我需要我的代码在一个类中,而不是一个对象。知道为什么会发生这种情况吗?Scala对象是序列化的吗?)? 这是一个工作代码示例: 这是一个不起作用的例子:

  • 我有一个行的RDD,我想基于闭包进行过滤。最终,我想将闭包作为参数传递给正在进行过滤器的方法,但我已经简化了它,我可以用这样简单的东西重现错误。 我尝试将fn放入一个case对象中,这个对象扩展了一个可序列化的特性,在调用过滤器的方法的内部和外部定义了fn。我正在努力弄清楚我需要做什么,而不会出现这些错误。我知道在堆栈溢出上已经有很多关于这个的问题,我一直在寻找一个合适的答案,但我找不到。 更新:

  • 我对Spark,Scala和Cassandra都是新手。使用Spark,我试图从MySQL获取一些ID。 我可以看到在控制台打印的ID。 当我试图在每个提取id上运行相同的函数时 它给出与例外相同的例外 在阅读spark-shell中的Apache spark:“sparkException:Task not serializable”后,我尝试将@transient添加到RDDs中

  • null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?

  • 问题内容: 我们在Spark上使用Redis来缓存键值对,这是代码: 但是编译器给了我这样的反馈: 有人可以告诉我如何序列化从Redis获得的数据。非常感谢。 问题答案: 在Spark中,s(如此处)上的函数被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该可序列化。 Redis连接不可序列化,因为它打开了到目标DB的TCP连接,该TCP连接已绑定到创建它的机器。 解决方案是