当前位置: 首页 > 知识库问答 >
问题:

如何透视Spark数据帧?

龙成仁
2023-03-14

我开始使用Spark DataFrames,我需要能够枢轴的数据,以创建多个列1列多行。在Scalding中有内置的功能,我相信Python中的熊猫,但是我找不到任何新的Spark Dataframe。

我假设我可以编写某种自定义函数来实现这一点,但我甚至不知道如何开始,特别是因为我是Spark的新手。如果有人知道如何使用内置功能或如何在Scala中编写东西的建议来实现这一点,我们将不胜感激。

共有3个答案

孔硕
2023-03-14

pivot操作符已添加到Spark dataframe API,是Spark 1.6的一部分。

详见https://github.com/apache/spark/pull/7841。

左丘恩
2023-03-14

我通过编写for循环来动态创建SQL查询来克服这个问题。说我有:

id  tag  value
1   US    50
1   UK    100
1   Can   125
2   US    75
2   UK    150
2   Can   175

我想:

id  US  UK   Can
1   50  100  125
2   75  150  175

我可以创建一个包含我想要透视的值的列表,然后创建一个包含我需要的SQL查询的字符串。

val countries = List("US", "UK", "Can")
val numCountries = countries.length - 1

var query = "select *, "
for (i <- 0 to numCountries-1) {
  query += """case when tag = """" + countries(i) + """" then value else 0 end as """ + countries(i) + ", "
}
query += """case when tag = """" + countries.last + """" then value else 0 end as """ + countries.last + " from myTable"

myDataFrame.registerTempTable("myTable")
val myDF1 = sqlContext.sql(query)

我可以创建类似的查询,然后进行聚合。不是一个非常优雅的解决方案,但它可以工作,并且对任何值列表都很灵活,当调用代码时,这些值也可以作为参数传入。

韩寒
2023-03-14

正如David Anderson提到的,Spark自1.6版以来提供了pivot功能。一般语法如下所示:

df
  .groupBy(grouping_columns)
  .pivot(pivot_column, [values]) 
  .agg(aggregate_expressions)

使用nycflight s13csv格式的用法示例:

蟒蛇:

from pyspark.sql.functions import avg

flights = (sqlContext
    .read
    .format("csv")
    .options(inferSchema="true", header="true")
    .load("flights.csv")
    .na.drop())

flights.registerTempTable("flights")
sqlContext.cacheTable("flights")

gexprs = ("origin", "dest", "carrier")
aggexpr = avg("arr_delay")

flights.count()
## 336776

%timeit -n10 flights.groupBy(*gexprs ).pivot("hour").agg(aggexpr).count()
## 10 loops, best of 3: 1.03 s per loop

斯卡拉:

val flights = sqlContext
  .read
  .format("csv")
  .options(Map("inferSchema" -> "true", "header" -> "true"))
  .load("flights.csv")

flights
  .groupBy($"origin", $"dest", $"carrier")
  .pivot("hour")
  .agg(avg($"arr_delay"))

爪哇:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.*;

Dataset<Row> df = spark.read().format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load("flights.csv");

df.groupBy(col("origin"), col("dest"), col("carrier"))
        .pivot("hour")
        .agg(avg(col("arr_delay")));

R/SparkR:

library(magrittr)

flights <- read.df("flights.csv", source="csv", header=TRUE, inferSchema=TRUE)

flights %>% 
  groupBy("origin", "dest", "carrier") %>% 
  pivot("hour") %>% 
  agg(avg(column("arr_delay")))

R/年

library(dplyr)

flights <- spark_read_csv(sc, "flights", "flights.csv")

avg.arr.delay <- function(gdf) {
   expr <- invoke_static(
      sc,
      "org.apache.spark.sql.functions",
      "avg",
      "arr_delay"
    )
    gdf %>% invoke("agg", expr, list())
}

flights %>% 
  sdf_pivot(origin + dest + carrier ~  hour, fun.aggregate=avg.arr.delay)

SQL:

请注意,SparkSQL中的PIVOT关键字从2.4版开始得到支持。

CREATE TEMPORARY VIEW flights 
USING csv 
OPTIONS (header 'true', path 'flights.csv', inferSchema 'true') ;

 SELECT * FROM (
   SELECT origin, dest, carrier, arr_delay, hour FROM flights
 ) PIVOT (
   avg(arr_delay)
   FOR hour IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
                13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
 );

示例数据

