当前位置: 首页 > 知识库问答 >
问题:

在Spark上使用json4s的NotSerializableException

梁学真
2023-03-14

基本上,我必须使用Spark分析HDFS上的一些复杂JSON。

我使用“for理解”来(预)过滤json4s的JSON和“提取”方法,将其包装成一个case类

这个很好用!

def foo(rdd: RDD[String]) = {

case class View(C: String,b: Option[Array[List[String]]],  t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats

rdd.map { jsonString =>
  val jsonObj = parse(jsonString)
  val listsOfView = for {
    JObject(value) <- jsonObj
    JField(("v"), JObject(views)) <- value
    normalized <- views.map(x => (x._2))
  } yield normalized
}

到目前为止还不错!

当我尝试将(预)过滤的JSON提取到我的CaseClass时,我得到了以下结果:

线程“main”组织中出现异常。阿帕奇。火花SparkException:作业因阶段失败而中止:任务不可序列化:java。伊奥。NotSerializableException:org。json4s。默认格式$

下面是带抽取的代码:

def foo(rdd: RDD[String]) = {

case class View(C: String,b: Option[Array[List[String]]],  t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats

rdd.map { jsonString =>
  val jsonObj = parse(jsonString)
  val listsOfView = for {
    JObject(value) <- jsonObj
    JField(("v"), JObject(views)) <- value
    normalized <- views.map(x => (x._2))
  } yield normalized.extract[View]
}

我已经在scala ws上尝试了我的代码,并且已经成功了!我对hdfs和spark真的很新,所以我希望能得到一个提示。

共有3个答案

颛孙霖
2023-03-14

解决我的问题的是,我在rdd中使用了implicit val formats=DefaultFormats。foreach{}循环。它解决了我的可序列化异常。

以下是我解决问题的代码片段:

case class rfId(rfId: String) {}

// ... some code here ...

 rdd.foreach { record =>
    val value = record.value()

    // Bring in default date formats etc and makes json4s serializable
    implicit val formats = DefaultFormats
    val json = parse(value)
    println(json.camelizeKeys.extract[rfId])  // Prints `rfId(ABC12345678)`
 }
柏正平
2023-03-14

这实际上已经被修复;JSON4S从3.3.0版开始可序列化:https://github.com/json4s/json4s/issues/137

双元魁
2023-03-14

Spark将RDD转换上的闭包序列化,并将其“发送”给工作人员以进行分布式执行。这就要求闭包(通常也包括包含对象)中的所有代码都应该是可序列化的。

看组织的实施。json4s。DefaultFormat$(该特征的伴随对象):

object DefaultFormats extends DefaultFormats {
    val losslessDate = new ThreadLocal(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
    val UTC = TimeZone.getTimeZone("UTC")

}

很明显,这个对象是不可序列化的,不能这样做。(ThreadLocal本身就不可序列化)

你似乎没有在你的代码中使用Date类型,所以你能摆脱implicit val formats=DefaultFormats或者用可序列化的东西替换DefaultFormats吗?

 类似资料:
  • 我们在下面的框架中编写了一些Spark/Scala单元测试用例:-https://github.com/holdenk/spark-testing-base 对于我们的一些特性,我们将“JSON4S-Jackson”升级到“3.5.3”--在将jackson版本升级到最新版本后,我们所有的单元测试用例都失败了,错误如下:

  • 连接到Spark:pyspark_driver_python=/usr/local/bin/jupyter pyspark_driver_python_opts=“notebook--no-browser--port=7777”pyspark--packages com.databricks:spark-csv2.10:1.1.0--master spark://127.0.0.1:7077--e

  • 我试图在GCP上下文中测试Spark-HBase连接器,并尝试遵循1,它要求使用Maven(我尝试了Maven 3.6.3)对Spark 2.4进行本地封装连接器[2],在上提交作业时(完成后[3])得到以下错误。 知道吗? 谢谢你的支持 [2]https://github.com/hortonworks-spark/shc/tree/branch-2.4 [3]Spark-HBASE-GCP模板

  • 问题内容: 与此处类似的问题,但在此处没有足够的评论要点。 根据最新的Spark 文档,可以两种不同的方式使用,一种用于SQL,另一种用于DataFrame。我找到了多个如何与sql 一起使用的示例,但还没有找到有关如何直接在DataFrame上使用a的任何示例。 op所提供的解决方案,在上面链接的问题上使用,根据Spark Java API文档,该解决方案将在Spark 2.0中删除。在那里,它

  • 我有一个从配置文件读取数据的Spark作业。此文件是一个类型安全配置文件。 我要使用的外部application.conf的内容如下所示: 此application.conf文件存在于我的本地计算机文件系统中(而不是HDFS上) 我用的是Spark1.6.1和纱线 提前致谢

  • 我正在努力让junit在我的Spark shell中工作。 当试图从junit导入Assert时,出现以下错误消息: 有办法解决这个问题吗?知道如何从scala shell下载org.junit吗? 编辑:按照zsxwing的建议,我使用了spark-shell-packages JUnit:JUnit:4.12,输出如下: 但是,在尝试导入org.junit.Assert._时仍然面临同样的问题