我开始使用Spark DataFrames,我需要能够枢轴的数据,以创建多个列1列多行。在Scalding中有内置的功能,我相信Python中的熊猫,但是我找不到任何新的Spark Dataframe。
我假设我可以编写某种自定义函数来实现这一点,但我甚至不知道如何开始,特别是因为我是Spark的新手。如果有人知道如何使用内置功能或如何在Scala中编写东西的建议来实现这一点,我们将不胜感激。
pivot操作符已添加到Spark dataframe API,是Spark 1.6的一部分。
详见https://github.com/apache/spark/pull/7841。
我通过编写for循环来动态创建SQL查询来克服这个问题。说我有:
id tag value
1 US 50
1 UK 100
1 Can 125
2 US 75
2 UK 150
2 Can 175
我想:
id US UK Can
1 50 100 125
2 75 150 175
我可以创建一个包含我想要透视的值的列表,然后创建一个包含我需要的SQL查询的字符串。
val countries = List("US", "UK", "Can")
val numCountries = countries.length - 1
var query = "select *, "
for (i <- 0 to numCountries-1) {
query += """case when tag = """" + countries(i) + """" then value else 0 end as """ + countries(i) + ", "
}
query += """case when tag = """" + countries.last + """" then value else 0 end as """ + countries.last + " from myTable"
myDataFrame.registerTempTable("myTable")
val myDF1 = sqlContext.sql(query)
我可以创建类似的查询,然后进行聚合。不是一个非常优雅的解决方案,但它可以工作,并且对任何值列表都很灵活,当调用代码时,这些值也可以作为参数传入。
正如David Anderson提到的,Spark自1.6版以来提供了pivot
功能。一般语法如下所示:
df
.groupBy(grouping_columns)
.pivot(pivot_column, [values])
.agg(aggregate_expressions)
使用nycflight s13
和csv
格式的用法示例:
蟒蛇:
from pyspark.sql.functions import avg
flights = (sqlContext
.read
.format("csv")
.options(inferSchema="true", header="true")
.load("flights.csv")
.na.drop())
flights.registerTempTable("flights")
sqlContext.cacheTable("flights")
gexprs = ("origin", "dest", "carrier")
aggexpr = avg("arr_delay")
flights.count()
## 336776
%timeit -n10 flights.groupBy(*gexprs ).pivot("hour").agg(aggexpr).count()
## 10 loops, best of 3: 1.03 s per loop
斯卡拉:
val flights = sqlContext
.read
.format("csv")
.options(Map("inferSchema" -> "true", "header" -> "true"))
.load("flights.csv")
flights
.groupBy($"origin", $"dest", $"carrier")
.pivot("hour")
.agg(avg($"arr_delay"))
爪哇:
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.*;
Dataset<Row> df = spark.read().format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load("flights.csv");
df.groupBy(col("origin"), col("dest"), col("carrier"))
.pivot("hour")
.agg(avg(col("arr_delay")));
R/SparkR:
library(magrittr)
flights <- read.df("flights.csv", source="csv", header=TRUE, inferSchema=TRUE)
flights %>%
groupBy("origin", "dest", "carrier") %>%
pivot("hour") %>%
agg(avg(column("arr_delay")))
R/年
library(dplyr)
flights <- spark_read_csv(sc, "flights", "flights.csv")
avg.arr.delay <- function(gdf) {
expr <- invoke_static(
sc,
"org.apache.spark.sql.functions",
"avg",
"arr_delay"
)
gdf %>% invoke("agg", expr, list())
}
flights %>%
sdf_pivot(origin + dest + carrier ~ hour, fun.aggregate=avg.arr.delay)
SQL:
请注意,SparkSQL中的PIVOT关键字从2.4版开始得到支持。
CREATE TEMPORARY VIEW flights
USING csv
OPTIONS (header 'true', path 'flights.csv', inferSchema 'true') ;
SELECT * FROM (
SELECT origin, dest, carrier, arr_delay, hour FROM flights
) PIVOT (
avg(arr_delay)
FOR hour IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
);
示例数据:
"year","month","day","dep_time","sched_dep_time","dep_delay","arr_time","sched_arr_time","arr_delay","carrier","flight","tailnum","origin","dest","air_time","distance","hour","minute","time_hour"
2013,1,1,517,515,2,830,819,11,"UA",1545,"N14228","EWR","IAH",227,1400,5,15,2013-01-01 05:00:00
2013,1,1,533,529,4,850,830,20,"UA",1714,"N24211","LGA","IAH",227,1416,5,29,2013-01-01 05:00:00
2013,1,1,542,540,2,923,850,33,"AA",1141,"N619AA","JFK","MIA",160,1089,5,40,2013-01-01 05:00:00
2013,1,1,544,545,-1,1004,1022,-18,"B6",725,"N804JB","JFK","BQN",183,1576,5,45,2013-01-01 05:00:00
2013,1,1,554,600,-6,812,837,-25,"DL",461,"N668DN","LGA","ATL",116,762,6,0,2013-01-01 06:00:00
2013,1,1,554,558,-4,740,728,12,"UA",1696,"N39463","EWR","ORD",150,719,5,58,2013-01-01 05:00:00
2013,1,1,555,600,-5,913,854,19,"B6",507,"N516JB","EWR","FLL",158,1065,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,709,723,-14,"EV",5708,"N829AS","LGA","IAD",53,229,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,838,846,-8,"B6",79,"N593JB","JFK","MCO",140,944,6,0,2013-01-01 06:00:00
2013,1,1,558,600,-2,753,745,8,"AA",301,"N3ALAA","LGA","ORD",138,733,6,0,2013-01-01 06:00:00
性能注意事项:
一般来说,旋转是一项昂贵的操作。
>
如果可以的话,尝试提供值
列表,因为这样可以避免额外的命中来计算uniques:
vs = list(range(25))
%timeit -n10 flights.groupBy(*gexprs ).pivot("hour", vs).agg(aggexpr).count()
## 10 loops, best of 3: 392 ms per loop
在某些情况下,重新分区和/或预聚合数据被证明是有益的(可能不再值得在2.0或更高版本中付出努力)
仅对于重塑,您可以在Pyspark Dataframe上使用第一个
:Pivot String列
相关问题:
什么是透视? 如何透视? 这是枢轴吗? 长格式到宽格式? 我见过很多关于透视表的问题。即使他们不知道他们询问的是透视表,他们通常也是。几乎不可能写出一个包含旋转的所有方面的规范的问题和答案。 ...但我要试一试。 现有问题和答案的问题是,问题通常集中在一个细微差别上,而OP很难将其概括出来,以便使用现有的许多好答案。然而,没有一个答案试图给出一个全面的解释(因为这是一个令人生畏的任务) 从我的谷歌
A 数据透视表介绍 B.1 什么是数据透视表? 数据透视表是一种可以快速汇总、分析大量数据表格的交互式工具。使用数据透视表可以按照数据表格的不同字段从多个角度进行透视,并建立交叉表格,用以查看数据表格不同层面的汇总信息、分析结果以及摘要数据。使用数据透视表可以深入分析数值数据,以帮助用户发现关键数据,并做出有关企业中关键数据的决策。 数据透视表是针对以下用途特别设计的:以友好的方式,查看大量的数据
数据透视表显示二维交集的度量值,并在表格视图中表示数据。 图表属性 选择图表类型后,可以更改其属性来自定义图表: 选项 描述 常规 背景颜色 设置图表区域的背景颜色。 不透明度 设置背景颜色的不透明度。 显示边框 显示图表外部边框。 边界颜色 设置图表外部边框的颜色。 显示标题 显示图表的主要标题。 标题 指定图表的标题。 标题字体 设置标题的字体样式。 位置 设置标题的位置。 对齐 设置标题的水
数据透视表显示二维交集的度量值,并在表格视图中表示数据。 图表属性 选择图表类型后,可以更改其属性来自定义图表: 选项 描述 常规 背景颜色 设置图表区域的背景颜色。 显示边框 显示图表外部边框。 边界颜色 设置图表外部边框的颜色。 显示标题 显示图表的主要标题。 标题 指定图表的标题。 标题字体 设置标题的字体样式。 位置 设置标题的位置。 对齐 设置标题的水平对齐方式。 数据 字体 设置字段名
数据透视表显示二维交集的度量值,并在表格视图中表示数据。 图表属性 选择图表类型后,可以更改其属性来自定义图表: 选项 描述 常规 背景颜色 设置图表区域的背景颜色。 不透明度 设置背景颜色的不透明度。 显示边框 显示图表外部边框。 边界颜色 设置图表外部边框的颜色。 显示标题 显示图表的主要标题。 标题 指定图表的标题。 标题字体 设置标题的字体样式。 位置 设置标题的位置。 对齐 设置标题的水