当前位置: 首页 > 知识库问答 >
问题:

如何让Spark使用Kryo序列化一个对象?

田冥夜
2023-03-14
object HelloSpark {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
                .setAppName("Testing HelloSpark")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrator", "xt.HelloKryoRegistrator")

        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(1 to 20, 4)
        val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test"))

        rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !")
            .collect()
            .foreach(println)

        sc.stop
    }
}

// My registrator
class HelloKryoRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) = {
        kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer())
    }
}

//My serializer 
class HelloSerializer extends Serializer[ImmutableBytesWritable] {
    override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = {
        output.writeInt(obj.getLength)
        output.writeInt(obj.getOffset)
        output.writeBytes(obj.get(), obj.getOffset, obj.getLength)
    }

    override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = {
        val length = input.readInt()
        val offset = input.readInt()
        val bytes  = new Array[Byte](length)
        input.read(bytes, offset, length)

        new ImmutableBytesWritable(bytes)
    }
}
    null

但是,当我以Yar-Client模式提交Spark应用程序时,抛出了以下异常:

线程“main”org.apache.spark.sparkException:Task not serializable at org.apache.spark.util.closureCleaner$.ensureRecleaner.ccala:166)at org.apache.spark.util.closureCleaner$.clean(closureCleaner.scala:158)at org.apache.spark.sparkcontext.clean(sparkcontext.scala:1242)at org.apache.spark.rdd.rdd.map(rdd.scala:270)at Scala)在sun.reflect.nativeMethodAccessorImpl.invoke0(原生方法)在sun.reflect.nativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)在sun.reflect.delegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)在java.lang.reflect.Method.invoke(Method.java:606)在ksubmit.scala)引起的原因:java.io.NotSerializableException:org.apache.hadoop.hbase.io.immutab在java.io.objectOutputStream.writeObject0(ObjectOutputStream.java:1183)在java.io.objectOutputStream.DefaultWriteFields(ObjectOutputStream.java:1547)在java.io.objectOutputStream.writeRegialData(ObjectOutputStream.writeRegialObjectOutputStream.java:1508)在ectoutputstream.java:347)在org.apache.spark.serializer.javaserializationstream.writeObject(javaserializer.scala:42)在org.apache.spark.serializer.javaserializerinstance.serializer(javaserializer.scala:73),在org.apache.spark.util.closurecleaner$.ensureResializable(closurecleaner.scala:164)...12个以上

似乎ImmutableBytesWritable不能被Kryo序列化。那么让Spark使用Kryo序列化对象的正确方法是什么呢?Kryo可以序列化任何类型吗?

共有1个答案

陆展
2023-03-14

发生这种情况是因为在闭包中使用了ImmutableBytesWritable。Spark还不支持Kryo的闭包序列化(只支持RDDs中的对象)。你可以利用这个帮助来解决你的问题:

Spark-task不可序列化:如何处理调用外部类/对象的复杂映射闭包?

您只需在传递闭包之前序列化对象,然后反序列化。即使类不能序列化,这种方法也很有效,因为它在幕后使用Kryo。你只需要一些咖喱。;)

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new ImmutableBytesWritable(Bytes.toBytes("This is a test")))) _
rdd.flatMap(mapper).collectAsMap()

object ImmutableBytesWritable(bytes: Bytes) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}
 类似资料:
  • 我在程序中使用,需要序列化内核以便以后重用它们。 为此,我使用: 如何定义必须使用?

  • 我用这个打开了kryo连载: 我希望确保当在节点之间洗牌时,使用kryo对自定义类进行序列化。我可以通过以下方式向kryo注册该类: 但这会导致IllegalArugmentException被抛出(“class未注册”),用于我假设Spark在内部使用的一系列不同类,例如: 当然,我不必用Kryo手动注册这些单独的类?这些序列化程序都是在kryo中定义的,那么有没有一种方法可以自动注册所有的序列

  • 我应该只使用kryo吗?混合和匹配可以吗(就像我正在做的那样)?

  • 我正在尝试使用kryo序列化和反序列化到二进制。我想我已经完成了序列化,但似乎无法反序列化。下面是我正在处理的代码,但最终我想存储一个字节[],然后再次读取它。文档只显示了如何使用文件。

  • 我有一个现有的域模型,我不想改变与Hazelcast的工作。为此,我想使用StreamSerializer接口和Kryo。我查看了https://github.com/hazelcast/hazelcast-book-examples/blob/master/ch章-serialization/kryo-serializer/src/main/java/personkryoserializer.j

  • 我有一个类,它通过实现中的和方法来实现自定义Kryo序列化程序(请参见下面的示例)。如何用Spark注册此自定义序列化程序? 现在在Spark: 不幸的是,Spark没有给我注册自定义序列化程序的选项。你知道有没有办法做到这一点?