使用名为< code>lastTail的数据帧,我可以这样迭代:
import scalikejdbc._
// ...
// Do Kafka Streaming to create DataFrame lastTail
// ...
lastTail.printSchema
lastTail.foreachPartition(iter => {
// open database connection from connection pool
// with scalikeJDBC (to PostgreSQL)
while(iter.hasNext) {
val item = iter.next()
println("****")
println(item.getClass)
println(item.getAs("fileGid"))
println("Schema: "+item.schema)
println("String: "+item.toString())
println("Seqnce: "+item.toSeq)
// convert this item into an XXX format (like JSON)
// write row to DB in the selected format
}
})
这将输出“类似”的内容(带编校):root|--fileGid:string(nullable=true)|--eventStruct:struct(nullaable=false)||--event Index:integer(nullable=true)||--eventGid:string(nullable=true)| |--eventType:string||--eventType:string(nullable=true)
和(只有一个迭代项-经过了编辑,但希望有足够好的语法)
**** 类组织 apache.spark.sql.catalyst.expressions.GenericRow与Schema 12345 模式: 结构类型(结构字段(文件gid,字符串类型,true), 结构字段(事件结构,结构类型(事件索引,整数类型,true), 结构字段(事件Gid,字符串类型,true), 结构字段(事件类型,字符串类型,true)), 结构字段(修订结构,结构类型(结构字段(事件索引,整数类型,true),结构字段(事件类型,字符串类型,true),结构字段(事件类型,字符串类型,true), 结构字段(事件类型,字符串类型,true),字符串: [12345,[1,4,编辑],[1,4,修订版]
注意:我在 https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala 上做像价值指标= iter.sum
这样的部分,而是使用数据帧。我还遵循 http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning 中看到的“使用前切RDD的设计模式”。
如何将这个org . Apache . spark . SQL . catalyst . expressions . genericrowwithschema(见https://github . com/Apache/spark/blob/master/SQL/catalyst/src/main/Scala/org/Apache/spark/SQL/catalyst/expressions/rows . Scala)迭代项转换成一个容易编写的东西(JSON或...?-我是开放的)进入PostgreSQL。(如果不是JSON,请建议如何将这个值读回到DataFrame中,以便在其他地方使用。)
好吧,我想出了一种不同的方法来解决这个问题。
val ltk = lastTail.select($"fileGid").rdd.map(fileGid => fileGid.toString)
val ltv = lastTail.toJSON
val kvPair = ltk.zip(ltv)
然后,我会简单地迭代RDD,而不是数据帧。
kvPair.foreachPartition(iter => {
while(iter.hasNext) {
val item = iter.next()
println(item.getClass)
println(item)
}
})
撇开数据不谈,我得到了< code >类scala。Tuple2,这使得在JDBC / PostgreSQL中存储KV对变得更加容易。
我敢肯定,还有其他不可行的方法。
问题内容: 是否有命令行工具可用于将.plist文件转换为JSON? 如果没有,在Mac上使用Objective-C或C创建一个的方法是什么?例如,有用于Objective- C的JSONKit。如何打开.plist文件,将其传递给JSONKit,然后将其序列化为JSON? 问题答案: 如果您使用的是Mac,则可以在命令行上使用plutil工具(我相信这是开发人员工具附带的工具): 如评论中所述,
问题内容: 我认为以下内容将是一项非常常见的任务,并认为将有一个简单的解决方案,但我找不到。 如果我有以下结构的数据表。 我想将其序列化为JSON对象,其中ID列是JSON对象中的一个节点,例如: 我调查了JSON.NET,但无法正常工作。编辑:我正在使用C# 问题答案: 使用JSON.NET,这非常简单。只需将您的数据表转换成等效的字典词典: 然后致电: 这是完整的测试: 结果:
输入 json : 预期输出: 我想有一个颠簸转换,它可以嵌套很少的田地。
我正在尝试使用JOLT(使用NiFi JoltTransformJson处理器)将JSON转换为不同的格式。对于单个JSON记录,正在使用的JOLT在JOLT应用程序演示中运行良好,而如果我使用多个JSON记录执行,那么我在JOLT应用程序演示中没有得到预期的输出。有人能告诉我在JOLT规范中需要做哪些额外的更改来处理多个JSON记录吗? 示例输入json JOLT使用: 预期输出JSON:
我试图将JSON字符串(必须是列表)转换为对象列表: 这是我的服务。class: 我得到了: InvalidDefinitionException:无法构造的实例(不存在像默认构造那样的创建者): 更新: 但仍然是个例外: InvalidDefinitionException:无法构造的实例(不存在像默认构造那样的创建者):
我在MongoDB中使用Java驱动程序3.0,以便通过Web服务发送JSON。 当我想将文档对象(org.bson.文档)转换为JSON时,我使用,当我想将JSON转换为文档对象时,我使用。 但是,当我处理文档列表时(如JSON中所示: