Jet内部使用了2阶段聚合。第一阶段为accumulate,第二阶段为combine。
为什么使用2阶段的聚合方式呢?
因为单阶段的聚合方式仅仅适用于batch,在流式聚合中,单阶段的方式违背了数学定理CA(commutative associative)。
聚合场景会消耗大量的集群算力,为了优化集群的负载,Jet内部会对聚合算子自动优化。
stage.rebalance().groupingKey(keyFn).aggregate(...)
jet会修改两阶段聚合算子,移除本地聚合使用分片转换成分布式聚合。
2. 如果代码是下面这么写
stage.rebalance().aggregate(...)
jet会直接在两阶段前面增加一个rebalance节点。
Jet内置了一些默认的聚合处理器,提供了以下四个默认接口
例如AggregateOperation1聚合算子:
Interface AggregateOperation1<T,A,R>
T - the type of the stream item 比如计算Integer累加,这个T就是Integer
A - the type of the accumulator 比如自定了类DoubleAccumulator,那么这个A就是DoubleAccumulator
R - the type of the aggregation result 比如结果是Double,这这R就是Double
另外三个聚合算子:
Interface AggregateOperation<A,R>
Interface AggregateOperation2<T0,T1,A,R>
Interface AggregateOperation3<T0,T1,T2,A,R>
自定义的聚合算子可以用此类构建:AggregateOperationBuilder,上面四个默认算子内部也是用这个类来辅助构建的。
这个AggregateOperationBuilder类有几个内部类,比如Arity1,定义如下:
public static class Arity1<T0, A, R> {
private final SupplierEx<A> createFn;
private final BiConsumerEx<? super A, ? super T0> accumulateFn0;
private BiConsumerEx<? super A, ? super A> combineFn;
private BiConsumerEx<? super A, ? super A> deductFn;
private FunctionEx<? super A, ? extends R> exportFn;
主要关注数据库CDC领域的流式聚合写法,请看下面的说明:
我们可以利用AggregateOperation接口的链式能力,构建自定义算子。
第一步,用withCreate往链中添加目标类
比如:我自定义了一个数据库CDC的聚合类InsertAccumulator,就可以按下面的格式把它添加到链中:
AggregateOperation.withCreate(InsertAccumulator::new)
第二步,用andAccumulate往链中添加两阶段聚合中第一阶段的Fn
public <T> AggregateOperationBuilder.Arity1<T, A, Void> andAccumulate(BiConsumerEx<? super A, ? super T> accumulateFn)
accumulateFn:待添加的accumulate算子
返回值仍然是Arity1,方便链式调用
它还有一个变体
public <T0> AggregateOperationBuilder.Arity1<T0, A, Void> andAccumulate0(BiConsumerEx<? super A, ? super T0> accumulateFn0)
第三步,用andCombine往链中添加两阶段聚合中第二阶段的Fn(加和)
public AggregateOperationBuilder.Arity1<T0, A, R> andCombine(BiConsumerEx<? super A, ? super A> combineFn)
combineFn:待添加的combine算子
返回值仍然是Arity1,方便链式调用
第四步,用andDeduct往链中添加两阶段聚合中第二阶段的Fn(减)
public AggregateOperationBuilder.Arity1<T0, A, R> andDeduct(BiConsumerEx<? super A, ? super A> deductFn)
deductFn:待添加的减算子
返回值仍然是Arity1,方便链式调用
第五步,用andExportFinish结束链式调用
注意,这个andExportFinish是一个复合函数,它是为了流式计算而生,它会在每轮结束前emit一个结果。这样在流式数据中,相当于结果数据也是流式的。
public <R_NEW> AggregateOperation1<T0, A, R_NEW> andExportFinish(FunctionEx<? super A, ? extends R_NEW> exportFinishFn)
exportFinishFn:结束&弹出结果
这个方法会新建实现类AggregateOperation1Impl
AggregateOperation1Impl的类定义如下:
public class AggregateOperation1Impl<T0, A, R>
extends AggregateOperationImpl<A, R>
implements AggregateOperation1<T0, A, R> {
public AggregateOperation1Impl(
@Nonnull SupplierEx<A> createFn,
@Nonnull BiConsumerEx<? super A, ? super T0> accumulateFn,
@Nullable BiConsumerEx<? super A, ? super A> combineFn,
@Nullable BiConsumerEx<? super A, ? super A> deductFn,
@Nonnull FunctionEx<? super A, ? extends R> exportFn,
@Nonnull FunctionEx<? super A, ? extends R> finishFn
)
...
显而易见,这个类只是Jet内部对聚合算子的一种实现而已。那么推而广之,如果在数据库CDC的场景这些方法不够用的话,也可以自定义接口、实现类。