当前位置: 首页 > 面试题库 >

使用Spark将列转置为行

舒仲渊
2023-03-14
问题内容

我正在尝试将表的某些列转置为行。我正在使用Python和Spark 1.5.0。这是我的初始表:

+-----+-----+-----+-------+
|  A  |col_1|col_2|col_...|
+-----+-------------------+
|  1  |  0.0|  0.6|  ...  |
|  2  |  0.6|  0.7|  ...  |
|  3  |  0.5|  0.9|  ...  |
|  ...|  ...|  ...|  ...  |

我想要这样的东西:

+-----+--------+-----------+
|  A  | col_id | col_value |
+-----+--------+-----------+
|  1  |   col_1|        0.0|
|  1  |   col_2|        0.6|   
|  ...|     ...|        ...|    
|  2  |   col_1|        0.6|
|  2  |   col_2|        0.7| 
|  ...|     ...|        ...|  
|  3  |   col_1|        0.5|
|  3  |   col_2|        0.9|
|  ...|     ...|        ...|

有人知道我能做到吗?感谢您的帮助。


问题答案:

使用基本的Spark SQL函数相对简单。

蟒蛇

from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])

Scala

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip
  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")

  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))


 类似资料:
  • 问题内容: 我正在尝试将表的某些列转置为行。我正在使用Python和Spark 1.5.0。这是我的初始表: 我想要这样的东西: 有人知道我能做到吗?谢谢你的帮助。 问题答案: 使用基本的Spark SQL函数相对简单。 python Scala:

  • 我想使用spark dataframe将行转换为列。 我的桌子是这样的 我想把它转换成 我用了下面的代码:- 但我得到的结果是-- 任何人都可以帮助得到渴望的结果。

  • 问题内容: 我在具有两列mvv和count的数据帧上工作。 我想获得两个包含mvv值和计数值的列表。就像是 因此,我尝试了以下代码:第一行应返回一个python行列表。我想看第一个值: 但是我在第二行收到一条错误消息: AttributeError:getInt 问题答案: 瞧,为什么您这样做的方式行不通。首先,您尝试从行类型获取整数,collect的输出如下所示: 如果您采取这样的做法: 您将获

  • 但我得到第二行的错误消息: AttributeError:getInt

  • 问题内容: 我有下表,该表为每个用户提供了多个电子邮件地址。 我需要将其展平为用户查询中的列。请根据创建日期为我提供“最新”的3个电子邮件地址。 问题答案: 使用从模块。

  • 问题内容: 我正在尝试将行转置为列,但是没有找到任何好的答案。 这是我想要的示例: 输入表: 输出将是: 问题答案: 您需要枚举值以对其进行透视: