当前位置: 首页 > 工具软件 > AGG > 使用案例 >

一文读懂pg AGG聚集算子计划与执行(更新中)

郑宏朗
2023-12-01

Agg聚集算子与聚集操作(函数)基本概念

聚集算子是用来执行聚集操作(函数)的一类算子,当含有聚集函数或者需要使用到聚集操作某些性质的时候(例如 except),都会生成含有聚集算子的计划。
聚集操作(函数),可以理解为由多条数据计算的出一个结果的操作,例如我们常用的min、max、count、avg等。
在这里我们注意区分聚集算子与聚集操作(函数)之间的差异,聚集算子是用来执行聚集操作的,并且一个聚集算子之中可以执行多个聚集操作(函数),例如如下语句中只有一个聚集算子,但是包含了三个聚集操作。

postgres=# explain verbose select min(a1), max(a1), avg(a1) from tba;
                             QUERY PLAN
--------------------------------------------------------------------
 Aggregate  (cost=44.04..44.05 rows=1 width=44)
   Output: min(a1), max(a1), avg(a1)
   ->  Seq Scan on public.tba  (cost=0.00..29.45 rows=1945 width=4)
         Output: a1, a2, a3
(4 rows)

执行策略

在聚集算子之中,我们可以将输入的元组全部直接进行各种聚集操作(plain-agg),也可以将输入的元组按照某些条件进行group by分组后对每一组数据各自进行聚集操作(group-agg 或 hash-agg),还可以根据计算结果进行having过滤从而舍弃掉某些分组,还可以支持更加复杂的grouping set分组操作。
由此,我们引出了AGG算子执行的三种策略,PLAIN-AGG、GROUP-AGG和HASH-AGG,三种策略适合不同的场景。plain-agg是用来执行不含分组(group by)操作的聚集策略,group-agg与hash-agg是用来执行具有分组操作的聚集策略,他们两个的差异在于group-agg的输入元组必须是有序的,而hash-agg并不关注输入元组的顺序。

聚集操作基本执行步骤与元数据

聚集操作的执行,需要三个步骤:

  1. 扫描数据并计算中间值(transition-value)
  2. 收集并综合中间值(collection-value)
  3. 计算最终结果(final-value)

其中第二步仅在多线程并行执行下需要,用于将多个线程的中间值收集并综合成一个中间值,单线程执行下并不需要此步骤。

这三个步骤以及其需要维护的中间值类型结构等元数据,都记录在pg_aggregate系统表之中,以avg()为例:

postgres=# select * from pg_aggregate where aggfnoid = 2101;  -- oid of avg
    aggfnoid    |   aggtransfn   |   aggcollectfn   | aggfinalfn | aggsortop | aggtranstype | agginitval | agginitcollect | aggkind | aggnumdirectargs
----------------+----------------+------------------+------------+-----------+--------------+------------+----------------+---------+------------------
 pg_catalog.avg | int4_avg_accum | int8_avg_collect | int8_avg   |         0 |         1016 | {0,0}      | {0,0}          | n       |                0
(1 row)

aggtranstype = 1016,表示中间值是一个int8的数组,其初始值agginitval为 {0,0},这两个0将分别表示sum与count。
其三个基本执行步骤所用到的函数分别存放在aggtransfn、aggcollectfn、aggfinalfn列中。

  • aggtransfn 负责将入参累加到中间值sum上,并将count加一。
  • aggcollectfn 负责将在并行执行场景下,将多个线程的中间值{sum, count}收集起来,并计算成一个最终的中间值,单线程执行并不涉及。
  • aggfinalfn 负责将最终的中间值计算为结果result,即result = sum / count。

当然,我们发现了并非所有的聚集操作都需要完整的三个步骤,例如count(),收集到的中间值便是最终结果。但如果统筹划分,所有的聚集操作都可以分为这三个步骤,只不过有些步骤不做动作罢了,这并不会产生什么影响。

计划生成与数据结构

确定操作策略

在**grouping_planner()中,会根据Query-tree是否含有grouping set、group by以及估算的行数大小、是否可以hash、是否并行执行等属性,确定其执行策略。
make_agg()**函数中创建agg计划节点。

关键数据结构

Aggref

Pergroup

Peragg

思考

这类计划是怎么生成的。
聚集函数有很多,每种的函数的算法各不相同,如何在一个算子内完成计算多个的聚集函数?
Group by操作又是如何执行的,如何在一个计划节点中,完成既分组,又对每个分组执行多个不尽相同的聚集函数?
having和where的区别?如何通过having来剔除分组?
并行agg的执行策略,是如何多个线程完成一个算子的?
如何将agg用作窗口函数?
Grouping set的计划与执行的?
入参含有order、filter的怎么计算?

