当前位置: 首页 > 知识库问答 >
问题:

在Spark Scala中使用自定义数据帧类时任务不可序列化

华旭
2023-03-14

我在Scala/Spark(1.5)和齐柏林飞艇上遇到了一个奇怪的问题:

如果我运行以下Scala/Spark代码,它将正常运行:

// TEST NO PROBLEM SERIALIZATION
val rdd = sc.parallelize(Seq(1, 2, 3))
val testList = List[String]("a", "b")

rdd.map{a => 
    val aa = testList(0)
    None}

但是,在声明了此处建议的自定义数据帧类型之后

//DATAFRAME EXTENSION
import org.apache.spark.sql.DataFrame

object ExtraDataFrameOperations {
  implicit class DFWithExtraOperations(df : DataFrame) {

    //drop several columns
    def drop(colToDrop:Seq[String]):DataFrame = {
        var df_temp = df
        colToDrop.foreach{ case (f: String) =>
            df_temp = df_temp.drop(f)//can be improved with Spark 2.0
        }
        df_temp
    }   
  }
}

使用它的例子如下:

//READ ALL THE FILES INTO different DF and save into map
import ExtraDataFrameOperations._
val filename = "myInput.csv"

val delimiter =  ","

val colToIgnore = Seq("c_9", "c_10")

val inputICFfolder = "hdfs:///group/project/TestSpark/"

val df = sqlContext.read
            .format("com.databricks.spark.csv")
            .option("header", "true") // Use first line of all files as header
            .option("inferSchema", "false") // Automatically infer data types? => no cause we need to merge all df, with potential null values => keep string only
            .option("delimiter", delimiter)
            .option("charset", "UTF-8")
            .load(inputICFfolder + filename)
            .drop(colToIgnore)//call the customize dataframe

这次运行成功。

现在如果我再次运行下面的代码(同上)

// TEST NO PROBLEM SERIALIZATION
val rdd = sc.parallelize(Seq(1, 2, 3))
val testList = List[String]("a", "b")
rdd.map{a => 
    val aa = testList(0)
    None}

我收到了错误信息:

rdd:org。阿帕奇。火花rdd。RDD[Int]=ParallelCollectionRDD[8]位于parallelize at:32 testList:List[String]=List(a,b)org。阿帕奇。火花SparkException:任务在组织中不可序列化。阿帕奇。火花util。ClosureCleaner美元。EnsureCleaner.scala:304)可在org。阿帕奇。火花util。ClosureCleaner美元。org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)位于org。阿帕奇。火花util。ClosureCleaner美元。clean(ClosureCleaner.scala:122)在org上。阿帕奇。火花SparkContext。clean(SparkContext.scala:2032)位于org。阿帕奇。火花rdd。RDD$$anonfun$map$1。应用(RDD.scala:314)。。。原因:java。伊奥。NotSerializableException:$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$ExtraDataFrameOperations$序列化堆栈:-对象不可序列化(类:$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$ExtraDataFrameOperations$,值:$iwC$$iwC$$iwC$$ExtraframeOperations$$6E70E)-字段(类:$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC,名称:ExtraDataFrameOperations$模块,类型:类$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$ExtraDataFrameOperations$)-object(类$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$iwC@4c6d0802)-字段(类:$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC,名称:$iw,类型:类$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)。。。

我不明白:

  • 为什么在没有对数据帧执行任何操作时发生此错误
  • 为什么“ExtraDataFrameOperations”在以前成功使用时不可序列化

更新:

试图与

@inline val testList = List[String]("a", "b")

没有帮助。

共有2个答案

从焱
2023-03-14

看起来spark试图序列化testList周围的所有范围。尝试内联数据@inline val testList=List[String](“a”、“b”),或者使用不同的对象存储传递给驱动程序的函数/数据

伍溪叠
2023-03-14

只需为我添加“扩展可序列化”这项工作

/**
   * A wrapper around ProducerRecord RDD that allows to save RDD to Kafka.
   *
   * KafkaProducer is shared within all threads in one executor.
   * Error handling strategy - remember "last" seen exception and rethrow it to allow task fail.
   */
 implicit class DatasetKafkaSink(ds: Dataset[ProducerRecord[String, GenericRecord]]) extends Serializable {

   class ExceptionRegisteringCallback extends Callback {
     private[this] val lastRegisteredException = new AtomicReference[Option[Exception]](None)

     override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
       Option(exception) match {
         case a @ Some(_) => lastRegisteredException.set(a) // (re)-register exception if send failed
         case _ => // do nothing if encountered successful send
       }
     }

     def rethrowException(): Unit = lastRegisteredException.getAndSet(None).foreach(e => throw e)
   }

   /**
     * Save to Kafka reusing KafkaProducer from singleton holder.
     * Returns back control only once all records were actually sent to Kafka, in case of error rethrows "last" seen
     * exception in the same thread to allow Spark task to fail
     */
   def saveToKafka(kafkaProducerConfigs: Map[String, AnyRef]): Unit = {
     ds.foreachPartition { records =>
       val callback = new ExceptionRegisteringCallback
       val producer = KafkaProducerHolder.getInstance(kafkaProducerConfigs)

       records.foreach(record => producer.send(record, callback))

       producer.flush()
       callback.rethrowException()
     }
   }
 }'
 类似资料:
  • 当我创建如上图所示的UDF函数时,我得到任务序列化错误。只有在使用在集群部署模式下运行代码时,才会出现此错误。然而,它在Spark-Shell中运行良好。 我尝试添加,但没有解决问题。

  • 本文向大家介绍python自定义时钟类、定时任务类,包括了python自定义时钟类、定时任务类的使用技巧和注意事项,需要的朋友参考一下 这是我使用python写的第一个类(也算是学习面向对象语言以来正式写的第一个解耦的类),记录下改进的过程。 分析需求 最初,因为使用time模块显示日期时,每次都要设置时间字符串的格式,挺麻烦,但还是忍了。 后来,在处理多线程任务时需要实现定时控制的功能,更麻烦,

  • 我正在尝试运行一个Java类作为gradle任务。 我已将此添加到我的: 但是,当我在命令行上运行时,它会失败并出现以下错误: 因此,我在我的任务中添加了一个类路径,如问题中所述: 然而,这是一个大型的遗留项目,具有非常长的类路径,所以当我运行< code > gradle download keystore 时,我得到了另一个错误: 所以我在中修改了我的,现在看起来像这样: 这适用于命令行,但是

  • 我必须附加这个由“stroint”方法生成的列,它被证明是不可序列化的。 我已经尝试过这样把函数放在辅助类中, 但这没用。

  • 在这个岗位上,, 获取在Laravel 3/4中执行的查询 Ricardo Rossi提供了一个关于使用Kint和自定义类轻松输出有关使用查询生成器创建的Laravel查询的信息的极好答案。 我能够使用composer设置Kent,但我是Laravel的新手,从版本4开始就没有使用过PHP。 有人能提供一个例子来描述如何创建一个可以从任何地方调用的类吗?在他的例子中,里卡多说他使用“DBH::q(