当前位置: 首页 > 知识库问答 >
问题:

列表的Spark/Scala序列化。任务不可序列化:java.io.NotSerializable异常

杜元明
2023-03-14

问题在于Spark数据集和INT列表的序列化。Scala版本是2.10.4,Spark版本是1.6。

这和其他问题类似,但是我不能基于这些回答让它工作。我已经简化了代码,以便仅仅显示问题。

我有一门案例课:

case class FlightExt(callsign: Option[String], serials: List[Int])

我的主要方法是:

    val (ctx, sctx) = SparkUtil.createContext() // just a helper function to build context
    val flightsDataFrame = separateFlightsMock(sctx) // reads data from Parquet file

    import sctx.implicits._
    flightsDataFrame.as[FlightExt]
      .map(flight => flight.callsign)
      .show()

我得到以下错误:

SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
Serialization stack:
    - object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package scala)
    - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
    - object (class scala.reflect.internal.Types$UniqueThisType, scala.type)
    - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
    - object (class scala.reflect.internal.Types$TypeRef$$anon$6, scala.Int)
    - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$5, name: elementType$2, type: class scala.reflect.api.Types$TypeApi)
    - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$5, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(<function1>,cast(serials#7 as array<int>),IntegerType))
    - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
    - object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(<function1>,cast(serials#7 as array<int>),IntegerType),array,ObjectType(class [Ljava.lang.Object;)))
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(<function1>,cast(serials#7 as array<int>),IntegerType),array,ObjectType(class [Ljava.lang.Object;))))
    - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq)
    - object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class scala.collection.mutable.WrappedArray$,ObjectType(interface scala.collection.Seq),make,invoke(mapobjects(<function1>,cast(serials#7 as array<int>),IntegerType),array,ObjectType(class [Ljava.lang.Object;)),true))
    - writeObject data (class: scala.collection.immutable.$colon$colon)

如果我从FlightExt中删除列表,那么一切正常,这表明lambda函数序列化没有问题。

Scala本身似乎序列化了一系列Int的优点。也许Spark在序列化列表方面有问题?

我还尝试过使用Java整数。

编辑:

如果我将列表更改为数组,它会工作,但如果我有如下内容:

case class FlightExt(callsign: Option[String], other: Array[AnotherCaseClass])

它也会因同样的错误而失败

我是Scala和Spark的新手,可能遗漏了一些东西,但如果有任何解释,我将不胜感激。

共有1个答案

饶明亮
2023-03-14

FlightExt类放入对象中,检查下面的代码。

object Flight {
 case class FlightExt(callsign: Option[String], var serials: List[Int])
}

使用Flight。FlightExt

val (ctx, sctx) = SparkUtil.createContext() // just a helper function to build context
    val flightsDataFrame = separateFlightsMock(sctx) // reads data from Parquet file

    import sctx.implicits._
    flightsDataFrame.as[Flight.FlightExt]
      .map(flight => flight.callsign)
      .show()

 类似资料:
  • null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?

  • 这给出的错误如下,任何帮助将是感激的:

  • 我有一个行的RDD,我想基于闭包进行过滤。最终,我想将闭包作为参数传递给正在进行过滤器的方法,但我已经简化了它,我可以用这样简单的东西重现错误。 我尝试将fn放入一个case对象中,这个对象扩展了一个可序列化的特性,在调用过滤器的方法的内部和外部定义了fn。我正在努力弄清楚我需要做什么,而不会出现这些错误。我知道在堆栈溢出上已经有很多关于这个的问题,我一直在寻找一个合适的答案,但我找不到。 更新:

  • 我对Spark,Scala和Cassandra都是新手。使用Spark,我试图从MySQL获取一些ID。 我可以看到在控制台打印的ID。 当我试图在每个提取id上运行相同的函数时 它给出与例外相同的例外 在阅读spark-shell中的Apache spark:“sparkException:Task not serializable”后,我尝试将@transient添加到RDDs中

  • 问题内容: 我们在Spark上使用Redis来缓存键值对,这是代码: 但是编译器给了我这样的反馈: 有人可以告诉我如何序列化从Redis获得的数据。非常感谢。 问题答案: 在Spark中,s(如此处)上的函数被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该可序列化。 Redis连接不可序列化,因为它打开了到目标DB的TCP连接,该TCP连接已绑定到创建它的机器。 解决方案是

  • 在我的程序中,我有一个返回一些RDD的方法,我们称它为,它接受一个不可序列化的参数,并让RDD的类型为(我真正的RDD是元组类型,但只包含基元类型)。 当我尝试这样的事情时: 我得到的。 当我用替换(即某个常数)时,它会运行。 从序列化跟踪中,它试图序列化,并在那里阻塞,但我仔细检查了我的方法,这个对象从未出现在RDD中。 当我试图直接收集的输出时,即 我也没有问题。 该方法使用获取(本地)值序列