基本执行流程

我们知道AGG算子的执行有三种执行策略,PLAIN-AGG、GROUP-AGG和HASH-AGG,三种策略适合不同的场景,下面我们按照从简单到复杂的顺序开始探索

PLAIN-AGG

我们从最简单的PLAIN-AGG入手,它直接计算不含分组操作(group by)的计算策略,以如下sql为例:

postgres=# create table tba(a1 int, a2 int, a3 int);
CREATE TABLE
postgres=# explain (verbose) select avg(a1) from tba;
                             QUERY PLAN
---------------------------------------------------------------------
 Aggregate  (cost=48.91..48.91 rows=1 width=76)
   Output: avg(a1)
   ->  Seq Scan on public.tba  (cost=0.00..29.45 rows=1945 width=12)
         Output: a1, a2, a3
(4 rows)

通过计划我们得知,AGG算子接收Scan算子扫描的结果,并计算聚集函数,其计算流程大致如下:

  1. 在下层节点获取一行元组 tba(a1 int, a2 int, a3 int)。
  2. 若是第一次调用,则初始化aggtransfn的上下文,若不是则进入步骤3。主要是初始化中间值transition-value。会根据系统表的aggtranstype、agginitval元数据,初始化transition-value为一个int8的数组 {0,0},其分别表示sum、count;
  3. 得到入参a1,调用aggtransfn函数(int4_avg_accum)计算维护中间值transition-value,count++, sum += param。返回步骤1。
  4. 若元组输入结束,则调用aggfinalfn函数(int8_avg_collect),将中间值计算为最终结果。

当然,并非所有的agg函数都需要这两个阶段,例如count、min、max,仅需要一个transfn即可,因为它的transition-value就是final-value。

若是对于多个agg函数,例如select avg(a1),max(a1)… from tba; 我们只需要将2、3、4步骤放入一个for循环即可。举个例子,通俗易懂的解释,我们知道,对于单个Agg函数,都需要经过上面的1-4的步骤,那么当查询中包含多个Agg函数中,我们需要对每个函数都完整的各自独立的执行那四个步骤吗?显而易见,不是的,步骤一明显只需要执行一次,因此我们可以按照上面的思路写出来如下的伪代码1:

while ((tuple = getnexttuple()) != NULL) { foreach (peragg, allaggs) { if (peragg is notinit) { peragg.init(); break; } else { peragg.transval = peragg.transfunc(tuple, peragg.transval); } } } foreach(peragg, allaggs) { peragg.finalval = peragg.finalfunc(peragg.transval); } return make_result_tuple_project();

那么假如说我的查询里面一个agg函数包含多次,或者有表达式怎么办?例如:select max(a) + min(b), max(a) - min(b) …
我们又是怎么计算的呢?
此时我们的再计划生成的时候allaggs里面已经只有两个agg,max(a)与min(b),我们按照正常的四步计算流程算出相应的值,最后再返回结果时计算出相应的表达式结果投影即可。

最后我们继续深入思考三个问题:
1.对于select max(a), max(b)… 这种参数不同但是函数一致,allaggs里面会有几个呢?
2.对于select max(a1 + a2) 这种又会怎样计算的呢?可以在上面的伪代码上哪里添加一些东西可以实现?
3.为什么上面伪代码的init要在获取第一条数据之后呢?
有兴趣的可以研究一下。

GROUP-AGG

group-agg,可以用来计算含有分组操作的聚集计算,但是它要求其每个元组的输入必须是有序的,因此也称为sort-agg。
以如下sql为例




可以看到,再groupagg下面存在一个sort算子。那么很好理解,对于一个有序的数组进行分组,算法就十分的简单了,我们只需要保存一个值作为分组标识就可以了,当下一个值与保存的分组标识不同,就代表着上一个分组结束,新的分组开始了。
因此思路也就变得很简单了起来。
既然是分组操作,我们需要有一个结构记录当前分组的信息,记作pergroup,主要用来记录当前组的分组标识,计算流程中当前组的transvalue等。

1.下层算子传来元组 tba(a1, a2, a3)。
2.若没有初始化,则初始化分组标识pergroup,初始化transvalue等。
3.判断

HASH-AGG

hash-agg也可以用来计算含有分组操作的聚集函数,它和group-agg的差异在于它不依赖于下层元组的有序输入。
以如下sql为例

可以看到相对于group-agg,其不依赖下层元祖的顺序输入。

并行执行流程

