作为聚合函数agg,通常是和分组函数groupby一起使用,表示对分组后的数据进行聚合操作;
如果没有分组函数,默认是对整个dataframe进行聚合操作。
下面从两方面讲agg。第一就是聚合操作的写法,第二是常用的聚合函数
关于如何创建dataframe,请参考之前写的教程(pyspark下dataframe的8种创建方式),
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName('increase delete change select').master('local').getOrCreate()
df = spark.createDataFrame([
['alex',1,2,'string1'],
['paul',11 ,12,'string2'],
['alex',21,22,'leon'],
['james',31,32,'traveler']],schema=('name string,a long, b long, c string'))
df.show()
+-----+---+---+--------+
| name| a| b| c|
+-----+---+---+--------+
| alex| 1| 2| string1|
| paul| 11| 12| string2|
| alex| 21| 22| leon|
|james| 31| 32|traveler|
+-----+---+---+--------+
第一种写法:groupby后直接跟一个聚合函数
这种写法中,如果聚合函数内没有指定列名,则表示对所有列都采取聚合,如果指定列名,则只对具体指定的那些列进行聚合。
显然这有一个特点,聚合类型只能有一种,比如下面的例子,聚合只能有一个avg,不能混合
df.groupBy('name').avg('a','b').show()
+-----+------+------+
| name|avg(a)|avg(b)|
+-----+------+------+
| paul| 11.0| 12.0|
| alex| 11.0| 12.0|
|james| 31.0| 32.0|
+-----+------+------+
可以看出,聚合加工后的列名改变了,变成了什么avg(a)等,显然大多数情况下,我们希望列名不变活着是新起一个列名,这个时候,可以参考我前面教程,在文章末尾有讲到重命名(pyspark数据处理之----全量查询select和条件查询filter)
第二种写法:agg里面跟一些聚合函数
这种写法中,可以有多种类型的聚合函数,对不同的列可以指定不同类型的聚合函数
通常对于这种写法中又有两种写法:
df.groupBy('name').agg({'a':'avg','b':'max'}).show()
+-----+------+------+
| name|avg(a)|max(b)|
+-----+------+------+
| paul| 11.0| 12|
| alex| 11.0| 22|
|james| 31.0| 32|
+-----+------+------+
df.groupBy('name').agg(F.avg('a'),F.max('b')).show()
+-----+------+------+
| name|avg(a)|max(b)|
+-----+------+------+
| paul| 11.0| 12|
| alex| 11.0| 22|
|james| 31.0| 32|
+-----+------+------+
针对以上的写法做一点解释:为什么字典形式的时候只用写max,avg,而下面第二种写法中却要写F.max,F.avg
我认为这是因为这俩是两套函数,第一种写法用的是dataframe的内部函数,但是第二种写法用的是functions提供的函数
虽然二者最终的效果是一样的,但是函数的两套的。
这部分聚合函数可直接在groupby后,及:df.groupBy(‘name’).avg();
也可以在agg后采用字典形式:df.groupBy(‘name’).agg({‘a’:‘avg’})
聚合函数有:
avg() :求均值
count():计数
max() :求最大值
mean() :求均值
min() :求最小值
sum() :求和
df.groupBy('name').agg({'a':'avg','b':'count'}).show()
+-----+------+--------+
| name|avg(a)|count(b)|
+-----+------+--------+
| paul| 11.0| 1|
| alex| 11.0| 2|
|james| 31.0| 1|
+-----+------+--------+
这部分的聚合函数需要进行导入:from pyspark.sql import functions as F
这部分是函数必须是用在agg里面的,如:df.groupBy(‘name’).agg(F.avg(‘a’),F.max(‘b’)).show()
聚合函数如下:
df.agg(F.approx_count_distinct(df.a)).show()
+------------------------+
|approx_count_distinct(a)|
+------------------------+
| 4|
+------------------------+
df.groupBy('name').agg(F.avg('a')).show()
+-----+------+
| name|avg(a)|
+-----+------+
| paul| 11.0|
| alex| 11.0|
|james| 31.0|
+-----+------+
df.groupBy('name').agg(F.count('a')).show()
+-----+--------+
| name|count(a)|
+-----+--------+
| paul| 1|
| alex| 2|
|james| 1|
+-----+--------+
df.groupBy('name').agg(F.kurtosis('a')).show()
+-----+-----------+
| name|kurtosis(a)|
+-----+-----------+
| paul| null|
| alex| -2.0|
|james| null|
+-----+-----------+
df.groupBy('name').agg(F.stddev_samp('a')).show()
+-----+------------------+
| name| stddev_samp(a)|
+-----+------------------+
| paul| null|
| alex|14.142135623730951|
|james| null|
+-----+------------------+
df.groupBy('name').agg(F.sum('a')).show()
+-----+------+
| name|sum(a)|
+-----+------+
| paul| 11|
| alex| 22|
|james| 31|
+-----+------+
df.groupBy('name').agg(F.var_pop('a')).show()
+-----+----------+
| name|var_pop(a)|
+-----+----------+
| paul| 0.0|
| alex| 100.0|
|james| 0.0|
+-----+----------+