val file = File.createTempFile("temp", ".avro")
val schema = new Schema.Parser().parse(st)
val datumWriter = new GenericDatumWriter[GenericData.Record](schema)
val dataFileWriter = new DataFileWriter[GenericData.Record](datumWriter)
dataFileWriter.create(schema , file)
rdd.foreach(r => {
dataFileWriter.append(r)
})
dataFileWriter.close()
我有一个类型为GenericData的
,但我得到了这个DStream
。记录我试图以Avro格式写入HDFS的任务不可序列化
错误:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2062)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
at KafkaCo$$anonfun$main$3.apply(KafkaCo.scala:217)
at KafkaCo$$anonfun$main$3.apply(KafkaCo.scala:210)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.avro.file.DataFileWriter
Serialization stack:
- object not serializable (class: org.apache.avro.file.DataFileWriter, value: org.apache.avro.file.DataFileWriter@78f132d9)
- field (class: KafkaCo$$anonfun$main$3$$anonfun$apply$1, name: dataFileWriter$1, type: class org.apache.avro.file.DataFileWriter)
- object (class KafkaCo$$anonfun$main$3$$anonfun$apply$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
由于lambda必须分布在集群中才能运行,因此它们必须只引用可序列化的数据,以便可以序列化、发送给不同的执行者进行部署并作为任务在那里执行。
您可能会做的是:
map分区
(而不是map
)方法并为每个分区创建一个新编写器这里的关键点是,DataFileWriter是一个本地资源(绑定到本地文件),所以序列化它没有意义。
调整代码以执行诸如映射分区之类的操作也不会有帮助,因为这种绑定到执行器的方法将在执行器的本地文件系统上写入文件。
我们需要使用支持Spark分布式特性的实现,例如https://github.com/databricks/spark-avro
使用该库:
给定由案例类表示的一些模式,我们将执行以下操作:
val structuredRDD = rdd.map(record => recordToSchema(record))
val df = structuredRDD.toDF()
df.write.avro(hdfs_path)
我需要将包含类型为Pair的对象的列表序列化为xml 首先,我创建了一个类PairList来保存对的列表,然后我创建了一个实际的类,它表示一对两个值,key和value。 然后,我尝试序列化它: 不幸的是,我遇到了一个异常:。欢迎任何关于如何避免此异常并序列化该类的想法。 如果我选择不序列化ttype和utype字段(通过将其设置为受保护或私有),则序列化有效。我不明白为什么它不想序列化类型字段。
我已经上了三节课 任务未序列化
问题内容: 我正在尝试制作一个使用Jackson来反序列化POJO的类。 看起来像这样… 我对此实施有2个问题。 首先是我将类类型传递给方法,以便对象映射器知道应反序列化的类型。有使用泛型的更好方法吗? 同样在get方法中,我将一个从objectMapper返回的对象强制转换为T。这看起来特别讨厌,因为我必须在此处强制转换T,然后还必须从调用它的方法中强制转换对象类型。 我在该项目中使用了Robo
我用Jackson编写了自己的序列化程序。它接受一个变量或类,并返回任何简单类型的值。 示例:serialize(new MyClass(2.0))将返回一个值为 2.0 的双精度值,其中 MyClass 如下所示: 因此,为了获得正确的值,我需要设置@JsonValue,但是,当我序列化一个没有@JsonValue注释的对象(例如UUID)时,它会返回预期的UUID字符串。 创建我自己的类没有@
我有一个带有Object类型属性的基类型(一段遗留代码,许多项目都使用这个基类型)。后来添加了基类型的泛型版本,将属性公开为泛型类型。 使用ServiceStack.Text序列化和反序列化泛型类型将设置基类(type object)上的属性,而不是派生类上更特定的类型。 重现错误的简单控制台应用程序如下所示: 感谢任何帮助。 基于这个答案,我通过使和从一个新的抽象基类继承来解决这个问题,如下所示
我试图使用Spark 1.0在HBase(0.96.0-hadoop2)中编写一些简单的数据,但我一直遇到序列化问题。以下是相关代码: 运行代码会导致: 用map替换foreach不会崩溃,但我也不会写。任何帮助都将不胜感激。