当前位置: 首页 > 知识库问答 >
问题:

SparkContext在伴生对象中不可序列化

王翰墨
2023-03-14

我目前正在尝试扩展一个使用Scala和Spark的机器学习应用程序。我正在使用我在Github上找到的Dieterich Lawson之前项目的结构

https://github.com/dieterichlawson/admm

该项目基本上使用SparkContext来构建训练样本块的RDD,然后对每个样本集执行局部计算(例如求解线性系统)。

我遵循同样的方案,但为了进行局部计算,我需要对每个训练样本块执行L-BFGS算法。为了做到这一点,我想使用mlLib中的L-BFGS算法,该算法具有以下特征。

runLBFGS(RDD<scala.Tuple2<Object,Vector>> data, Gradient gradient, 
         Updater updater, int numCorrections, double convergenceTol, 
         int maxNumIterations, double regParam, Vector initialWeights)

正如它所说,该方法将训练样本的RDD(对象,向量)作为输入。问题是,在每个worker上,我不再在本地保留数据的RDD结构。因此,我试图在矩阵的每个块上使用SparkContext的并行化函数。但当我这样做时,我会得到一个序列化程序异常。(确切的例外信息在问题的末尾)。

这是关于我如何处理SparkContext的详细说明。

首先,在主应用程序中,它用于打开一个文本文件,并在类LogRegressionXUpdate的工厂中使用:

val A = sc.textFile("ds1.csv")
A.checkpoint
val f = LogRegressionXUpdate.fromTextFile(A,params.rho,1024,sc)

在应用程序中,LogRegressionXUpdate类实现如下

class LogRegressionXUpdate(val training: RDD[(Double, NV)],
                           val rho: Double) extends Function1[BDV[Double],Double] with Prox  with Serializable{

def prox(x: BDV[Double], rho: Double): BDV[Double] = {
    val numCorrections = 10
    val convergenceTol = 1e-4
    val maxNumIterations = 20
    val regParam = 0.1
    val (weights, loss) = LBFGS.runLBFGS(
        training,
        new GradientForLogRegADMM(rho,fromBreeze(x)),
        new SimpleUpdater(),
        numCorrections,
        convergenceTol,
        maxNumIterations,
        regParam,
        fromBreeze(x))
    toBreeze(weights.toArray).toDenseVector
}

def apply(x: BDV[Double]): Double = {
    Math.pow(1,2.0)
}

}

与以下伴生对象:

object LogRegressionXUpdate {
    def fromTextFile(file: RDD[String], rho: Double, blockHeight: Int = 1024, @transient sc: SparkContext): RDF[LogRegressionXUpdate] = {
        val fns = new BlockMatrix(file, blockHeight).blocks.
        map(X => new LogRegressionXUpdate(sc.parallelize((X(*,::).map(fila => (fila(-1),fromBreeze(fila(0 to -2))))).toArray),rho))
        new RDF[LogRegressionXUpdate](fns, 0L)
    }
}

这个构造函数导致了一个序列化错误,尽管我并不真的需要SparkContext在本地构建每个RDD。我一直在寻找这个问题的解决方案,添加@transient并没有解决它。然后,我的问题是:真的有可能构建这些“第二层RDD”吗?或者我被迫执行L-BFGS算法的非分布式版本。提前谢谢!

错误日志:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.map(RDD.scala:293)
at admm.functions.LogRegressionXUpdate$.fromTextFile(LogRegressionXUpdate.scala:70)
at admm.examples.Lasso$.run(Lasso.scala:96)
at admm.examples.Lasso$$anonfun$main$1.apply(Lasso.scala:70)
at admm.examples.Lasso$$anonfun$main$1.apply(Lasso.scala:69)
at scala.Option.map(Option.scala:145)
at admm.examples.Lasso$.main(Lasso.scala:69)
at admm.examples.Lasso.main(Lasso.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@20576557)
- field (class: admm.functions.LogRegressionXUpdate$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class admm.functions.LogRegressionXUpdate$$anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 21 more

共有1个答案

董高逸
2023-03-14

RDDs只能从驱动程序访问。每当你打电话给

myRDD.map(someObject.someMethod)

spark序列化计算someMethod所需的任何内容,并将其发送给工作人员。在那里,该方法被反序列化,然后在每个分区上独立运行。

但是,您尝试使用本身使用火花的方法:您尝试创建一个新的RDD。但是,这是不可能的,因为它们只能在驱动程序中创建。您看到的错误是火花试图序列化火花上下文本身,因为它是每个块的计算所需要的。更多关于序列化的信息可以在这个问题的第一个答案中找到。

“…虽然我并不真的需要SparkContext在本地构建每个RDD”-实际上这正是调用sc.parallelize时要做的。底线——您需要找到(或编写)L-BFGS的本地实现。

 类似资料:
  • 在Kotlin的类中,一个对象和一个同伴对象有什么区别? 它的“静态”(我是java方面的)生命周期可能有区别吗?

  • 我正在使用Apache Flink对流数据执行分析。 我正在使用一个依赖项,其对象需要超过10秒才能创建,因为它在初始化之前读取hdfs中存在的几个文件。 如果我在open方法中初始化对象,我会得到一个超时异常,如果在接收器/平面图的构造函数中,我会得到序列化异常。 目前,我正在使用静态块来初始化其他类中的对象,使用前提条件。在主文件中选中NOTNULL(mgGenerator.mgGenerat

  • 我有以下用于序列化查询集的代码: 下面是我的 我需要将其序列化。但它说无法序列化

  • 我尝试为我的定制类实现一个方法,使用Flink-Kafka连接器生成关于Kafka的数据。类原型如下所示: 将数据写入特定Kafka主题的方法如下: 我有另一种方法可以从Kafka主题获取对象的字段中的数据,效果很好。现在尝试从Kafka主题获取数据并将其写入另一个Kafka主题时,我遇到了错误: 主要代码: Java似乎试图序列化对象,而不仅仅是字段

  • 问题内容: 问题是,当我将序列化对象存储在.txt文件中时,它的格式不是可读的,并且包含一些随机的符号和字母。首先,我想知道其背后的原因是什么,然后如何解决此问题。 好了,这是我的代码:我要序列化的对象 序列化 } 反序列化 输出: 问题答案: 存储在文件中的序列化对象不可读 除了通过反序列化之外,它们并不可读。 问题是,当我将序列化对象存储在.txt文件中时,它的格式不是可读的,并且包含一些随机

  • 我使用的是 DJango 1.8 和蟒蛇 3.4 当运行下面的视图时,Django抛出类型错误-对象不可JSON序列化 Views.py 我试图从mysql数据库中读取几行数据并显示在html文件中,当上面的视图运行时,我看到下面的错误信息 在HTML页面中,我有一个下拉列表。根据选择的选项,我的Ajax将返回从mysql表获取的记录。 Models.py GreenStats和YellowSta