当前位置: 首页 > 知识库问答 >
问题:

Scala,Play2.4,Spark:由于SparkConf对象,任务不可串行化

孔波
2023-03-14

我在Scala和Play2中使用Spark。4.首先,我看到了这个线程:Task not serializable:java。伊奥。仅在类而非对象上调用闭包外函数时NotSerializableException

如果我们将一个方法传递给Spark,它会尝试序列化整个类,我同意这个事实。

我的代码,会更清晰:

class GPSSparkServiceImpl @Inject() (val stepService: StepDbService, val coordinateService: CoordinateService) extends GPSSparkService with Serializable{

  /**
   * Spark config
   * Set Master node and AppName
   */
  val conf = new SparkConf().setAppName("Editus GPS").setMaster("local[2]")

  /**
   * Initialize Spark Context
   */
  val sc = new SparkContext(conf)

  override def execute() = {
    val logData = sc.textFile("file://C://work/csv/gps.csv").cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    println("Lines with a: %s".format(numAs))
  }

  override def generateUserToStep(): Unit = {
    val futureSteps = stepService.findAll()
    futureSteps onSuccess{
      case steps =>
        val data = sc.textFile("file://C://work/csv/gps.csv").cache()
        val result = data.flatMap(line => steps.map(step => (line, step))).filter { tuple =>
          coordinateService.checkProximity(
            coordinateService.coordinateToDistanceInMeters(
              tuple._1.split(";")(1).toDouble, tuple._1.split(";")(2).toDouble, tuple._2.gpsCoordinate.latitude, tuple._2.gpsCoordinate.longitude
            ), tuple._2
          )
        }.count()
        println("result: " + result + " for " + steps.length + " steps")
    }
  }
}    

正如你所看到的,我在火花中使用了2个方法:Check Proximity协调服务中的坐标-距离-仪表,它们将被注入为协调服务Impl,并且这个类是可序列化的。类中的所有对象都是可序列化的。

协调服务Impl:

class CoordinateServiceImpl @Inject() (val config: Configuration) extends CoordinateService with Serializable{

  override def coordinateToDistanceInMeters(lat1: Double, lng1: Double, lat2: Double, lng2: Double): Double = {
    val earthRadius: Double = 6371000
    val dLat: Double = Math.toRadians(lat2-lat1)
    val dLng: Double = Math.toRadians(lng2-lng1)
    val a: Double = Math.sin(dLat/2) * Math.sin(dLat/2) +
      Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) *
        Math.sin(dLng/2) * Math.sin(dLng/2)
    val c: Double = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a))
    earthRadius * c
  }

  override def checkProximity(distance: Double, step: Step): Boolean =
    distance < step.acceptableProximity
}

为什么我面对的SparkConf是不可序列化的,这是在实际的类中?我甚至没有使用它的任何方法。也许我错过了什么......

stacktrace:

org.apache.spark.SparkException: Task not serializable                                                                
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)                         
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)                                      
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)                                               
        at org.apache.spark.rdd.RDD.filter(RDD.scala:303)                                                             
        at services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep$1.applyOrElse(GPSSparkServiceImpl.scala:
41)                                                                                                                   
        at services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep$1.applyOrElse(GPSSparkServiceImpl.scala:
38)                                                                                                                   
        at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)                                       
        at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)                                       
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)                                               
        at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)        
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)                                       
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)                    
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)                           
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)                                   
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)                          
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf                                               
Serialization stack:                                                                                                  
        - object not serializable (class: org.apache.spark.SparkConf, value: org.apache.spark.SparkConf@1b0321ae)     
        - field (class: services.gps.spark.GPSSparkServiceImpl, name: conf, type: class org.apache.spark.SparkConf)   
        - object (class services.gps.spark.GPSSparkServiceImpl, services.gps.spark.GPSSparkServiceImpl@458c4049)      
        - field (class: services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep$1, name: $outer, type: clas
s services.gps.spark.GPSSparkServiceImpl)                                                                             
        - object (class services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep$1, <function1>)            
        - field (class: services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep$1$$anonfun$3, name: $outer,
 type: class services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep$1)                                    
        - object (class services.gps.spark.GPSSparkServiceImpl$$anonfun$generateUserToStep$1$$anonfun$3, <function1>) 
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)        
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)                   
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)                      
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)                         


 ... 14 more                                                                                                   

共有1个答案

戚飞雨
2023-03-14

如果我使用一个对象而不是一个注入的类,它会完美地工作。

object GpsUtils{
  def coordinateToDistanceInMeters(lat1: Double, lng1: Double, lat2: Double, lng2: Double): Double = {
    val earthRadius: Double = 6371000
    val dLat: Double = Math.toRadians(lat2-lat1)
    val dLng: Double = Math.toRadians(lng2-lng1)
    val a: Double = Math.sin(dLat/2) * Math.sin(dLat/2) +
      Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) *
        Math.sin(dLng/2) * Math.sin(dLng/2)
    val c: Double = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a))
    earthRadius * c
  }

  def checkProximity(distance: Double, step: Step): Boolean =
    distance < step.acceptableProximity
}
 类似资料:
  • null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?

  • 这给出的错误如下,任何帮助将是感激的:

  • 我有一个行的RDD,我想基于闭包进行过滤。最终,我想将闭包作为参数传递给正在进行过滤器的方法,但我已经简化了它,我可以用这样简单的东西重现错误。 我尝试将fn放入一个case对象中,这个对象扩展了一个可序列化的特性,在调用过滤器的方法的内部和外部定义了fn。我正在努力弄清楚我需要做什么,而不会出现这些错误。我知道在堆栈溢出上已经有很多关于这个的问题,我一直在寻找一个合适的答案,但我找不到。 更新:

  • 我对Spark,Scala和Cassandra都是新手。使用Spark,我试图从MySQL获取一些ID。 我可以看到在控制台打印的ID。 当我试图在每个提取id上运行相同的函数时 它给出与例外相同的例外 在阅读spark-shell中的Apache spark:“sparkException:Task not serializable”后,我尝试将@transient添加到RDDs中

  • 问题在于Spark数据集和INT列表的序列化。Scala版本是2.10.4,Spark版本是1.6。 这和其他问题类似,但是我不能基于这些回答让它工作。我已经简化了代码,以便仅仅显示问题。 我有一门案例课: 我的主要方法是: 我得到以下错误: 如果我从FlightExt中删除列表,那么一切正常,这表明lambda函数序列化没有问题。 Scala本身似乎序列化了一系列Int的优点。也许Spark在序

  • 问题内容: 我们在Spark上使用Redis来缓存键值对,这是代码: 但是编译器给了我这样的反馈: 有人可以告诉我如何序列化从Redis获得的数据。非常感谢。 问题答案: 在Spark中,s(如此处)上的函数被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该可序列化。 Redis连接不可序列化,因为它打开了到目标DB的TCP连接,该TCP连接已绑定到创建它的机器。 解决方案是