我目前正在尝试扩展一个使用Scala和Spark的机器学习应用程序。我正在使用我在Github上找到的Dieterich Lawson之前项目的结构
https://github.com/dieterichlawson/admm
该项目基本上使用SparkContext来构建训练样本块的RDD,然后对每个样本集执行局部计算(例如求解线性系统)。
我遵循同样的方案,但为了进行局部计算,我需要对每个训练样本块执行L-BFGS算法。为了做到这一点,我想使用mlLib中的L-BFGS算法,该算法具有以下特征。
runLBFGS(RDD<scala.Tuple2<Object,Vector>> data, Gradient gradient,
Updater updater, int numCorrections, double convergenceTol,
int maxNumIterations, double regParam, Vector initialWeights)
正如它所说,该方法将训练样本的RDD(对象,向量)作为输入。问题是,在每个worker上,我不再在本地保留数据的RDD结构。因此,我试图在矩阵的每个块上使用SparkContext的并行化函数。但当我这样做时,我会得到一个序列化程序异常。(确切的例外信息在问题的末尾)。
这是关于我如何处理SparkContext的详细说明。
首先,在主应用程序中,它用于打开一个文本文件,并在类LogRegressionXUpdate的工厂中使用:
val A = sc.textFile("ds1.csv")
A.checkpoint
val f = LogRegressionXUpdate.fromTextFile(A,params.rho,1024,sc)
在应用程序中,LogRegressionXUpdate类实现如下
class LogRegressionXUpdate(val training: RDD[(Double, NV)],
val rho: Double) extends Function1[BDV[Double],Double] with Prox with Serializable{
def prox(x: BDV[Double], rho: Double): BDV[Double] = {
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val (weights, loss) = LBFGS.runLBFGS(
training,
new GradientForLogRegADMM(rho,fromBreeze(x)),
new SimpleUpdater(),
numCorrections,
convergenceTol,
maxNumIterations,
regParam,
fromBreeze(x))
toBreeze(weights.toArray).toDenseVector
}
def apply(x: BDV[Double]): Double = {
Math.pow(1,2.0)
}
}
与以下伴生对象:
object LogRegressionXUpdate {
def fromTextFile(file: RDD[String], rho: Double, blockHeight: Int = 1024, @transient sc: SparkContext): RDF[LogRegressionXUpdate] = {
val fns = new BlockMatrix(file, blockHeight).blocks.
map(X => new LogRegressionXUpdate(sc.parallelize((X(*,::).map(fila => (fila(-1),fromBreeze(fila(0 to -2))))).toArray),rho))
new RDF[LogRegressionXUpdate](fns, 0L)
}
}
这个构造函数导致了一个序列化错误,尽管我并不真的需要SparkContext在本地构建每个RDD。我一直在寻找这个问题的解决方案,添加@transient并没有解决它。然后,我的问题是:真的有可能构建这些“第二层RDD”吗?或者我被迫执行L-BFGS算法的非分布式版本。提前谢谢!
错误日志:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.map(RDD.scala:293)
at admm.functions.LogRegressionXUpdate$.fromTextFile(LogRegressionXUpdate.scala:70)
at admm.examples.Lasso$.run(Lasso.scala:96)
at admm.examples.Lasso$$anonfun$main$1.apply(Lasso.scala:70)
at admm.examples.Lasso$$anonfun$main$1.apply(Lasso.scala:69)
at scala.Option.map(Option.scala:145)
at admm.examples.Lasso$.main(Lasso.scala:69)
at admm.examples.Lasso.main(Lasso.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@20576557)
- field (class: admm.functions.LogRegressionXUpdate$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class admm.functions.LogRegressionXUpdate$$anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 21 more
RDD
s只能从驱动程序访问。每当你打电话给
myRDD.map(someObject.someMethod)
spark序列化计算someMethod
所需的任何内容,并将其发送给工作人员。在那里,该方法被反序列化,然后在每个分区上独立运行。
但是,您尝试使用本身使用火花的方法:您尝试创建一个新的RDD。但是,这是不可能的,因为它们只能在驱动程序中创建。您看到的错误是火花试图序列化火花上下文本身,因为它是每个块的计算所需要的。更多关于序列化的信息可以在这个问题的第一个答案中找到。
“…虽然我并不真的需要SparkContext在本地构建每个RDD”-实际上这正是调用sc.parallelize
时要做的。底线——您需要找到(或编写)L-BFGS的本地实现。
在Kotlin的类中,一个对象和一个同伴对象有什么区别? 它的“静态”(我是java方面的)生命周期可能有区别吗?
我正在使用Apache Flink对流数据执行分析。 我正在使用一个依赖项,其对象需要超过10秒才能创建,因为它在初始化之前读取hdfs中存在的几个文件。 如果我在open方法中初始化对象,我会得到一个超时异常,如果在接收器/平面图的构造函数中,我会得到序列化异常。 目前,我正在使用静态块来初始化其他类中的对象,使用前提条件。在主文件中选中NOTNULL(mgGenerator.mgGenerat
我有以下用于序列化查询集的代码: 下面是我的 我需要将其序列化。但它说无法序列化
我尝试为我的定制类实现一个方法,使用Flink-Kafka连接器生成关于Kafka的数据。类原型如下所示: 将数据写入特定Kafka主题的方法如下: 我有另一种方法可以从Kafka主题获取对象的字段中的数据,效果很好。现在尝试从Kafka主题获取数据并将其写入另一个Kafka主题时,我遇到了错误: 主要代码: Java似乎试图序列化对象,而不仅仅是字段
问题内容: 问题是,当我将序列化对象存储在.txt文件中时,它的格式不是可读的,并且包含一些随机的符号和字母。首先,我想知道其背后的原因是什么,然后如何解决此问题。 好了,这是我的代码:我要序列化的对象 序列化 } 反序列化 输出: 问题答案: 存储在文件中的序列化对象不可读 除了通过反序列化之外,它们并不可读。 问题是,当我将序列化对象存储在.txt文件中时,它的格式不是可读的,并且包含一些随机
我使用的是 DJango 1.8 和蟒蛇 3.4 当运行下面的视图时,Django抛出类型错误-对象不可JSON序列化 Views.py 我试图从mysql数据库中读取几行数据并显示在html文件中,当上面的视图运行时,我看到下面的错误信息 在HTML页面中,我有一个下拉列表。根据选择的选项,我的Ajax将返回从mysql表获取的记录。 Models.py GreenStats和YellowSta