当前位置: 首页 > 知识库问答 >
问题:

火花函数不可串行化

唐阳飙
2023-03-14

我有一门课:

class DataLoader {

  def rdd2RddTransform(
    ss: SparkSession,
    inputRDD: RDD[GenericRecord]): RDD[GenericRecord] = {

    inputRDD.asInstanceOf[RDD[TrainingData]]
            .map(reformatTrainingData)
  }

  private def reformatTrainingData: TrainingData => ReFormatedData
               = (trainingData: TrainingData) => {func implement}

}

它运行得很好,但抛出了一个例外:org。阿帕奇。火花SparkException:Task not serializable在我对RDD的映射做了一个小更改之后:

inputRDD.asInstanceOf[RDD[TrainingData]].map(reformatTrainingData(_))

我以为这两个功能应该是一样的,但似乎不是。为什么它们不同?

共有1个答案

姜奇
2023-03-14

这是因为方法和函数在Scala中不太可互换。

函数是独立的对象(即类的实例,如函数1函数2函数3...),但是方法仍然与它们的封闭类绑定。如果封闭类不可序列化,这可能会在Spark中产生问题-当Spark尝试序列化方法时,它无法序列化关联的类实例。

请注意,您的reformatTrainingData是一个返回函数的方法

所以当你这样称呼:

rdd.map(reformatTrainingData)

您实际上是在调用no-argreformatTrainingData方法,并返回一个可以安全序列化的独立函数1实例

private def reformatTrainingData(): TrainingData => ReFormatedData ...

rdd.map(reformatTrainingData())

强调有一个方法调用正在发生。

当您更改为reformatTrainingData()时,您使用的是部分应用的方法;当Spark试图对其进行序列化时,它需要拉入并序列化封闭的数据加载器类,该类未标记为可序列化

如果reformatTrainingDataTrainingData=类型的简单方法,也会出现同样的问题

如果您将DataLoader标记为扩展Serializable,那么任何一个版本都可以工作。

也可以将reformatTrainingData转换为val,因为在序列化时,val不会拉入封闭类:

private val reformatTrainingData: TrainingData => ReFormatedData ...

rdd.map(reformatTrainingData)

 类似资料:
  • 我注意到,当我在DataFrame上使用窗口函数后,如果我用函数调用map()时,Spark会返回一个“Task not serializable”异常这是我的代码: 这是堆栈跟踪: 异常:任务不可序列化在org.apache.spark.util.ClosureCleaner$.EnsureClealizable(ClosureCleaner.scala:304)在org.apache.spar

  • 我想出一个例外: 在这个程序中,我尝试从hdfs路径读取记录,并将它们保存到Kafka中。问题是当我移除关于向Kafka发送记录的代码时,它运行得很好。我错过了什么?

  • 我的spark任务是在运行时抛出不可序列化的任务。谁能告诉我我做错了什么吗? 以下是stacktrace:

  • 一些脚本在工作时什么也不做,当我手动运行它们时,其中一个失败了,出现了以下消息: 错误SparkUI:未能绑定SparkUI java.net.bindexception:地址已在使用:服务“SparkUI”在重试16次后失败! 所以我想知道是否有一种特定的方法来并行运行脚本?

  • 我正在使用一个火花流作业,它使用带有初始RDD的mapAnd State。当重新启动应用程序并从检查点恢复时,它会失败,出错: 此RDD缺少SparkContext。它可能发生在以下情况: RDD转换和操作不是由驱动程序调用的,而是在其他转换内部调用的;例如,rdd1.map(x= 中描述了此行为https://issues.apache.org/jira/browse/SPARK-13758但它

  • 我想从Spark v.1.6(使用scala)数据帧创建一个JSON。我知道有一个简单的解决方案,就是做。 但是,我的问题看起来有点不同。例如,考虑具有以下列的数据帧: 我想在最后有一个数据帧 其中C是包含、、的JSON。不幸的是,我在编译时不知道数据框是什么样子的(除了始终“固定”的列和)。 至于我需要这个的原因:我使用Protobuf发送结果。不幸的是,我的数据帧有时有比预期更多的列,我仍然会