当前位置: 首页 > 知识库问答 >
问题:

了解Spark的闭包及其序列化

程智明
2023-03-14

免责声明:刚刚开始和斯巴克玩。

我很难理解著名的“任务不可序列化”异常,但我的问题与我在SO上看到的问题有点不同(或者我认为是这样)。

我有一个很小的自定义RDD(TestRDD)。它有一个字段,用于存储类不实现序列化的对象(nonSerializable)。我已经设置了spark.serializer配置选项来使用Kryo。但是,当我在我的RDD上尝试Count()时,我得到了以下结果:

Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: com.test.spark.NonSerializable@2901e052)
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)

当我往里面看的时候。submitMissingTasks我看到它在我的RDD上使用了闭包序列化程序,这是Java序列化程序,而不是我期望的Kryo序列化程序。我已经读到Kryo在序列化闭包方面有问题,Spark总是使用Java序列化程序进行闭包,但我不太明白闭包是如何在这里发挥作用的。我在这里做的就是:

SparkConf conf = new SparkConf()
                         .setAppName("ScanTest")
                         .setMaster("local")
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

JavaSparkContext sc = new JavaSparkContext(conf);

TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());

也就是说,没有映射程序或任何需要闭包序列化的东西。OTOH这是有效的:

sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()

Kryo序列化程序按预期使用,不涉及闭包序列化程序。如果我没有将serializer属性设置为Kryo,这里也会出现异常。

我很感激任何关于闭包来自何处以及如何确保我可以使用Kryo序列化定制RDD的说明。

更新:这里是TestRDD及其不可序列化字段mNS

class TestRDD extends RDD<String> {

    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);

    NonSerializable mNS = new NonSerializable();

    public TestRDD(final SparkContext _sc) {
        super(_sc,
              JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
              STRING_TAG);
    }

    @Override
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
        return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
                                                                     "test_" + thePartition.index(),
                                                                     "test_" + thePartition.index()).iterator()).asScala();
    }

    @Override
    public Partition[] getPartitions() {
        return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
    }

    static class TestPartition implements Partition {

        final int mIndex;

        public TestPartition(final int theIndex) {
            mIndex = theIndex;
        }

        public int index() {
            return mIndex;
        }
    }
}

共有1个答案

丰飞龙
2023-03-14

当我查看DAGScheduler.submitMisSingTask时,我看到它在我的RDD上使用它的闭包序列化器,这是Java的序列化器,而不是我期望的Kryo序列化器。

SparkEnv支持两个序列化器,一个名为序列化器,用于数据的序列化、检查点、工作者之间的消息传递等,可在spark.serializer配置标志下使用。另一个名为closureSerializer,位于spark.closure.serializer下,用于检查您的对象实际上是可序列化的,并且可以为Spark配置

Kryo闭包序列化程序有一个使其无法使用的错误,你可以在SPARK-7708下看到这个错误(这可能是用Kryo 3.0.0修复的,但SPARK目前用Kryo 2.2.1上修复的特定版本的Chill修复的)。此外,对于Spark 2.0。JavaSerializer现在是固定的,而不是可配置的(您可以在这个pull请求中看到)。这意味着我们实际上要使用JavaSerializer来实现闭包序列化。

我们使用一个序列化器来提交任务,另一个序列化工作者之间的数据,这很奇怪吗?当然,但这就是我们所拥有的。

总而言之,如果您正在设置spark.serializer配置,或者使用SparkContext.registerKryoClass,您将在Spark中使用Kryo进行大部分序列化。话虽如此,为了检查给定的类是否可序列化以及任务是否可序列化给工作者,Spark将使用JavaSerializer

 类似资料:
  • 在Spark中,如何知道哪些对象在driver上实例化,哪些对象在executor上实例化,因此如何确定哪些类需要实现Serializable?

  • 我有25万个这样的事件: 一个时间段代表半个30分钟,“Anzahl”是一个时间段中的事件数,第一个时间段从2011-01-01 00:00:00开始,“WochenSlots”是时间段%%336,从周六00:00:00开始。所以我想在一周内看到分布情况。 我现在想做的是: 以x标度显示日期(星期一00:00-星期日24:00) 显示显示x%事件分布的行(信封)。 我不知道该怎么做。 dput(P

  • 问题内容: 您如何向外行解释Passport的序列化和反序列化方法的工作流程。 叫到哪里去了? 我们紧接着在工作流程中调用它吗? 我仍在努力寻找解决方案。我有一个完整的工作应用程序,没有遇到任何类型的错误。 我只是想了解这里到底发生了什么? 任何帮助表示赞赏。 问题答案: 叫到哪里去了? 用户ID(您作为函数的第二个参数提供)被保存在会话中,以后用于通过函数检索整个对象。 确定应将用户对象的哪些数

  • 我有一个行的RDD,我想基于闭包进行过滤。最终,我想将闭包作为参数传递给正在进行过滤器的方法,但我已经简化了它,我可以用这样简单的东西重现错误。 我尝试将fn放入一个case对象中,这个对象扩展了一个可序列化的特性,在调用过滤器的方法的内部和外部定义了fn。我正在努力弄清楚我需要做什么,而不会出现这些错误。我知道在堆栈溢出上已经有很多关于这个的问题,我一直在寻找一个合适的答案,但我找不到。 更新:

  • 本文向大家介绍javascript闭包的理解,包括了javascript闭包的理解的使用技巧和注意事项,需要的朋友参考一下 1、首先我们要知道变量作用域链 变量的作用域分两种:全局变量和局部变量。没有定义到任何函数中的变量为全局变量,在函数中定义的变量为局部变量,注意在函数内部定义变量时一定要使用var关键字,不带var关键字的变量为全局变量。 javascript中每一段代码都有与之关联的作用域

  • 本文向大家介绍Javascript的闭包详解,包括了Javascript的闭包详解的使用技巧和注意事项,需要的朋友参考一下 前言:还是一篇入门文章。Javascript中有几个非常重要的语言特性——对象、原型继承、闭包。其中闭包 对于那些使用传统静态语言C/C++的程序员来说是一个新的语言特性。本文将以例子入手来介绍Javascript闭包的语言特性,并结合一点 ECMAScript语言规范来使读