当前位置: 首页 > 知识库问答 >
问题:

Scala/Spark:如何定义可序列化的case类(非REPL)?

章飞虎
2023-03-14

这里是Scala新手。我在齐柏林飞艇笔记本上写了一份Spark作业的草稿。我使用数据集api,这样在执行ds.map(s=

现在我正在写一个实际的作业来在Apache Airflow中运行它。主文件如下所示:

class MainObj {
   private val spark = SparkSession.builder()...getOrCreate()       
   import spark.implicits._

   case class MyCaseClass(...)

   def run() {       
      spark.read
      ...
      .map(s => MyCaseClass(...))
      ...
   }
}

object MainObj {
   def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) = 
      new MainObj(arguments, sparkConf)

   def main(args: Array[String]): Unit = {
      MainObj(...).run()
   }
}

在这种情况下,我得到:

无法为内部类MainObj$MyCaseClass生成编码器,而无法访问定义此类的作用域。

如果我添加org。阿帕奇。火花sql。催化剂编码器。外窥镜。addOuterScope(这个)run()内部或之前,我得到:

原因:java。伊奥。NotSerializableException:MainObj序列化堆栈:对象不可序列化(类:MainObj,值:MainObj@2f11d889)

我还尝试将case类移动到一个单独的文件(不起作用)或run()(甚至不编译)中。

对这个问题非常沮丧...有人能帮忙吗,或者至少给我指出一个解释case类spark.implicits作用域之间关系的地方?


共有2个答案

姜旭
2023-03-14

我在写问题时漏掉了一段非常重要的代码。

实际上,我的坏代码是这样的:

class MainObj {
   private val spark = SparkSession.builder()...getOrCreate()       
   import spark.implicits._

   case class MyCaseClass(...)

   // This is what I left out
   def someFunction() { 
      ...
   }

   def run() {       
      spark.read
      ...
      .map(s => { ...someFunction() ... }) // and this
      .map(s => MyCaseClass(...))
      ...
   }
}

object MainObj {
   def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) = 
      new MainObj(arguments, sparkConf)

   def main(args: Array[String]): Unit = {
      MainObj(...).run()
   }
}

这就是工作代码的样子:

// FIXED: moved case class to the root scope
case class MyCaseClass(...)

class MainObj {
   private val spark = SparkSession.builder()...getOrCreate()       
   import spark.implicits._                  

   def run() {       
      spark.read
      ...
      .map(s => { ... MainObj.someFunction() ... }) // FIXED
      .map(s => MyCaseClass(...))
      ...
   }
}

object MainObj {
   // FIXED: moved function to a companion object; 
   // now calling it inside map(...) does not trigger serialization 
   // of MainObj object, it works like a static method call in Java
   def someFunction() {}

   def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) = 
      new MainObj(arguments, sparkConf)

   def main(args: Array[String]): Unit = {
      MainObj(...).run()
   }
}
雍宇定
2023-03-14

您需要在根级别定义案例类,而不是在类/对象中。

你能用下面的结构试试吗?

case class MyCaseClass(...)

class MainObj {
   private val spark = SparkSession.builder()...getOrCreate()       
   import spark.implicits._

   def run() {       
      spark.read
      ...
      .map(s => MyCaseClass(...))
      ...
   }
}

object MainObj {
   def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) = 
      new MainObj(arguments, sparkConf)

   def main(args: Array[String]): Unit = {
      MainObj(...).run()
   }
}
 类似资料:
  • null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?

  • 我在Scala/Spark(1.5)和齐柏林飞艇上遇到了一个奇怪的问题: 如果我运行以下Scala/Spark代码,它将正常运行: 但是,在声明了此处建议的自定义数据帧类型之后 使用它的例子如下: 这次运行成功。 现在如果我再次运行下面的代码(同上) 我收到了错误信息: rdd:org。阿帕奇。火花rdd。RDD[Int]=ParallelCollectionRDD[8]位于parallelize

  • 问题在于Spark数据集和INT列表的序列化。Scala版本是2.10.4,Spark版本是1.6。 这和其他问题类似,但是我不能基于这些回答让它工作。我已经简化了代码,以便仅仅显示问题。 我有一门案例课: 我的主要方法是: 我得到以下错误: 如果我从FlightExt中删除列表,那么一切正常,这表明lambda函数序列化没有问题。 Scala本身似乎序列化了一系列Int的优点。也许Spark在序

  • 这给出的错误如下,任何帮助将是感激的:

  • 我有一个行的RDD,我想基于闭包进行过滤。最终,我想将闭包作为参数传递给正在进行过滤器的方法,但我已经简化了它,我可以用这样简单的东西重现错误。 我尝试将fn放入一个case对象中,这个对象扩展了一个可序列化的特性,在调用过滤器的方法的内部和外部定义了fn。我正在努力弄清楚我需要做什么,而不会出现这些错误。我知道在堆栈溢出上已经有很多关于这个的问题,我一直在寻找一个合适的答案,但我找不到。 更新:

  • 问题内容: 我使用杰克逊测试了Scala案例类的序列化。 DeserializeTest.java 福斯卡拉 当我运行上述Java类时,抛出了一个异常: 我如何(反)序列化Scala案例类? 问题答案: Jackson希望您的类是JavaBean,这意味着它希望该类的每个属性都具有getX()和/或setX()。 选项1 您可以使用注释BeanProperty在Scala中创建JavaBean类。