val clientsRDD = resultRDD.map(ClientRow.parseClientRow)
// change RDD of ClientRow objects to a DataFrame
val clientsDF = clientsRDD.toDF()
// Return the schema of this DataFrame
clientsDF.printSchema()
// print each line DataFrame
clientsDF.collect().foreach(println)
root
|-- rowkey: string (nullable = true)
|-- campaigns: string (nullable = true)
[1,[{"1000":"campaign1"},{"1001":"campaign2"}]]
[2,[{"1002":"campaign3"}]]
rowkey type body
client_id-campaign_id, record_type, record_text
client1
records:100, login:20, actions:80
client1 campaign1
records:70, login:16, actions:50
client1 campaign2
records:30, login:4, actions:30
最后我想写统计数据。
用Scala在Spark中实现这一点的最佳方法是什么?我是否必须迭代clientsRDD(map?),并为每一行生成不同的RDDs映射记录?
首先,您需要定义schema for campaign字段:这意味着您使用
val schema = StructType(Seq(StructField("rowkey", StringType, true),
StructField("campaigns", StructType(
StructField("id", StringType, true) ::
StructField("name", StringType, true) :: Nil
))
))
然后,可以在活动字段中使用explode
方法使行平坦。
val df = sqlContext.createDataFrame(clientsRDD, schema)
df.select(col("rowkey"), explode(col("campaigns")).as("campaign")).filter(col("campaign.id") === 1)
null 一些示例输出数据: *编辑:工作的scala代码行:
Scala 函数 我们可以在 Scala 函数内定义函数,定义在函数内的函数称之为局部函数。 以下实例我们实现阶乘运算,并使用内嵌函数: object Test { def main(args: Array[String]) { println( factorial(0) ) println( factorial(1) ) println( factor
作为数独生成器的一部分,我有一个用于过滤嵌套列表的函数,以便只返回某些索引的内部列表以及这些列表中某些索引的内部列表元素。其思想是返回一个List[Int],其中包含一个3x3正方形的值,该正方形取自一个9x9数独拼图,表示为List[List[Int]],作为函数的参数提供。 我尝试了两种方法,但都未能始终如一地发挥作用。一种方法尝试从列表中筛选出某些子列表,然后从其余列表中筛选出项目。此函数完
我是java8的新手。 我有以下课程。 我能够保存这个在mongo收集: 现在,我得出了基于spring mongo数据存储库的结果。我想迭代上面的mongo集合,这样我只能得到我尝试过的以下车辆列表: 请帮助我了解java8。提前谢谢
假设我有一个包含集合的对象,所述集合上的每个元素都包含一个集合,每个集合都包含一个集合。 我想在最深的对象上迭代,并对其应用相同的代码。 命令式的方法是微不足道的,但有没有一种方法来完成这一切? 我可以看到如何从最深的循环中生成lambda: 但我能做得更多吗?
生成器迭代 手动迭代生成器,递归执行 AsyncTask::next,调用Generator::send方法将将yield值作为yield表达式结果。 yield表达式可能是一个异步调用,我们这里为之后把异步调用的结果作为yield表达式结果铺垫。 yield外侧括号在PHP5必须,PHP7不需要。 如, $ip = (yield async_dns_lookup(...) ); ^