Question:

Split a Spark dataframe by certain column values, then rotate each resulting dataframe independently of the other columns

廉飞捷
2023-03-14

I am trying to split a dataframe based on the values of one (or more) of its columns, and then rotate part of each resulting dataframe independently of the other columns. That is, given an input dataframe:

val inputDF = Seq(("tom","20","a","street a","germany"),("jimmy","30","b","street b","germany"),
                  ("lola","50","c","street c","argentina"), ("maria","60","d","street d","argentina"), ("joe","70","e","street e","argentina")
                  .toDF("name","age","company","address","country")

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|  tom| 20|      a|street a|  germany|
//|jimmy| 30|      b|street b|  germany|
//| lola| 40|      c|street c|argentina|
//|maria| 50|      d|street d|argentina|
//|  joe| 60|      e|street e|argentina|
//+-----+---+-------+--------+---------+

I need to split the records according to the distinct values of the "country" column. For the input dataframe above, the split should produce the two dataframes below (a minimal sketch of this step follows the tables):

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|  tom| 20|      a|street a|  germany|
//|jimmy| 30|      b|street b|  germany|
//+-----+---+-------+--------+---------+

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//| lola| 40|      c|street c|argentina|
//|maria| 50|      d|street d|argentina|
//|  joe| 60|      e|street e|argentina|
//+-----+---+-------+--------+---------+
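
A minimal sketch of this splitting step, assuming the splitter column only has a few distinct values so collecting them to the driver is acceptable (the names countries / perCountryDFs are only illustrative):

import org.apache.spark.sql.functions.col

// one dataframe per distinct country value (illustration only)
val countries = inputDF.select("country").distinct().collect().map(_.getString(0))
val perCountryDFs = countries.map(c => inputDF.filter(col("country") === c))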

I also have to rotate the "name" and "age" columns within each of those dataframes, so that every person ends up with a different company and address while the remaining columns stay intact. The desired output dataframe would look like this:

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|jimmy| 30|      a|street a|  germany|
//|  tom| 20|      b|street b|  germany|
//|  joe| 60|      c|street c|argentina|
//| lola| 40|      d|street d|argentina|
//|maria| 50|      e|street e|argentina|
//+-----+---+-------+--------+---------+

The final row order does not matter.
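
To make the intended rotation concrete, this is the same operation on plain Scala collections for the argentina group (illustration only, outside Spark): shuffle the group's (name, age) pairs and zip them back onto its (company, address) pairs, keeping country fixed.

val people = Seq(("lola", 40), ("maria", 50), ("joe", 60))
val places = Seq(("c", "street c"), ("d", "street d"), ("e", "street e"))

// pair each shuffled (name, age) with one of the group's (company, address) slots
val rotatedGroup = scala.util.Random.shuffle(people).zip(places).map {
  case ((name, age), (company, address)) => (name, age, company, address, "argentina")
}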

My attempt assigns a unique id to each row, shuffles the desired columns (name and age), and joins the reordered part back to the rest of the dataframe through that auxiliary id. The main problems here are the use of collect(), which can be dangerous for large dataframes, and repartition(1), which pretty much defeats the point of distributed computing and Spark (it is there to avoid the exception thrown when zipping RDDs with different numbers of partitions).

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.LongType

// column name(s) to split the input dataframe by
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)

// list of column names to be rotated (together)
val colsToRotate = Seq("name", "age")

// add an auxiliary column for joining the dataframes in the final step
val auxCol = "aux"
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())

val rotateCols = colsToRotate.map(col) :+ col(auxCol)

val splitValuesSchema = dfWithID.select(splitCols: _*).schema

// create one dataframe for each value of the splitting column
val splitValuesDFs = dfWithID.select(splitCols: _*).distinct().collect()
  .map(row => spark.sparkContext.makeRDD(List(row)))
  .map(rdd => spark.createDataFrame(rdd, splitValuesSchema))

val rotateIDCols = Array(auxCol) ++ colsToRotate

// join the split values with their records (DFs with id + colsToRotate)
val splittedDFs = splitValuesDFs
  .map(df => df.join(dfWithID, colToSplit).selectExpr(rotateIDCols: _*))

// randomly reorder the auxiliary id column (DFs with random ids)
val randIdDFs = splittedDFs
  .map(df => df.select(auxCol).orderBy(rand()).toDF())

// get rdds with random ids
val randIdRdds = randIdDFs
  .map(df => df.select(auxCol).rdd.map(row => row(0)))

// finally, zip and append the rdds with the random ids to the dataframes created by
// splitting the main df to obtain the rotated dataframe with all the data
val tuples = (splittedDFs, randIdRdds).zipped
val newRdds = tuples
  .map((df: DataFrame, rdd) => df.rdd.repartition(1).zip(rdd.repartition(1))
  .map(row => Row.fromSeq(row._1.toSeq ++ Seq(row._2))))

val tuples2 = (splittedDFs, newRdds).zipped
val rotatedDF = tuples2.map((df: DataFrame, rdd) => spark
  .createDataFrame(rdd, df.schema.add("rotated_id", LongType)).drop(auxCol))
  .reduce(_ union _).withColumnRenamed("rotated_id", "column2join")

// get the rest of the columns
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
  .withColumnRenamed(auxCol, "column2join")

// join both dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
  .select(inputDF.columns.map(col): _*) // to keep the initial columns order

Displaying the output dataframe produces a result similar to the expected output above (the exact pairing basically depends on the ordering produced by rand()).

I would like to avoid collect and repartition as much as possible and end up with a more functional solution.
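
One direction I am considering, sketched here only as an idea and not tested beyond the toy data above, is to avoid collect, zip and repartition(1) altogether by assigning two independent random row numbers per country partition with window functions and joining the two halves on (country, row number). Note that, just like the shuffle above, this does not strictly guarantee that nobody keeps their own company/address:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{rand, row_number}

// two window specs, each with its own random ordering inside every country partition
val w1 = Window.partitionBy("country").orderBy(rand())
val w2 = Window.partitionBy("country").orderBy(rand())

// number the (name, age) half of each row in one random order...
val namesPart = inputDF.select("country", "name", "age")
  .withColumn("rn", row_number().over(w1))

// ...and the (company, address) half in an independent random order
val placesPart = inputDF.select("country", "company", "address")
  .withColumn("rn", row_number().over(w2))

// pairing the two halves on (country, rn) yields one rotated row per person
val windowRotatedDF = namesPart.join(placesPart, Seq("country", "rn"))
  .drop("rn")
  .select("name", "age", "company", "address", "country")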

Any comments or ideas are welcome!

1 Answer

赵英哲
2023-03-14

I have been trying to find a better, clearer and more functional solution by removing the badly-performing calls (repartition and some of the collects) wherever possible. I added an auxiliary method that indexes the rows of a dataframe, so that parts which share no common column (columns or dataframes that cannot be joined by any common key) can still be joined. This is my current version; it also removes the back-and-forth conversions between RDDs and dataframes and looks more readable and easier to understand.

I hope this helps anyone with the same concern.

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// auxiliary method to index the rows of a dataframe
def addRowIndex(df: DataFrame) = spark.createDataFrame(
  df.rdd.zipWithIndex.map { case (row, index) => Row.fromSeq(row.toSeq :+ index) },
  StructType(df.schema.fields :+ StructField("index", LongType, false))
)
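// (zipWithIndex gives every row a stable 0-based position, so two equally
//  sized dataframes that share no common column can later be joined
//  positionally on this "index" column)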

// column name(s) to split the input dataframe by
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)

// list of column names to be rotated (together)
val colsToRotate = Seq("name", "age")

// add an auxiliary column for joining the dataframes in the final step
val auxCol = "aux"
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())

val rotateIDCols = (Array(auxCol) ++ colsToRotate).map(col)

// get an array of rows with the distinct values of the splitter column(s)
// --assuming there will not be too many distinct values in the splitter column--
val filterValues = dfWithID.select(splitCols: _*).distinct().collect()

// retrieve the different dfs according to the splitter values
val splitDfs = filterValues.map { filterRow =>
  filterRow.getValuesMap[Any](colToSplit)
    .foldLeft(dfWithID) { (df, filterField) =>
      df.filter(col(filterField._1) === filterField._2)
    }
    .select(rotateIDCols: _*)
}

// get and randomly reorder the aux id column for each dataframe
val randIdDfs = splitDfs.map(_.select(auxCol).orderBy(rand()).toDF())

// remove aux column for each dataframe
val splitWithoutIdDfs = splitDfs.map(_.drop(auxCol))

val dfsTuples = splitWithoutIdDfs.zip(randIdDfs)

// index the rows of the dfs with columns to rotate and of the dfs with random ids
val indexedDfsTuples = dfsTuples.map {
  case (colsDf, idsDf) => (addRowIndex(colsDf), addRowIndex(idsDf))
}

// join reordered-ids dfs and cols to rotate dataframes by the index
val reorderedDfs = indexedDfsTuples.map {
  case (df1, df2) => df1.join(df2, Seq("index"))
    .drop("index").withColumnRenamed(auxCol, "column2join")
}

// union all the per-country dataframes to create the rotated df
val rotatedDF = reorderedDfs.tail.foldLeft(reorderedDfs.head) { (acc, df) => acc.union(df) }

// get the rest of the columns to get the part of the main df which does not change
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
  .withColumnRenamed(auxCol, "column2join")

// join the rotated and non-rotated dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
  .select(inputDF.columns.map(col): _*) // to keep the initial columns order
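
A quick sanity check (usage sketch; the exact pairing depends on rand()):

outputDF.show()
// every country should keep its original number of rows
outputDF.groupBy("country").count().show()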