当前位置: 首页 > 知识库问答 >
问题:

火花/scala字符串json内地图

李俊雅
2023-03-14

我有一对看起来像

(1, {"id":1, "picture": "url1"})
(2, {"id":2, "picture": "url2"})
(3, {"id":3, "picture": "url3"})
...

第二个元素是一个字符串,我从函数get()从http://alvinalexander.com/scala/how-to-write-scala-http-get-request-client-source-fromurl.这里是函数:

@throws(classOf[java.io.IOException])
@throws(classOf[java.net.SocketTimeoutException])
def get(url: String,
        connectTimeout: Int = 5000,
        readTimeout: Int = 5000,
        requestMethod: String = "GET") =
{
    import java.net.{URL, HttpURLConnection}
    val connection = (new URL(url)).openConnection.asInstanceOf[HttpURLConnection]
    connection.setConnectTimeout(connectTimeout)
    connection.setReadTimeout(readTimeout)
    connection.setRequestMethod(requestMethod)
    val inputStream = connection.getInputStream
    val content = io.Source.fromInputStream(inputStream).mkString
    if (inputStream != null) inputStream.close
    content
}

现在我想把这个字符串转换成json,从中获取图片url。(来自此)https://stackoverflow.com/a/38271732/1456026)

val step2 = pairRDD_1.map({case(x,y)=>{
val jsonStr = y
val rdd = sc.parallelize(Seq(jsonStr))
val df = sqlContext.read.json(rdd)
(x,y("picture"))
}})

但我不断得到

线程“main”组织中出现异常。阿帕奇。火花SparkException:任务不可序列化

当我打印出前20个元素并试图在外部逐个手动将字符串转换为json时。地图它起作用了。

val rdd = sc.parallelize(Seq("""{"id":1, "picture": "url1"}"""))
val df = sqlContext.read.json(rdd)
println(df)
>>>[id: string, picture: string]

如何转换字符串json在火花/scala里面. map?

共有3个答案

易自珍
2023-03-14

答案之一是使用json4s库。来源:http://muster.json4s.org/docs/jawn_codec.html

//case class defined outside main()
case class Pictures(id: String, picture: String)

// import library
import muster._
import muster.codec.jawn._

// here all the magic happens 
val json_read_RDD = pairRDD_1.map({case(x,y) =>
      {
          val json_read_to_case_class = JawnCodec.as[Pictures](y)
          (x, json_read_to_case_class.picture)
    }})

// add to build.sbt
libraryDependencies ++= Seq(
"org.json4s" %% "muster-codec-json" % "0.3.0",
"org.json4s" %% "muster-codec-jawn" % "0.3.0")

归功于特拉维斯·海格纳,他解释了为什么原始代码不起作用,并向安东·奥科尼奇寻求使用json库的建议。

常雪风
2023-03-14

通常,当您看到这条消息时,这是因为您正在map函数(read anonymous函数)中使用一个在其外部定义的资源,并且无法序列化。

集群模式下运行时,匿名函数将在另一台机器上运行。在另一台机器上,应用程序的一个新实例被实例化,其状态(变量/值等)由驱动程序序列化并发送到新实例的数据设置。如果匿名函数是一个闭包(即使用其作用域之外的变量),那么这些资源必须是可序列化的,才能发送到工作节点。

例如,map函数可能会尝试使用数据库连接来获取RDD中每个记录的一些信息。该数据库连接仅在创建它的主机上有效(当然,从网络的角度来看),该主机通常是驱动程序,因此无法从其他主机序列化、发送和使用它。在这个特定的示例中,您将执行mapPartitions()来实例化来自worker本身的数据库连接,然后执行map()该分区内的每个记录来查询数据库。

如果没有完整的代码示例,我无法提供更多帮助,以了解哪些潜在值或变量无法序列化。

翟凯
2023-03-14

您不能在分布式操作中使用SparkContext。在上面的代码中,您不能在pairRDD_1上的map操作中访问SparkContext。

考虑使用JSON库执行转换。

 类似资料:
  • 如何使用Spark-Scala连接日期和时间列(两个字符串)

  • 我想将包含字符串记录的RDD转换为Spark数据帧,如下所示。 模式行不在同一个中,而是在另一个变量中: 所以现在我的问题是,我如何使用上面两个,在Spark中创建一个数据帧?我使用的是Spark 2.2版。 我确实搜索并看到了一篇帖子:我可以使用spack-csv将表示为字符串的CSV读取到Apache Spark中吗?然而,这并不是我所需要的,我也无法找到一种方法来修改这段代码以在我的情况下工

  • 嗨,我正在尝试生成Salt示例的输出,但没有使用文档中提到的docker。我找到了帮助生成输出的scala代码,这是main.scala。我将main.scala修改为一个方便的main.scala, 我为这个scala创建了一个单独的文件夹, calac-cp“lib/salt.jar:lib/spark.jar”main.scala 这已成功运行并在文件夹BinexTest下生成类。 现在,项

  • 主要内容:实例,创建字符串,实例,实例,字符串长度,实例,字符串连接,实例,创建格式化字符串,实例,String 方法以下实例将字符串赋值给一个常量: 实例 object Test {     val greeting : String = "Hello,World!"     def main (args : Array [String ] ) {       println ( greeting )     } } 以上实例定义了变量 greeting,为字符串常量,它的类型为 String

  • 以下实例将字符串赋值给一个常量: object Test { val greeting: String = "Hello,World!" def main(args: Array[String]) { println( greeting ) } } 以上实例定义了变量 greeting,为字符串常量,它的类型为 String (java.lang.String)

  • 我想从Spark v.1.6(使用scala)数据帧创建一个JSON。我知道有一个简单的解决方案,就是做。 但是,我的问题看起来有点不同。例如,考虑具有以下列的数据帧: 我想在最后有一个数据帧 其中C是包含、、的JSON。不幸的是,我在编译时不知道数据框是什么样子的(除了始终“固定”的列和)。 至于我需要这个的原因:我使用Protobuf发送结果。不幸的是,我的数据帧有时有比预期更多的列,我仍然会