啥是并行执行?简单的说就是一条sql由多个线程共同完成,每个线程完成其中的一部分结果,这样可以极大的提升执行效率,当然所占用的物理资源也会相应增大。
并行执行的一个基础条件是并行扫描,即多个线程同时scan一个表,每个线程只扫描各自的部分,它们之间互不相同,加起来又是一整个表。这部分的原理很好实现,方案也很多,例如可以维护一个全局指针作为游标,这个游标以page为单位,每个线程获取游标所指的page并将游标移动到下一个page。

那么agg函数又是一个什么样的并行执行策略呢?
在上文中我们已经知道agg函数的普通执行,可以分为两个阶段,trans、final。其中trans收集元组并计算中间结果,元组收集完毕则将中间结果计算出最终结果。
以avg为例:
在avg的执行中存在两个中间变量,sum、count。
trans收集元组,并维护中间变量(sum += tuple.value, count++)。
final依据中间变量计算最终结果(res = sum / count)。

思考这两个阶段,我们想,如果可以由多个中间变量计算出最终中间变量(sum = sum1 + sum2 + sum3…… count = count1 + count2 + count3),是不是可以实现并行了呢?
例如,多线程的情况下,每个线程计算全集的一部分中间结果,然后将中间结果汇总给主线程,主线程将收集到的中间结果计算出最终中间结果,并计算出最终结果,是不是就并行了呢?
举一个简单的例子:
统计全国人的平均身高。
非并行执行的方案是一个统计员,这个统计员跨边祖国大好河山,统计了每个人的身高,最终计算出了结果。
并行的执行方案是一个每个省都有一个统计员,他们各自统计每个省的总身高与总人数,并将结果报告给国家统计员,这个国家统计员根据每个省的中间结果是

grouping set\cube的执行策略

grouping set为group by语法的扩展,是对同一种查询但分组不同的sql的方便写法,例如

select max(a1) from tba group by grouping set(a2, a3);

其等价于

select max(a1) from tba group by a2
union all
select max(a1) from tba group by a3;

详细可以推荐参考大佬的讲解:https://help.aliyun.com/document_detail/92807.html

思考,我们如何执行才最有效率呢?是把其按照等价的union all去执行么?那么产生的计划为:

postgres=# explain select max(a1) from tba group by a2 union all select max(a1) from tba group by a3;
                                   QUERY PLAN
---------------------------------------------------------------------------------
 Result  (cost=39.17..86.35 rows=400 width=4)
   ->  Append  (cost=39.17..86.35 rows=400 width=4)
         ->  Subquery Scan on "*SELECT* 1"  (cost=39.17..43.17 rows=200 width=4)
               ->  HashAggregate  (cost=39.17..41.17 rows=200 width=12)
                     Group By Key: public.tba.a2
                     ->  Seq Scan on tba  (cost=0.00..29.45 rows=1945 width=8)
         ->  Subquery Scan on "*SELECT* 2"  (cost=39.17..43.17 rows=200 width=4)
               ->  HashAggregate  (cost=39.17..41.17 rows=200 width=12)
                     Group By Key: public.tba.a3
                     ->  Seq Scan on tba  (cost=0.00..29.45 rows=1945 width=8)
(10 rows)

这样子的话,同样的对基表tba扫描会执行两边,而且除了分组不同之外,其它的操作都是一样的。很明显这有点蠢,而且不但蠢,执行效率也会变得很让人落泪,也不符合复用的原则,众多同样的工作为什么不可以抽象成一个呢。
继续思考,既然只是分组不同,那么对于我们上面讲过的聚集函数的执行框架,是不是可以修改一下,能不能在一次扫描里进行多次计算?例如吧上面的计划提取复用成

 Result  (rows=400 width=4)
    ->  Subquery Scan on "*SELECT* 1"  (rows=200 width=4)
          ->  HashAggregate  (rows=200 width=12)
                Group By Key: public.tba.a2
                Group By Key: public.tba.a3
                ->  Seq Scan on tba  (rows=1945 width=8)

那么现在就相当于在agg算子中,创建两个hash表,对于tba的每一条元组,执行两次分组计算,分别填入两个hash表。这样的话就完全省略了一次全表扫描的时间。
同样的假如我的grouping set之中有七八个键,那么这里就会省略六七次全表扫描时间,提升还是极为可观的。

按照原理,其实现也相当于是在agg算子里面加了一个for循环,遍历一个叫做groupchain的数组,实现不同的分组罢了。

代码走读

参考

[1]: 《postgres数据库内核分析》 – 彭智勇 彭煜玮

 类似资料: