我有一门课:
class DataLoader {
def rdd2RddTransform(
ss: SparkSession,
inputRDD: RDD[GenericRecord]): RDD[GenericRecord] = {
inputRDD.asInstanceOf[RDD[TrainingData]]
.map(reformatTrainingData)
}
private def reformatTrainingData: TrainingData => ReFormatedData
= (trainingData: TrainingData) => {func implement}
}
它运行得很好,但抛出了一个例外:org。阿帕奇。火花SparkException:Task not serializable
在我对RDD的映射做了一个小更改之后:
inputRDD.asInstanceOf[RDD[TrainingData]].map(reformatTrainingData(_))
我以为这两个功能应该是一样的,但似乎不是。为什么它们不同?
这是因为方法和函数在Scala中不太可互换。
函数是独立的对象(即类的实例,如函数1
,函数2
,函数3
...),但是方法仍然与它们的封闭类绑定。如果封闭类不可序列化,这可能会在Spark中产生问题-当Spark尝试序列化方法时,它无法序列化关联的类实例。
请注意,您的reformatTrainingData
是一个返回函数的方法
所以当你这样称呼:
rdd.map(reformatTrainingData)
您实际上是在调用no-argreformatTrainingData
方法,并返回一个可以安全序列化的独立函数1
实例
private def reformatTrainingData(): TrainingData => ReFormatedData ...
rdd.map(reformatTrainingData())
强调有一个方法调用正在发生。
当您更改为reformatTrainingData()
时,您使用的是部分应用的方法;当Spark试图对其进行序列化时,它需要拉入并序列化封闭的数据加载器
类,该类未标记为可序列化
。
如果reformatTrainingData
是TrainingData=类型的简单方法,也会出现同样的问题
如果您将
DataLoader
标记为扩展Serializable
,那么任何一个版本都可以工作。
也可以将
reformatTrainingData
转换为val
,因为在序列化时,val不会拉入封闭类:
private val reformatTrainingData: TrainingData => ReFormatedData ...
rdd.map(reformatTrainingData)
我注意到,当我在DataFrame上使用窗口函数后,如果我用函数调用map()时,Spark会返回一个“Task not serializable”异常这是我的代码: 这是堆栈跟踪: 异常:任务不可序列化在org.apache.spark.util.ClosureCleaner$.EnsureClealizable(ClosureCleaner.scala:304)在org.apache.spar
我想出一个例外: 在这个程序中,我尝试从hdfs路径读取记录,并将它们保存到Kafka中。问题是当我移除关于向Kafka发送记录的代码时,它运行得很好。我错过了什么?
我的spark任务是在运行时抛出不可序列化的任务。谁能告诉我我做错了什么吗? 以下是stacktrace:
一些脚本在工作时什么也不做,当我手动运行它们时,其中一个失败了,出现了以下消息: 错误SparkUI:未能绑定SparkUI java.net.bindexception:地址已在使用:服务“SparkUI”在重试16次后失败! 所以我想知道是否有一种特定的方法来并行运行脚本?
我正在使用一个火花流作业,它使用带有初始RDD的mapAnd State。当重新启动应用程序并从检查点恢复时,它会失败,出错: 此RDD缺少SparkContext。它可能发生在以下情况: RDD转换和操作不是由驱动程序调用的,而是在其他转换内部调用的;例如,rdd1.map(x= 中描述了此行为https://issues.apache.org/jira/browse/SPARK-13758但它
我想从Spark v.1.6(使用scala)数据帧创建一个JSON。我知道有一个简单的解决方案,就是做。 但是,我的问题看起来有点不同。例如,考虑具有以下列的数据帧: 我想在最后有一个数据帧 其中C是包含、、的JSON。不幸的是,我在编译时不知道数据框是什么样子的(除了始终“固定”的列和)。 至于我需要这个的原因:我使用Protobuf发送结果。不幸的是,我的数据帧有时有比预期更多的列,我仍然会