I build Spark DataFrames from file paths, and now I want to add the path, together with its time, to the resulting DataFrame as separate columns. Below is my current solution (pathToDF is a helper method):
val paths = pathsDF
.orderBy($"time")
.select($"path")
.as[String]
.collect()
if(paths.nonEmpty) {
paths
.grouped(groupsNum.getOrElse(paths.length))
.map(_.map(pathToDF).reduceLeft(_ union _))
} else {
Seq.empty[DataFrame]
}
I am trying to do something like the following, but I am not sure how to add the time column using a Column:
val orderedPaths = pathsDF
.orderBy($"time")
.select($"path")
//.select($"path", $"time") for both columns
val paths = orderedPaths
.as[String]
.collect()
if (paths.nonEmpty) {
paths
.grouped(groupsNum.getOrElse(paths.length))
.map(group => group.map(pathToDataDF).reduceLeft(_ union _)
.withColumn("path", orderedPaths("path")))
//.withColumn("time", orderedPaths("time") something like this
} else {
Seq.empty[DataFrame]
}
What is a better way to implement this?
Input DF schema:
time Long
path String
Current result:
resultDF schema
field1 Int
field2 String
....
fieldN String
Expected result:
resultDF schema
field1 Int
field2 String
....
path String
time Long
Please check the code below. Change
// The code below adds the same path for multiple files' content.
paths.grouped(groupsNum.getOrElse(paths.length))
.map(group => group.map(pathToDataDF).reduceLeft(_ union _)
.withColumn("path", orderedPaths("path")))
to
// The code below adds each file's own path to that file's content.
paths
.grouped(groupsNum.getOrElse(paths.length))
.flatMap(group => {
group.map(path => {
pathToDataDF(path).withColumn("path", lit(path))
}
)
})
.reduceLeft(_ union _)
Note that withColumn("path", orderedPaths("path")) cannot work, because a Column from one DataFrame cannot be added to a different DataFrame; instead, each file's DataFrame is stamped with lit(path) before the union. As examples, I have used both par and grouped below.
Note: ignore helper methods such as pathToDataDF; I have tried to replicate your approach.
scala> val orderedPaths = Seq(("/tmp/data/foldera/foldera.json","2020-05-29 01:30:00"),("/tmp/data/folderb/folderb.json","2020-05-29 02:00:00"),("/tmp/data/folderc/folderc.json","2020-05-29 03:00:00")).toDF("path","time")
orderedPaths: org.apache.spark.sql.DataFrame = [path: string, time: string]
scala> def pathToDataDF(path: String) = spark.read.format("json").load(path)
pathToDataDF: (path: String)org.apache.spark.sql.DataFrame
// Sample file content I have used.
scala> "cat /tmp/data/foldera/foldera.json".!
{"name":"Srinivas","age":29}
scala> "cat /tmp/data/folderb/folderb.json".!
{"name":"Ravi","age":20}
scala> "cat /tmp/data/folderc/folderc.json".!
{"name":"Raju","age":25}
Using par
scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)
scala> val parDF = paths match {
case p if !p.isEmpty => {
p.par
.map(path => {
pathToDataDF(path)
.withColumn("path",lit(path))
}).reduceLeft(_ union _)
}
case _ => spark.emptyDataFrame
}
parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]
scala> parDF.show(false)
+---+--------+------------------------------+
|age|name |path |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi |/tmp/data/folderb/folderb.json|
|25 |Raju |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+
// With time column.
scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))
scala> val parDF = paths match {
case p if !p.isEmpty => {
p.par
.map(path => {
pathToDataDF(path._1)
.withColumn("path",lit(path._1))
.withColumn("time",lit(path._2))
}).reduceLeft(_ union _)
}
case _ => spark.emptyDataFrame
}
parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]
scala> parDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name |path |time |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
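In this demo the time values are strings for readability. If time in your real data is a Long, as in the input schema above, the same pattern carries it through unchanged, since lit on a Long value yields a LongType column. A minimal sketch, assuming the same orderedPaths and pathToDataDF names but with a Long time column:

// Hedged sketch: time collected as Long (e.g. epoch seconds) instead of String.
// Assumes spark.implicits._ is in scope, as in spark-shell.
import org.apache.spark.sql.functions.lit

val pathsWithTime: Array[(String, Long)] =
  orderedPaths.orderBy($"time").select($"path", $"time").as[(String, Long)].collect

val parDF2 = pathsWithTime.par
  .map { case (path, time) =>
    pathToDataDF(path)
      .withColumn("path", lit(path))
      .withColumn("time", lit(time)) // LongType column, matching the expected schema
  }
  .reduceLeft(_ union _)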
Using grouped
scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)
scala> val groupedDF = paths match {
case p if !p.isEmpty => {
paths
.grouped(groupsNum.getOrElse(paths.length))
.flatMap(group => {
group
.map(path => {
pathToDataDF(path)
.withColumn("path", lit(path))
})
}).reduceLeft(_ union _)
}
case _ => spark.emptyDataFrame
}
groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]
scala> groupedDF.show(false)
+---+--------+------------------------------+
|age|name |path |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi |/tmp/data/folderb/folderb.json|
|25 |Raju |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+
// With time column.
scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))
scala> val groupedDF = paths match {
case p if !p.isEmpty => {
paths
.grouped(groupsNum.getOrElse(paths.length))
.flatMap(group => {
group
.map(path => {
pathToDataDF(path._1)
.withColumn("path",lit(path._1))
.withColumn("time",lit(path._2))
})
}).reduceLeft(_ union _)
}
case _ => spark.emptyDataFrame
}
groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]
scala> groupedDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name |path |time |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
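Finally, if you want to keep your original structure of one DataFrame per group (a Seq[DataFrame]) instead of a single unioned DataFrame, the same per-file stamping works inside each group. A minimal sketch, assuming groupsNum, pathToDataDF and orderedPaths as above:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
// Assumes spark.implicits._ is in scope, as in spark-shell.

val pathsWithTime = orderedPaths
  .orderBy($"time")
  .select($"path", $"time")
  .as[(String, String)]
  .collect()

val groupedDFs: Seq[DataFrame] =
  if (pathsWithTime.nonEmpty) {
    pathsWithTime
      .grouped(groupsNum.getOrElse(pathsWithTime.length))
      .map { group =>
        group
          .map { case (path, time) =>
            pathToDataDF(path)
              .withColumn("path", lit(path))
              .withColumn("time", lit(time))
          }
          .reduceLeft(_ union _) // one DataFrame per group
      }
      .toSeq
  } else {
    Seq.empty[DataFrame]
  }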