"year","month","day","dep_time","sched_dep_time","dep_delay","arr_time","sched_arr_time","arr_delay","carrier","flight","tailnum","origin","dest","air_time","distance","hour","minute","time_hour"
2013,1,1,517,515,2,830,819,11,"UA",1545,"N14228","EWR","IAH",227,1400,5,15,2013-01-01 05:00:00
2013,1,1,533,529,4,850,830,20,"UA",1714,"N24211","LGA","IAH",227,1416,5,29,2013-01-01 05:00:00
2013,1,1,542,540,2,923,850,33,"AA",1141,"N619AA","JFK","MIA",160,1089,5,40,2013-01-01 05:00:00
2013,1,1,544,545,-1,1004,1022,-18,"B6",725,"N804JB","JFK","BQN",183,1576,5,45,2013-01-01 05:00:00
2013,1,1,554,600,-6,812,837,-25,"DL",461,"N668DN","LGA","ATL",116,762,6,0,2013-01-01 06:00:00
2013,1,1,554,558,-4,740,728,12,"UA",1696,"N39463","EWR","ORD",150,719,5,58,2013-01-01 05:00:00
2013,1,1,555,600,-5,913,854,19,"B6",507,"N516JB","EWR","FLL",158,1065,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,709,723,-14,"EV",5708,"N829AS","LGA","IAD",53,229,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,838,846,-8,"B6",79,"N593JB","JFK","MCO",140,944,6,0,2013-01-01 06:00:00
2013,1,1,558,600,-2,753,745,8,"AA",301,"N3ALAA","LGA","ORD",138,733,6,0,2013-01-01 06:00:00

性能注意事项:

一般来说,旋转是一项昂贵的操作。

>

  • 如果可以的话,尝试提供列表,因为这样可以避免额外的命中来计算uniques:

    vs = list(range(25))
    %timeit -n10 flights.groupBy(*gexprs ).pivot("hour", vs).agg(aggexpr).count()
    ## 10 loops, best of 3: 392 ms per loop
    

    在某些情况下,重新分区和/或预聚合数据被证明是有益的(可能不再值得在2.0或更高版本中付出努力)

    仅对于重塑,您可以在Pyspark Dataframe上使用第一个:Pivot String列

    相关问题:

    • 如何融化Spark数据帧
    • 在spark sql/pyspark中取消Pivot
    • 使用Spark将列换行

  •  类似资料:
    • 什么是透视? 如何透视? 这是枢轴吗? 长格式到宽格式? 我见过很多关于透视表的问题。即使他们不知道他们询问的是透视表,他们通常也是。几乎不可能写出一个包含旋转的所有方面的规范的问题和答案。 ...但我要试一试。 现有问题和答案的问题是,问题通常集中在一个细微差别上,而OP很难将其概括出来,以便使用现有的许多好答案。然而,没有一个答案试图给出一个全面的解释(因为这是一个令人生畏的任务) 从我的谷歌

    • A 数据透视表介绍 B.1 什么是数据透视表? 数据透视表是一种可以快速汇总、分析大量数据表格的交互式工具。使用数据透视表可以按照数据表格的不同字段从多个角度进行透视,并建立交叉表格,用以查看数据表格不同层面的汇总信息、分析结果以及摘要数据。使用数据透视表可以深入分析数值数据,以帮助用户发现关键数据,并做出有关企业中关键数据的决策。 数据透视表是针对以下用途特别设计的:以友好的方式,查看大量的数据

    • 数据透视表显示二维交集的度量值,并在表格视图中表示数据。 图表属性 选择图表类型后,可以更改其属性来自定义图表: 选项 描述 常规 背景颜色 设置图表区域的背景颜色。 不透明度 设置背景颜色的不透明度。 显示边框 显示图表外部边框。 边界颜色 设置图表外部边框的颜色。 显示标题 显示图表的主要标题。 标题 指定图表的标题。 标题字体 设置标题的字体样式。 位置 设置标题的位置。 对齐 设置标题的水

    • 数据透视表显示二维交集的度量值,并在表格视图中表示数据。 图表属性 选择图表类型后,可以更改其属性来自定义图表: 选项 描述 常规 背景颜色 设置图表区域的背景颜色。 显示边框 显示图表外部边框。 边界颜色 设置图表外部边框的颜色。 显示标题 显示图表的主要标题。 标题 指定图表的标题。 标题字体 设置标题的字体样式。 位置 设置标题的位置。 对齐 设置标题的水平对齐方式。 数据 字体 设置字段名

    • 数据透视表显示二维交集的度量值,并在表格视图中表示数据。 图表属性 选择图表类型后,可以更改其属性来自定义图表: 选项 描述 常规 背景颜色 设置图表区域的背景颜色。 不透明度 设置背景颜色的不透明度。 显示边框 显示图表外部边框。 边界颜色 设置图表外部边框的颜色。 显示标题 显示图表的主要标题。 标题 指定图表的标题。 标题字体 设置标题的字体样式。 位置 设置标题的位置。 对齐 设置标题的水