当前位置: 首页 > 知识库问答 >
问题:

Spark scala任务不可序列化以关闭

欧阳鸿德
2023-03-14

我有一个行的RDD,我想基于闭包进行过滤。最终,我想将闭包作为参数传递给正在进行过滤器的方法,但我已经简化了它,我可以用这样简单的东西重现错误。

def fn(l: Long): Boolean = true
rdd.filter{ row => fn(row.getAs[Long]("field")) }

我尝试将fn放入一个case对象中,这个对象扩展了一个可序列化的特性,在调用过滤器的方法的内部和外部定义了fn。我正在努力弄清楚我需要做什么,而不会出现这些错误。我知道在堆栈溢出上已经有很多关于这个的问题,我一直在寻找一个合适的答案,但我找不到。

Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace: org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
org.apache.spark.SparkContext.clean(SparkContext.scala:2058)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:341)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:340)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
org.apache.spark.rdd.RDD.filter(RDD.scala:340)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
$line131.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
$line131.$read$$iwC$$iwC$$iwC.<init>(<console>:54)
$line131.$read$$iwC$$iwC.<init>(<console>:56)
$line131.$read$$iwC.<init>(<console>:58)
$line131.$read.<init>(<console>:60)
$line131.$read$.<init>(<console>:64)
$line131.$read$.<clinit>(<console>)
$line131.$eval$.<init>(<console>:7)
$line131.$eval$.<clinit>(<console>)
$line131.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:601)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:356)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:351)
org.apache.toree.global.StreamState$.withStreams(StreamState.scala:81)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:350)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:350)
org.apache.toree.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:140)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:722)

更新:

一个更完整的例子。我正在使用Toree运行Jupyter并从我的单元格中的jar文件执行代码。以下是我尝试过的三个失败的事情

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

class NotWorking1(sc: SparkContext, sqlContext: SQLContext, fn: Long=>Boolean) {
  def myFilterer(rdd:RDD[Row], longField: String): RDD[Row] = rdd.filter{ row => fn(row.getAs[Long](longField)) }
}

object NotWorking1 {
  def apply(sc: SparkContext, sqlContext: SQLContext) = {
    def myFn(l: Long): Boolean = true
    new NotWorking1(sc, sqlContext, myFn)
  }
}

class NotWorking2(sc: SparkContext, sqlContext: SQLContext) {
  def myFn(l: Long): Boolean = true

  def myFilterer(rdd:RDD[Row], longField: String): RDD[Row] = {
    rdd.filter{ row => myFn(row.getAs[Long](longField)) }
  }
}

object NotWorking2 {
  def apply(sc: SparkContext, sqlContext: SQLContext) = {
    new NotWorking2(sc, sqlContext)
  }
}

class NotWorking3(sc: SparkContext, sqlContext: SQLContext) {
  def myFilterer(rdd:RDD[Row], longField: String): RDD[Row] = {
    def myFn(l: Long): Boolean = true
    rdd.filter{ row => myFn(row.getAs[Long](longField)) }
  }
}

object NotWorking3 {
  def apply(sc: SparkContext, sqlContext: SQLContext) = {
    new NotWorking3(sc, sqlContext)
  }
}

从Jupyter单元格,我导入适当的类并运行

val nw1 = NotWorking1(sc, sqlContext)
val nw2 = NotWorking2(sc, sqlContext)
val nw3 = NotWorking3(sc, sqlContext)
nw1.myFilterer(rdd, "field")
nw2.myFilterer(rdd, "field")
nw3.myFilterer(rdd, "field")

这三个都失败了。NotWorking3特别令人惊讶。我可以做任何事情来隔离函数而不是尝试序列化整个对象(我相信这无论如何都会给我带来麻烦,因为我保留了对火花和sql上下文的引用)

共有2个答案

谭文林
2023-03-14

@JustinPihony给出的答案是正确的:Jupyter将动态创建一个类,其中包含您在其会话中键入的代码,然后代表您提交给spark。您创建的fn需要包含该封闭类。

您可能需要jar将自定义逻辑升级到用户定义的jar文件中,并将其包含在jupyter类路径中。添加到类路径的过程将取决于您使用的jupyter内核。

蓟雪峰
2023-03-14

在我的经验中,最简单的方法就是,如果你想让函数可以序列化,就直接使用函数而不是方法。换句话说,如果希望将代码片段发送给执行者,请使用val而不是def来定义它们。

在您的示例中,在NotWorking3类中,将myFn更改如下,它将工作:

val myFn = (l: Long) => true

更新:

对于NotWorking1和2,除了使用val而不是def,您还需要扩展Serializable trait并使用@SerialVersionUID注释。这是您的示例的工作版本(这里和那里略有更改):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

@SerialVersionUID(100L)
class Working1(sc: SparkContext, sqlContext: SQLContext, fn: Long=>Boolean) extends Serializable{
  def myFilterer(rdd:RDD[Row]): RDD[Row] = rdd.filter{ row => fn(row.getAs[Long](0)) }
}

@SerialVersionUID(101L)
class Working2 (sc: SparkContext, sqlContext: SQLContext) extends Serializable{
  val myFn = (l: Long) => true

  def myFilterer(rdd:RDD[Row]): RDD[Row] = {
    rdd.filter{ row => myFn(row.getAs[Long](0)) }
  }
}

class Working3 (sc: SparkContext, sqlContext: SQLContext) {
  def myFilterer(rdd:RDD[Row]): RDD[Row] = {
    val myFn = (l: Long) => true
    rdd.filter{ row => myFn(row.getAs[Long](0)) }
  }
}

val myFnGlobal = (l: Long) => true
val r1 = sc.parallelize(List(1L,2L,3L,4L,5L,6L,7L)).map(x => Row(x))

val w1 = new Working1(sc, sqlContext, myFnGlobal)
val w2 = new Working2(sc, sqlContext)
val w3 = new Working3(sc, sqlContext)
w1.myFilterer(r1).collect
w2.myFilterer(r1).collect
w3.myFilterer(r1).collect
 类似资料:
  • 我对Spark,Scala和Cassandra都是新手。使用Spark,我试图从MySQL获取一些ID。 我可以看到在控制台打印的ID。 当我试图在每个提取id上运行相同的函数时 它给出与例外相同的例外 在阅读spark-shell中的Apache spark:“sparkException:Task not serializable”后,我尝试将@transient添加到RDDs中

  • null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?

  • 我的spark任务是在运行时抛出不可序列化的任务。谁能告诉我我做错了什么吗? 以下是stacktrace:

  • 问题内容: 我们在Spark上使用Redis来缓存键值对,这是代码: 但是编译器给了我这样的反馈: 有人可以告诉我如何序列化从Redis获得的数据。非常感谢。 问题答案: 在Spark中,s(如此处)上的函数被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该可序列化。 Redis连接不可序列化,因为它打开了到目标DB的TCP连接,该TCP连接已绑定到创建它的机器。 解决方案是

  • 这给出的错误如下,任何帮助将是感激的:

  • 我已经上了三节课 任务未序列化