当前位置: 首页 > 知识库问答 >
问题:

使用Spark的奇怪“任务不可序列化”

汪深
2023-03-14

在我的程序中,我有一个返回一些RDD的方法,我们称它为myMethod,它接受一个不可序列化的参数,并让RDD的类型为Long(我真正的RDD是元组类型,但只包含基元类型)。

当我尝试这样的事情时:

val x: NonSerializableThing = ...
val l: Long = ...
myMethod(x, l).map(res => res + l) // myMethod's RDD does NOT include the NonSerializableThing

我得到的任务不可序列化

当我用res 1L替换res l(即某个常数)时,它会运行。

从序列化跟踪中,它试图序列化非序列化对象,并在那里阻塞,但我仔细检查了我的方法,这个对象从未出现在RDD中。

当我试图直接收集mymethod的输出时,即

myMethod(x, l).take(1) foreach println

我也没有问题。

该方法使用NonSerializableThing获取(本地)值序列,对其进行多个Cassandra查询(这是必需的,因为我需要构造要查询的分区键),如下所示:

def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
  val someParam1: String = x.someProperty
  x.getSomeSeq.flatMap(y: OtherNonSerializableThing => {
    val someParam2: String = y.someOtherProperty
    y.someOtherSeq.map(someParam3: String =>
      sc.cassandraTable("fooKeyspace", "fooTable").
      select("foo").
      where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l).
      map(_.getLong(0))
  }.reduce((a, b) => a.union(b))
}

getSomeSeqsomeOtherSeq返回纯非火花Seqs

我想要实现的是“联合”多个Cassandra查询。

这里有什么问题?

根据杰姆·塔克的要求进行编辑和补遗:

我班上有这样的东西:

implicit class MySparkExtension(sc: SparkContext) {

  def getThing(/* some parameters */): NonSerializableThing = { ... }

  def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
    val someParam1: String = x.someProperty
    x.getSomeSeq.flatMap(y: OtherNonSerializableThing => {
      val someParam2: String = y.someOtherProperty
      y.someOtherSeq.map(someParam3: String =>
        sc.cassandraTable("fooKeyspace", "fooTable").
        select("foo").
        where("bar=? and quux=? and baz=? and l=?", someParam1, someParam2, someParam3, l).
        map(_.getLong(0))
    }.reduce((a, b) => a.union(b))
  }
}

这是在包对象中声明的。问题出现在这里:

// SparkContext is already declared as sc
import my.pkg.with.extension._

val thing = sc.getThing(/* parameters */)
val l = 42L
val rdd = sc.myMethod(thing, l)
// until now, everything is OK.
// The following still works:
rdd.take(5) foreach println
// The following causes the exception:
rdd.map(x => x >= l).take(5) foreach println
// While the following works:
rdd.map(x => x >= 42L).take(5) foreach println

我在Spark shell以及通过Spark submit提交的算法中测试了输入“live”的数据。

我现在想尝试(根据我最后的评论)如下:

implicit class MySparkExtension(sc: SparkContext) {

  def getThing(/* some parameters */): NonSerializableThing = { ... }

  def myMethod(x: NonSerializableThing, l: Long): RDD[Long] = {
    val param1 = x.someProperty
    val partitionKeys =
      x.getSomeSeq.flatMap(y => {
        val param2 = y.someOtherProperty
        y.someOtherSeq.map(param3 => (param1, param2, param3, l)
      }
    queryTheDatabase(partitionKeys)
  }

  private def queryTheDatabase(partitionKeys: Seq[(String, String, String, Long)]): RDD[Long] = {
    partitionKeys.map(k =>
      sc.cassandraTable("fooKeyspace", "fooTable").
         select("foo").
         where("bar=? and quux=? and baz=? and l=?", k._1, k._2, k._3, k._4).
         map(_.getLong(0))
    ).reduce((a, b) => a.union(b))
  }
}

我相信这可以工作,因为RDD是在方法queryTheDatabase中构建的,其中不存在nonSerializableThing

另一个选项可能是:NonSerializableThing确实是可序列化的,但我将SparkContext作为隐式构造函数参数传入其中。我想如果我让这个暂时的,它会(无用地)被序列化,但不会引起任何问题。

共有1个答案

龚伯寅
2023-03-14

当用1L替换l时,Spark不再尝试用中的方法/变量序列化类,因此不会引发错误。

您应该能够通过标记val x:NonSerializableThing= 作为瞬态,例如。

@transient
val x: NonSerializableThing = ...

这意味着当类被序列化时,这个变量应该被忽略。

 类似资料:
  • 我对Spark,Scala和Cassandra都是新手。使用Spark,我试图从MySQL获取一些ID。 我可以看到在控制台打印的ID。 当我试图在每个提取id上运行相同的函数时 它给出与例外相同的例外 在阅读spark-shell中的Apache spark:“sparkException:Task not serializable”后,我尝试将@transient添加到RDDs中

  • null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?

  • 问题内容: 我们在Spark上使用Redis来缓存键值对,这是代码: 但是编译器给了我这样的反馈: 有人可以告诉我如何序列化从Redis获得的数据。非常感谢。 问题答案: 在Spark中,s(如此处)上的函数被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该可序列化。 Redis连接不可序列化,因为它打开了到目标DB的TCP连接,该TCP连接已绑定到创建它的机器。 解决方案是

  • 这给出的错误如下,任何帮助将是感激的:

  • 我有一个行的RDD,我想基于闭包进行过滤。最终,我想将闭包作为参数传递给正在进行过滤器的方法,但我已经简化了它,我可以用这样简单的东西重现错误。 我尝试将fn放入一个case对象中,这个对象扩展了一个可序列化的特性,在调用过滤器的方法的内部和外部定义了fn。我正在努力弄清楚我需要做什么,而不会出现这些错误。我知道在堆栈溢出上已经有很多关于这个的问题,我一直在寻找一个合适的答案,但我找不到。 更新:

  • 问题在于Spark数据集和INT列表的序列化。Scala版本是2.10.4,Spark版本是1.6。 这和其他问题类似,但是我不能基于这些回答让它工作。我已经简化了代码,以便仅仅显示问题。 我有一门案例课: 我的主要方法是: 我得到以下错误: 如果我从FlightExt中删除列表,那么一切正常,这表明lambda函数序列化没有问题。 Scala本身似乎序列化了一系列Int的优点。也许Spark在序