当前位置: 首页 > 工具软件 > AGG > 使用案例 >

pyspark的聚合函数agg使用

唐运诚
2023-12-01

pyspark中聚合函数agg的使用

  作为聚合函数agg,通常是和分组函数groupby一起使用,表示对分组后的数据进行聚合操作;
如果没有分组函数,默认是对整个dataframe进行聚合操作。

下面从两方面讲agg。第一就是聚合操作的写法,第二是常用的聚合函数

关于如何创建dataframe,请参考之前写的教程(pyspark下dataframe的8种创建方式),

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName('increase delete change select').master('local').getOrCreate()
df = spark.createDataFrame([
    ['alex',1,2,'string1'],
    ['paul',11 ,12,'string2'],
    ['alex',21,22,'leon'],
    ['james',31,32,'traveler']],schema=('name string,a long, b long, c string'))
df.show()
+-----+---+---+--------+
| name|  a|  b|       c|
+-----+---+---+--------+
| alex|  1|  2| string1|
| paul| 11| 12| string2|
| alex| 21| 22|    leon|
|james| 31| 32|traveler|
+-----+---+---+--------+

聚合操作的写法

第一种写法:groupby后直接跟一个聚合函数

  这种写法中,如果聚合函数内没有指定列名,则表示对所有列都采取聚合,如果指定列名,则只对具体指定的那些列进行聚合。

  显然这有一个特点,聚合类型只能有一种,比如下面的例子,聚合只能有一个avg,不能混合

df.groupBy('name').avg('a','b').show()
+-----+------+------+
| name|avg(a)|avg(b)|
+-----+------+------+
| paul|  11.0|  12.0|
| alex|  11.0|  12.0|
|james|  31.0|  32.0|
+-----+------+------+

  可以看出,聚合加工后的列名改变了,变成了什么avg(a)等,显然大多数情况下,我们希望列名不变活着是新起一个列名,这个时候,可以参考我前面教程,在文章末尾有讲到重命名(pyspark数据处理之----全量查询select和条件查询filter

第二种写法:agg里面跟一些聚合函数
  这种写法中,可以有多种类型的聚合函数,对不同的列可以指定不同类型的聚合函数
通常对于这种写法中又有两种写法:

  • 第一种是字典形式:key是列名,value是聚合函数类型;
  • 第二种是聚合函数内写入列名;
df.groupBy('name').agg({'a':'avg','b':'max'}).show()
+-----+------+------+
| name|avg(a)|max(b)|
+-----+------+------+
| paul|  11.0|    12|
| alex|  11.0|    22|
|james|  31.0|    32|
+-----+------+------+
df.groupBy('name').agg(F.avg('a'),F.max('b')).show()
+-----+------+------+
| name|avg(a)|max(b)|
+-----+------+------+
| paul|  11.0|    12|
| alex|  11.0|    22|
|james|  31.0|    32|
+-----+------+------+

  针对以上的写法做一点解释:为什么字典形式的时候只用写max,avg,而下面第二种写法中却要写F.max,F.avg
我认为这是因为这俩是两套函数,第一种写法用的是dataframe的内部函数,但是第二种写法用的是functions提供的函数
虽然二者最终的效果是一样的,但是函数的两套的。

聚合函数举例

dataframe内部聚合函数

  这部分聚合函数可直接在groupby后,及:df.groupBy(‘name’).avg();
也可以在agg后采用字典形式:df.groupBy(‘name’).agg({‘a’:‘avg’})

聚合函数有:
avg() :求均值
count():计数
max() :求最大值
mean() :求均值
min() :求最小值
sum() :求和

df.groupBy('name').agg({'a':'avg','b':'count'}).show()
+-----+------+--------+
| name|avg(a)|count(b)|
+-----+------+--------+
| paul|  11.0|       1|
| alex|  11.0|       2|
|james|  31.0|       1|
+-----+------+--------+

functions内部的聚合函数

这部分的聚合函数需要进行导入:from pyspark.sql import functions as F

这部分是函数必须是用在agg里面的,如:df.groupBy(‘name’).agg(F.avg(‘a’),F.max(‘b’)).show()

聚合函数如下:

  • collect_list :将分组后的同一列聚合为list
  • collect_set :将分组后的同一列聚合为set.
  • approx_count_distinct :统计某一列中不同的个数
  • avg :求均值
  • mean :求均值
  • first :求第一个值
  • last :求最后一个值
  • count :求分组有的个数
  • max :求最大值
  • min :求最小值
  • stddev_samp :求均方差
  • sum :求和
  • var_pop :求方差
df.agg(F.approx_count_distinct(df.a)).show()
+------------------------+
|approx_count_distinct(a)|
+------------------------+
|                       4|
+------------------------+
df.groupBy('name').agg(F.avg('a')).show()
+-----+------+
| name|avg(a)|
+-----+------+
| paul|  11.0|
| alex|  11.0|
|james|  31.0|
+-----+------+
df.groupBy('name').agg(F.count('a')).show()
+-----+--------+
| name|count(a)|
+-----+--------+
| paul|       1|
| alex|       2|
|james|       1|
+-----+--------+
df.groupBy('name').agg(F.kurtosis('a')).show()
+-----+-----------+
| name|kurtosis(a)|
+-----+-----------+
| paul|       null|
| alex|       -2.0|
|james|       null|
+-----+-----------+
df.groupBy('name').agg(F.stddev_samp('a')).show()
+-----+------------------+
| name|    stddev_samp(a)|
+-----+------------------+
| paul|              null|
| alex|14.142135623730951|
|james|              null|
+-----+------------------+
df.groupBy('name').agg(F.sum('a')).show()
+-----+------+
| name|sum(a)|
+-----+------+
| paul|    11|
| alex|    22|
|james|    31|
+-----+------+
df.groupBy('name').agg(F.var_pop('a')).show()
+-----+----------+
| name|var_pop(a)|
+-----+----------+
| paul|       0.0|
| alex|     100.0|
|james|       0.0|
+-----+----------+

 类似资料: