当前位置: 首页 > 知识库问答 >
问题:

从Spark驱动程序反序列化Java的本机readObject时

广绪
2023-03-14

我有两个火花作业A和B,因此A必须在B之前运行。A的输出必须可从以下位置读取:

  • 火花工作B
  • Spark环境之外的独立Scala程序(没有Spark依赖)

我目前正在使用Java的本机序列化与Scala案例类。

从“火花工作”开始:

val model = ALSFactorizerModel(...)

context.writeSerializable(resultOutputPath, model)

使用序列化方法:

def writeSerializable[T <: Serializable](path: String, obj: T): Unit = {
  val writer: OutputStream = ... // Google Cloud Storage dependant
  val oos: ObjectOutputStream = new ObjectOutputStream(writer)
  oos.writeObject(obj)
  oos.close()
  writer.close()
}

从B Spark作业或任何独立的非Spark Scala代码:

val lastFactorizerModel: ALSFactorizerModel = context
                     .readSerializable[ALSFactorizerModel](ALSFactorizer.resultOutputPath)

使用反序列化方法:

def readSerializable[T <: Serializable](path: String): T = {
  val is : InputStream = ... // Google Cloud Storage dependant
  val ois = new ObjectInputStream(is)
  val model: T = ois
    .readObject()
    .asInstanceOf[T]
  ois.close()
  is.close()

  model
}

(嵌套的)案例类:

模型:

package mycompany.algo.als.common.io.model.factorizer

import mycompany.data.item.ItemStore

@SerialVersionUID(1L)
final case class ALSFactorizerModel(
  knownItems:       Array[ALSFeaturedKnownItem],
  unknownItems:     Array[ALSFeaturedUnknownItem],
  rank:             Int,
  modelTS:          Long,
  itemRepositoryTS: Long,
  stores:           Seq[ItemStore]
) {   
}

项目存储:

package mycompany.data.item

@SerialVersionUID(1L)
final case class ItemStore(
  id:     String,
  tenant: String,
  name:   String,
  index:  Int
) {
}

输出:

  • 来自独立的非Spark Scala程序=

例外情况:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field mycompany.algo.als.common.io.model.factorizer.ALSFactorizerModel.stores of type scala.collection.Seq in instance of mycompany.algo.als.common.io.model.factorizer.ALSFactorizerModel
  at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at mycompany.fs.gcs.SimpleGCSFileSystem.readSerializable(SimpleGCSFileSystem.scala:71)
  at mycompany.algo.als.batch.strategy.ALSClusterer$.run(ALSClusterer.scala:38)
  at mycompany.batch.SinglePredictorEbapBatch$$anonfun$3.apply(SinglePredictorEbapBatch.scala:55)
  at mycompany.batch.SinglePredictorEbapBatch$$anonfun$3.apply(SinglePredictorEbapBatch.scala:55)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

我是不是漏了什么?我应该配置Dataproc/Spark以支持对该代码使用Java序列化吗?

我用--jars提交作业

Scala版本:2.11.8 Spark版本:2.0.2 SBT版本:0.13.13

谢谢你的帮助


共有1个答案

南宫才艺
2023-03-14

stores:Seq[ItemStore]替换为stores:Array[ItemStore]为我们解决了这个问题。

或者,我们可以使用另一个类加载器来执行ser/deser-iize操作。

希望这能有所帮助。

 类似资料:
  • 问题内容: 《 有效的Java》 和其他资源这本书对使用可序列化Java类时如何以及何时使用readObject()方法提供了很好的解释。另一方面,readResolve()方法仍然有点神秘。基本上,我发现的所有文档都只提到了两者之一,或者只单独提到了两者。 仍未解决的问题是: 两种方法有什么区别? 什么时候应采用哪种方法? 应该如何使用readResolve(),尤其是在返回什么方面? 希望您能

  • 问题内容: 什么之间的区别,并在上课吗?关于差异,我似乎找不到太多信息。 问题答案: 调用默认的反序列化机制,并在类上定义方法时使用。换句话说,当您具有自定义反序列化逻辑时,您仍然可以返回默认的序列化,这将对您的非静态,​​非瞬态字段进行反序列化。例如: 另一方面,当您在反序列化的对象的外部创建并想要读取先前已序列化的对象时,将使用:

  • 这个示例直接取自Spark示例代码,所以我不太清楚到底发生了什么。 我在localhost上运行的Spark独立集群上运行这个。 工人始终失败: 我运行的是Java 11,使用的是Spark 3.0.1。 我确实发现了一个非常相似的问题,看起来它就是答案:java。lang.ClassCastException在远程服务器上的spark作业中使用lambda表达式 然而,在确保将TestSpark

  • 问题内容: 我尝试过在Java和Android之间实现跨平台序列化。我使用了Serializable,并将我的代码在Android中与台式机Java放在同一软件包中。 来源:java-desktop序列化 资料来源:Android-反序列化 学生是一类,实现了Serializable。在桌面上,我将学生实例序列化为“ thestudent.dat”。我将此文件放在Android设备上的SD卡上,并

  • 主要内容:1 Java序列化和反序列化,2 Java序列化的优点,3 java.io.Serializable接口,4 Java ObjectOutputStream,5 Java ObjectInputStream,6 Java序列化的例子,7 Java反序列化的例子1 Java序列化和反序列化 Java中的序列化是一种将对象状态写入字节流的机制。它主要用于Hibernate,RMI,JPA,EJB和JMS技术。 序列化的反向操作称为反序列化,其中字节流被转换为对象。序列化和反序列化过程与平台

  • 上一小节我们学习了 Java 的输入输出流,有了这些前置知识点,我们就可以学习 Java 的序列化了。本小节将介绍什么是序列化、什么是反序列化、序列化有什么作用,Serializable 接口以及 Externalizable 接口,常用序列化工具介绍等内容。 1. 序列化与反序列化 序列化在计算机科学的数据处理中,是指将数据结构或对象状态转换成可取用格式,以留待后续在相同或另一台计算机环境中,能