基本上,我必须使用Spark分析HDFS上的一些复杂JSON。
我使用“for理解”来(预)过滤json4s的JSON和“提取”方法,将其包装成一个case类
这个很好用!
def foo(rdd: RDD[String]) = {
case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats
rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized
}
到目前为止还不错!
当我尝试将(预)过滤的JSON提取到我的CaseClass时,我得到了以下结果:
线程“main”组织中出现异常。阿帕奇。火花SparkException:作业因阶段失败而中止:任务不可序列化:java。伊奥。NotSerializableException:org。json4s。默认格式$
下面是带抽取的代码:
def foo(rdd: RDD[String]) = {
case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats
rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized.extract[View]
}
我已经在scala ws上尝试了我的代码,并且已经成功了!我对hdfs和spark真的很新,所以我希望能得到一个提示。
解决我的问题的是,我在rdd中使用了
循环。它解决了我的可序列化异常。implicit val formats=DefaultFormats
。foreach{}
以下是我解决问题的代码片段:
case class rfId(rfId: String) {}
// ... some code here ...
rdd.foreach { record =>
val value = record.value()
// Bring in default date formats etc and makes json4s serializable
implicit val formats = DefaultFormats
val json = parse(value)
println(json.camelizeKeys.extract[rfId]) // Prints `rfId(ABC12345678)`
}
这实际上已经被修复;JSON4S从3.3.0版开始可序列化:https://github.com/json4s/json4s/issues/137
Spark将RDD转换上的闭包序列化,并将其“发送”给工作人员以进行分布式执行。这就要求闭包(通常也包括包含对象)中的所有代码都应该是可序列化的。
看组织的实施。json4s。DefaultFormat$(该特征的伴随对象):
object DefaultFormats extends DefaultFormats {
val losslessDate = new ThreadLocal(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
val UTC = TimeZone.getTimeZone("UTC")
}
很明显,这个对象是不可序列化的,不能这样做。(ThreadLocal本身就不可序列化)
你似乎没有在你的代码中使用Date
类型,所以你能摆脱implicit val formats=DefaultFormats
或者用可序列化的东西替换DefaultFormats吗?
我们在下面的框架中编写了一些Spark/Scala单元测试用例:-https://github.com/holdenk/spark-testing-base 对于我们的一些特性,我们将“JSON4S-Jackson”升级到“3.5.3”--在将jackson版本升级到最新版本后,我们所有的单元测试用例都失败了,错误如下:
连接到Spark:pyspark_driver_python=/usr/local/bin/jupyter pyspark_driver_python_opts=“notebook--no-browser--port=7777”pyspark--packages com.databricks:spark-csv2.10:1.1.0--master spark://127.0.0.1:7077--e
我试图在GCP上下文中测试Spark-HBase连接器,并尝试遵循1,它要求使用Maven(我尝试了Maven 3.6.3)对Spark 2.4进行本地封装连接器[2],在上提交作业时(完成后[3])得到以下错误。 知道吗? 谢谢你的支持 [2]https://github.com/hortonworks-spark/shc/tree/branch-2.4 [3]Spark-HBASE-GCP模板
问题内容: 与此处类似的问题,但在此处没有足够的评论要点。 根据最新的Spark 文档,可以两种不同的方式使用,一种用于SQL,另一种用于DataFrame。我找到了多个如何与sql 一起使用的示例,但还没有找到有关如何直接在DataFrame上使用a的任何示例。 op所提供的解决方案,在上面链接的问题上使用,根据Spark Java API文档,该解决方案将在Spark 2.0中删除。在那里,它
我有一个从配置文件读取数据的Spark作业。此文件是一个类型安全配置文件。 我要使用的外部application.conf的内容如下所示: 此application.conf文件存在于我的本地计算机文件系统中(而不是HDFS上) 我用的是Spark1.6.1和纱线 提前致谢
我正在努力让junit在我的Spark shell中工作。 当试图从junit导入Assert时,出现以下错误消息: 有办法解决这个问题吗?知道如何从scala shell下载org.junit吗? 编辑:按照zsxwing的建议,我使用了spark-shell-packages JUnit:JUnit:4.12,输出如下: 但是,在尝试导入org.junit.Assert._时仍然面临同样的问题