当前位置: 首页 > 知识库问答 >
问题:

Spark Scala DataFrame单行转换为JSON用于PostrgeSQL插入

斜向文
2023-03-14

使用名为< code>lastTail的数据帧,我可以这样迭代:

import scalikejdbc._
// ... 
// Do Kafka Streaming to create DataFrame lastTail
// ...

lastTail.printSchema

lastTail.foreachPartition(iter => {

// open database connection from connection pool
// with scalikeJDBC (to PostgreSQL) 

  while(iter.hasNext) {
    val item = iter.next()
    println("****")
    println(item.getClass)
    println(item.getAs("fileGid"))
    println("Schema: "+item.schema)
    println("String: "+item.toString())
    println("Seqnce: "+item.toSeq)

    // convert this item into an XXX format (like JSON)
    // write row to DB in the selected format
  }
})

这将输出“类似”的内容(带编校):root|--fileGid:string(nullable=true)|--eventStruct:struct(nullaable=false)||--event Index:integer(nullable=true)||--eventGid:string(nullable=true)| |--eventType:string||--eventType:string(nullable=true)

和(只有一个迭代项-经过了编辑,但希望有足够好的语法)

**** 类组织 apache.spark.sql.catalyst.expressions.GenericRow与Schema 12345 模式: 结构类型(结构字段(文件gid,字符串类型,true), 结构字段(事件结构,结构类型(事件索引,整数类型,true), 结构字段(事件Gid,字符串类型,true), 结构字段(事件类型,字符串类型,true)), 结构字段(修订结构,结构类型(结构字段(事件索引,整数类型,true),结构字段(事件类型,字符串类型,true),结构字段(事件类型,字符串类型,true), 结构字段(事件类型,字符串类型,true),字符串: [12345,[1,4,编辑],[1,4,修订版]

注意:我在 https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala 上做像价值指标= iter.sum这样的部分,而是使用数据帧。我还遵循 http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning 中看到的“使用前切RDD的设计模式”。

如何将这个org . Apache . spark . SQL . catalyst . expressions . genericrowwithschema(见https://github . com/Apache/spark/blob/master/SQL/catalyst/src/main/Scala/org/Apache/spark/SQL/catalyst/expressions/rows . Scala)迭代项转换成一个容易编写的东西(JSON或...?-我是开放的)进入PostgreSQL。(如果不是JSON,请建议如何将这个值读回到DataFrame中,以便在其他地方使用。)

共有1个答案

双子民
2023-03-14

好吧,我想出了一种不同的方法来解决这个问题。

val ltk = lastTail.select($"fileGid").rdd.map(fileGid => fileGid.toString)
val ltv = lastTail.toJSON
val kvPair = ltk.zip(ltv)

然后,我会简单地迭代RDD,而不是数据帧。

kvPair.foreachPartition(iter => {
  while(iter.hasNext) {
    val item = iter.next()
    println(item.getClass)
    println(item)
  }
})

撇开数据不谈,我得到了< code >类scala。Tuple2,这使得在JDBC / PostgreSQL中存储KV对变得更加容易。

我敢肯定,还有其他不可行的方法。

 类似资料:
  • 问题内容: 是否有命令行工具可用于将.plist文件转换为JSON? 如果没有,在Mac上使用Objective-C或C创建一个的方法是什么?例如,有用于Objective- C的JSONKit。如何打开.plist文件,将其传递给JSONKit,然后将其序列化为JSON? 问题答案: 如果您使用的是Mac,则可以在命令行上使用plutil工具(我相信这是开发人员工具附带的工具): 如评论中所述,

  • 问题内容: 我认为以下内容将是一项非常常见的任务,并认为将有一个简单的解决方案,但我找不到。 如果我有以下结构的数据表。 我想将其序列化为JSON对象,其中ID列是JSON对象中的一个节点,例如: 我调查了JSON.NET,但无法正常工作。编辑:我正在使用C# 问题答案: 使用JSON.NET,这非常简单。只需将您的数据表转换成等效的字典词典: 然后致电: 这是完整的测试: 结果:

  • 输入 json : 预期输出: 我想有一个颠簸转换,它可以嵌套很少的田地。

  • 我正在尝试使用JOLT(使用NiFi JoltTransformJson处理器)将JSON转换为不同的格式。对于单个JSON记录,正在使用的JOLT在JOLT应用程序演示中运行良好,而如果我使用多个JSON记录执行,那么我在JOLT应用程序演示中没有得到预期的输出。有人能告诉我在JOLT规范中需要做哪些额外的更改来处理多个JSON记录吗? 示例输入json JOLT使用: 预期输出JSON:

  • 我试图将JSON字符串(必须是列表)转换为对象列表: 这是我的服务。class: 我得到了: InvalidDefinitionException:无法构造的实例(不存在像默认构造那样的创建者): 更新: 但仍然是个例外: InvalidDefinitionException:无法构造的实例(不存在像默认构造那样的创建者):

  • 我在MongoDB中使用Java驱动程序3.0,以便通过Web服务发送JSON。 当我想将文档对象(org.bson.文档)转换为JSON时,我使用,当我想将JSON转换为文档对象时,我使用。 但是,当我处理文档列表时(如JSON中所示: