Question:

Adding additional columns to a Spark DataFrame

鲁华茂
2023-03-14

I build Spark DataFrames from file paths, and now I want to add the path, together with its time, as separate columns to the resulting DataFrames. Below is my current solution (pathToDF is a helper method):

val paths = pathsDF
  .orderBy($"time")
  .select($"path")
  .as[String]
  .collect()

if(paths.nonEmpty) {
  paths
    .grouped(groupsNum.getOrElse(paths.length))
    .map(_.map(pathToDF).reduceLeft(_ union _))
} else {
  Seq.empty[DataFrame]
}
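
(Side note: pathToDF itself is not shown here; a minimal sketch of what such a helper might look like, assuming the input files are JSON as in the answer below, is:)

// Hypothetical helper, not the asker's actual code: reads one file path into a DataFrame.
// Assumes JSON input; adjust the format for your data. `spark` is the SparkSession.
def pathToDF(path: String): org.apache.spark.sql.DataFrame =
  spark.read.format("json").load(path)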

I am trying to do something like the following, but I am not sure how to add the time column using a Column:

    val orderedPaths = pathsDF
      .orderBy($"time")
      .select($"path")
   //.select($"path", $"time") for both columns

    val paths = orderedPaths
      .as[String]
      .collect()

    if (paths.nonEmpty) {
      paths
        .grouped(groupsNum.getOrElse(paths.length))
        .map(group => group.map(pathToDataDF).reduceLeft(_ union _)
          .withColumn("path", orderedPaths("path")))
    //.withColumn("time", orderedPaths("time")) something like this
    } else {
      Seq.empty[DataFrame]
    }

What would be a better way to implement this?

Input DF:

time Long
path String

Current result:

resultDF schema
field1 Int
field2 String
....
fieldN String

Expected result:

resultDF schema
field1 Int
field2 String
....
path   String
time   Long

1 Answer

澹台欣怿
2023-03-14

Please check the code below.

1. Your grouped code adds a single path value to the content of several files.

2. Change it as shown in the second snippet, so that each file's content gets its own path:

// Below code adds the same path for multiple files' content.
paths.grouped(groupsNum.getOrElse(paths.length))
     .map(group => group.map(pathToDataDF).reduceLeft(_ union _)
     .withColumn("path", orderedPaths("path"))) 

// Below code adds the matching path for each file's content.
paths
.grouped(groupsNum.getOrElse(paths.length))
.flatMap(group => {
    group.map(path => {
        pathToDataDF(path).withColumn("path", lit(path)) 
        }
    )
})
.reduceLeft(_ union _)
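
If you want to keep one DataFrame per group (matching the Seq[DataFrame] shape of your original code) rather than a single union, the same per-file lit(path) idea can be applied inside each group. A sketch, assuming your pathToDataDF helper and groupsNum option:

import org.apache.spark.sql.functions.lit

// One DataFrame per group; within a group, each file's rows carry that file's own path.
// (Guard with paths.nonEmpty, as in the original code, if paths can be empty.)
val groupedDFs: Seq[org.apache.spark.sql.DataFrame] =
  paths
    .grouped(groupsNum.getOrElse(paths.length))
    .map(group =>
      group
        .map(path => pathToDataDF(path).withColumn("path", lit(path)))
        .reduceLeft(_ union _))
    .toSeq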

In the examples below I have used both par and grouped.

Note: ignore methods like pathToDataDF; I have just tried to replicate your helpers.

scala> val orderedPaths = Seq(("/tmp/data/foldera/foldera.json","2020-05-29 01:30:00"),("/tmp/data/folderb/folderb.json","2020-05-29 02:00:00"),("/tmp/data/folderc/folderc.json","2020-05-29 03:00:00")).toDF("path","time")
orderedPaths: org.apache.spark.sql.DataFrame = [path: string, time: string]

scala> def pathToDataDF(path: String) = spark.read.format("json").load(path)
pathToDataDF: (path: String)org.apache.spark.sql.DataFrame

//Sample file contents used in the examples.

scala> "cat /tmp/data/foldera/foldera.json".!
{"name":"Srinivas","age":29}

scala> "cat /tmp/data/folderb/folderb.json".!
{"name":"Ravi","age":20}

scala> "cat /tmp/data/folderc/folderc.json".!
{"name":"Raju","age":25}

Using par

scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)

scala> val parDF = paths match {
        case p if !p.isEmpty => {
            p.par
            .map(path => { 
                pathToDataDF(path)
                .withColumn("path",lit(path))
            }).reduceLeft(_ union _)
        }
        case _ => spark.emptyDataFrame
    }
parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> parDF.show(false)
+---+--------+------------------------------+
|age|name    |path                          |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi    |/tmp/data/folderb/folderb.json|
|25 |Raju    |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+

// With time column.

scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))

scala> val parDF = paths match {
            case p if !p.isEmpty => {
                p.par
                .map(path => {
                    pathToDataDF(path._1)
                    .withColumn("path",lit(path._1))
                    .withColumn("time",lit(path._2))
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }

parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]

scala> parDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name    |path                          |time               |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi    |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju    |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+

Using grouped

scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)

scala> val groupedDF = paths match {
            case p if !p.isEmpty => {
                paths
                .grouped(groupsNum.getOrElse(paths.length))
                .flatMap(group => {
                    group
                    .map(path => { 
                        pathToDataDF(path)
                        .withColumn("path", lit(path))
                    })
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }

groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> groupedDF.show(false)

+---+--------+------------------------------+
|age|name    |path                          |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi    |/tmp/data/folderb/folderb.json|
|25 |Raju    |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+

// With time column.

scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))

scala> val groupedDF = paths match {
            case p if !p.isEmpty => {
                paths
                .grouped(groupsNum.getOrElse(paths.length))
                .flatMap(group => {
                    group
                    .map(path => {
                        pathToDataDF(path._1)
                        .withColumn("path",lit(path._1))
                        .withColumn("time",lit(path._2))
                    })
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }


groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]

scala> groupedDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name    |path                          |time               |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi    |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju    |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
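
One more detail: in the sample data above time is a String, but the question's input DF declares time as Long. The same pattern works; collecting the pairs as (String, Long) makes lit produce a LongType column. A sketch, assuming a pathsDF whose time column really is a Long and the same pathToDataDF helper:

import org.apache.spark.sql.functions.lit

// Collect (path, time) with time as Long, then tag each file's rows with both values.
val pathTimePairs = pathsDF
  .orderBy($"time")
  .select($"path", $"time")
  .as[(String, Long)]
  .collect()

val resultDF =
  if (pathTimePairs.nonEmpty)
    pathTimePairs
      .map { case (path, time) =>
        pathToDataDF(path)
          .withColumn("path", lit(path))
          .withColumn("time", lit(time)) // lit on a Long yields a bigint (LongType) column
      }
      .reduceLeft(_ union _)
  else
    spark.emptyDataFrame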
