当前位置: 首页 > 知识库问答 >
问题:

带序列化的Scala反射(过火花)-符号不可序列化

戴原
2023-03-14

首先,我使用的是scala 2.10.4,上面的例子是在Spark 1.6中运行的(尽管我怀疑Spark与此有关,但这只是一个序列化问题)。

所以我的问题是:假设我有一个traitBase,它由两个类实现,比如说B1B2。现在,我有一个泛型特征,它由一组类扩展,其中一个类位于Base的子类型之上,例如(这里我保留了Spark对RDD的概念,但一旦序列化,它实际上可能是另一个类;不管实际情况如何,它都只是一个结果):

trait Foo[T] { def function(rdd: RDD[T]): Something }
class Foo1[B <: Base] extends Foo[B] { def function(rdd: RDD[B]): Something  = ... }
class Foo2 extends Foo[A] { def function(rdd: RDD[A]): Something  = ... }
...

现在我需要一个对象,它将接受一个RDD[T](假设这里没有ambuiquity,它只是一个简化的版本)一个,它返回某个,与类型T对应的函数的结果相对应。但它也应该适用于带有合并策略的Array[T]。目前看来:

object Obj {
   def compute[T: TypeTag](input: RDD[T]): Something = {
      typeOf[T] match {
         case t if t <:< typeOf[A] => 
            val foo = new Foo[T]
            foo.function(rdd)
         case t if t <:< typeOf[Array[A]] => 
            val foo = new Foo[A]
            foo.function(rdd.map(x => mergeArray(x.asInstance[Array[A]])))
         case t if t <:< typeOf[Base] => 
            val foo = new Foo[T]
            foo.function(rdd)
         // here it gets ugly...
         case t if t <:< typeOf[Array[_]] => // doesn't fall through with Array[Base]... why?
            val tt = getSubInfo[T](0)
            val tpe = tt.tpe
            val foo = new Foo[tpe.type]
            foo.function(rdd.map(x => (x._1, mergeArray(x._2.asInstanceOf[Array[tpe.type]]))
      }
   }

   // strategy to transform arrays of T into a T object when possible
   private def mergeArray[T: TypeTag](a: Array[T]): T = ... 

  // extract the subtype, e.g. if Array[Int] then at position 0 extracts a type tag for Int, I can provide the code but not fondamental for the comprehension of the problem though
   private def getSubInfo[T: TypeTag](i: Int): TypeTag[_] = ... 
}

不幸的是,它在本地机器上似乎运行良好,但当它被发送到Spark(序列化)时,我得到了一个org。阿帕奇。火花SparkException:任务不可序列化

Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
Serialization stack:
    - object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package types)
    - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)

我确实有一个解决办法(很明显,列举了各种可能性),但出于好奇,有没有办法解决这个问题?为什么符号不可序列化,而清单中的等价物是可序列化的?

谢谢你的帮助。

共有1个答案

东方方伟
2023-03-14

TypeTags现在通常可以在scala中序列化,但奇怪的是,不能直接类型(这很奇怪,因为typetags包含不是:-/的符号)。

这可能是你想要的

// implicit constructor TypeTag parameter is serialized.
abstract class TypeAware[T:TypeTag] extends Serializable {
  def typ:Type = _typeCached

  @transient
  lazy val _typeCached:Type = typeOf[T]
}

trait Foo[T] extends Serializable { 
  def function(rdd: RDD[T]): Something  {... impl here?...}
  def typ:Type 
}

class Concrete[T:TypeTag] extends TypeAware[T] with Foo[T] with Serializable{
   def function(rdd: RDD[T]): Something  {... impl here?...}
}
 类似资料:
  • 我的spark任务是在运行时抛出不可序列化的任务。谁能告诉我我做错了什么吗? 以下是stacktrace:

  • 有没有关于为什么整个对象B需要序列化的想法? 关于“对象不可序列化”的异常:

  • 当我运行Spark Scala程序时,有一个“Task not serializable”异常 Spark RDD是不可串行化类型(java类) 调用的函数来自不可序列化的类(java类,再次) 我的代码是这样的 我注意到我可以用 但对于RDD中的对象类,我仍然会遇到这个例外。我会以另一种方式,也会以另一种方式,也就是第二部分,因为我不想创建大量PredicateClass的对象。 你能帮我吗?我

  • 可以序列化/反序列化< code >映射吗 在这种特殊情况下,我知道总是,和 - 第三方类(我有序列化器和反序列化器),其他值是盒装原语。 有可能和杰克逊做这样的事吗?使用MapSerializer/MapDeserializer可以做到这一点吗?(我找不到任何例子)

  • 目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。