object HelloSpark {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("Testing HelloSpark")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "xt.HelloKryoRegistrator")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 20, 4)
val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test"))
rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !")
.collect()
.foreach(println)
sc.stop
}
}
// My registrator
class HelloKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) = {
kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer())
}
}
//My serializer
class HelloSerializer extends Serializer[ImmutableBytesWritable] {
override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = {
output.writeInt(obj.getLength)
output.writeInt(obj.getOffset)
output.writeBytes(obj.get(), obj.getOffset, obj.getLength)
}
override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = {
val length = input.readInt()
val offset = input.readInt()
val bytes = new Array[Byte](length)
input.read(bytes, offset, length)
new ImmutableBytesWritable(bytes)
}
}
但是,当我以Yar-Client模式提交Spark应用程序时,抛出了以下异常:
线程“main”org.apache.spark.sparkException:Task not serializable at org.apache.spark.util.closureCleaner$.ensureRecleaner.ccala:166)at org.apache.spark.util.closureCleaner$.clean(closureCleaner.scala:158)at org.apache.spark.sparkcontext.clean(sparkcontext.scala:1242)at org.apache.spark.rdd.rdd.map(rdd.scala:270)at Scala)在sun.reflect.nativeMethodAccessorImpl.invoke0(原生方法)在sun.reflect.nativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)在sun.reflect.delegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)在java.lang.reflect.Method.invoke(Method.java:606)在ksubmit.scala)引起的原因:java.io.NotSerializableException:org.apache.hadoop.hbase.io.immutab在java.io.objectOutputStream.writeObject0(ObjectOutputStream.java:1183)在java.io.objectOutputStream.DefaultWriteFields(ObjectOutputStream.java:1547)在java.io.objectOutputStream.writeRegialData(ObjectOutputStream.writeRegialObjectOutputStream.java:1508)在ectoutputstream.java:347)在org.apache.spark.serializer.javaserializationstream.writeObject(javaserializer.scala:42)在org.apache.spark.serializer.javaserializerinstance.serializer(javaserializer.scala:73),在org.apache.spark.util.closurecleaner$.ensureResializable(closurecleaner.scala:164)...12个以上
似乎ImmutableBytesWritable不能被Kryo序列化。那么让Spark使用Kryo序列化对象的正确方法是什么呢?Kryo可以序列化任何类型吗?
发生这种情况是因为在闭包中使用了ImmutableBytesWritable
。Spark还不支持Kryo的闭包序列化(只支持RDDs中的对象)。你可以利用这个帮助来解决你的问题:
Spark-task不可序列化:如何处理调用外部类/对象的复杂映射闭包?
您只需在传递闭包之前序列化对象,然后反序列化。即使类不能序列化,这种方法也很有效,因为它在幕后使用Kryo。你只需要一些咖喱。;)
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
(foo: Foo) : Bar = {
kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new ImmutableBytesWritable(Bytes.toBytes("This is a test")))) _
rdd.flatMap(mapper).collectAsMap()
object ImmutableBytesWritable(bytes: Bytes) extends (Foo => Bar) {
def apply(foo: Foo) : Bar = { //This is the real function }
}
我在程序中使用,需要序列化内核以便以后重用它们。 为此,我使用: 如何定义必须使用?
我用这个打开了kryo连载: 我希望确保当在节点之间洗牌时,使用kryo对自定义类进行序列化。我可以通过以下方式向kryo注册该类: 但这会导致IllegalArugmentException被抛出(“class未注册”),用于我假设Spark在内部使用的一系列不同类,例如: 当然,我不必用Kryo手动注册这些单独的类?这些序列化程序都是在kryo中定义的,那么有没有一种方法可以自动注册所有的序列
我应该只使用kryo吗?混合和匹配可以吗(就像我正在做的那样)?
我正在尝试使用kryo序列化和反序列化到二进制。我想我已经完成了序列化,但似乎无法反序列化。下面是我正在处理的代码,但最终我想存储一个字节[],然后再次读取它。文档只显示了如何使用文件。
我有一个现有的域模型,我不想改变与Hazelcast的工作。为此,我想使用StreamSerializer接口和Kryo。我查看了https://github.com/hazelcast/hazelcast-book-examples/blob/master/ch章-serialization/kryo-serializer/src/main/java/personkryoserializer.j
我有一个类,它通过实现中的和方法来实现自定义Kryo序列化程序(请参见下面的示例)。如何用Spark注册此自定义序列化程序? 现在在Spark: 不幸的是,Spark没有给我注册自定义序列化程序的选项。你知道有没有办法做到这一点?