当前位置: 首页 > 知识库问答 >
问题:

sparkException:任务不可序列化(由org.apache.hadoop.conf.configuration引起)

施恩
2023-03-14

我想将转换流写入Elasticsearch索引,如下所示:

transformed.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    val messages = rdd.map(prepare)
    messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
  }
})

val messages=rdd.map(prepare)行抛出错误(见下文)。我尝试了不同的方法来解决这个问题(例如,在val conf旁边添加@transient),但似乎没有任何效果。

它是否与Hadoop的配置有关?(我参考了以下消息:类:org.apache.hadoop.conf.configuration,value:configuration:core-default.xml,core-site.xml,mapred-default.xml,mapred-site.xml,yarn-default.xml,yarn-site.xml)

更新:

class EsStream(name:String,conf:HConf) extends SparkBase with Serializable {

  /* Elasticsearch configuration */ 
  val ec = getEsConf(conf)               

  /* Kafka configuration */
  val (kc,topics) = getKafkaConf(conf)

  def run() {

    val ssc = createSSCLocal(name,conf)

    /*
     * The KafkaInputDStream returns a Tuple where only the second component
     * holds the respective message; we therefore reduce to a DStream[String]
     */
    val stream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kc,topics,StorageLevel.MEMORY_AND_DISK).map(_._2)
    /*
     * Inline transformation of the incoming stream by any function that maps 
     * a DStream[String] onto a DStream[String]
     */
    val transformed = transform(stream)
    /*
     * Write transformed stream to Elasticsearch index
     */
    transformed.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        val messages = rdd.map(prepare)
        messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
      }
    })

    ssc.start()
    ssc.awaitTermination()    

  }

  def transform(stream:DStream[String]) = stream

  private def getEsConf(config:HConf):HConf = {

    val _conf = new HConf()

    _conf.set("es.nodes", conf.get("es.nodes"))
    _conf.set("es.port", conf.get("es.port"))

    _conf.set("es.resource", conf.get("es.resource"))

    _conf

  }

  private def getKafkaConf(config:HConf):(Map[String,String],Map[String,Int]) = {

    val cfg = Map(
      "group.id" -> conf.get("kafka.group"),

      "zookeeper.connect" -> conf.get("kafka.zklist"),
      "zookeeper.connection.timeout.ms" -> conf.get("kafka.timeout")

    )

    val topics = conf.get("kafka.topics").split(",").map((_,conf.get("kafka.threads").toInt)).toMap   

    (cfg,topics)

  }

  private def prepare(message:String):(Object,Object) = {

    val m = JSON.parseFull(message) match {
      case Some(map) => map.asInstanceOf[Map[String,String]]
      case None => Map.empty[String,String]
    }

    val kw = NullWritable.get

    val vw = new MapWritable
    for ((k, v) <- m) vw.put(new Text(k), new Text(v))

    (kw, vw)

  }

}

共有1个答案

越勇锐
2023-03-14

EsStream的类构造函数中删除conf:hconf并像类EsStream(name:string)那样编写它。

接下来,创建一个签名为public def init(conf:hconf):map(String,String)的方法

在此方法中,您将读取所需的配置并更新ec(kc,主题)

在此之后,您应该调用您的run方法。

 类似资料:
  • 我已经上了三节课 任务未序列化

  • 我在我大学的热图项目中,我们必须从txt文件(坐标、高度)中获取一些数据(212Go),然后将其放入HBase以在带有Express的Web客户端上检索它。 我练习使用144Mo文件,这是工作: 但是我现在使用212Go文件,我有一些内存错误,我猜收集方法会收集内存中的所有数据,所以212Go太多了。 所以现在我在尝试这个: 我得到了“org.apache.spark.SparkException

  • 将现有应用程序从Spark 1.6移动到Spark 2.2*(最终)会导致错误“org.apache.spark.SparkExctive:任务不可序列化”。我过于简化了我的代码,以演示同样的错误。代码查询拼花文件以返回以下数据类型:“org.apache.spark.sql.数据集[org.apache.spark.sql.行]”我应用一个函数来提取字符串和整数,返回字符串。一个固有的问题与Sp

  • 我对Spark,Scala和Cassandra都是新手。使用Spark,我试图从MySQL获取一些ID。 我可以看到在控制台打印的ID。 当我试图在每个提取id上运行相同的函数时 它给出与例外相同的例外 在阅读spark-shell中的Apache spark:“sparkException:Task not serializable”后,我尝试将@transient添加到RDDs中

  • null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?

  • 我的spark任务是在运行时抛出不可序列化的任务。谁能告诉我我做错了什么吗? 以下是stacktrace: