当前位置: 首页 > 工具软件 > Hazelcast > 使用案例 >

Hazelcast Jet 聚合(Aggregate)

苏涵润
2023-12-01

前言

Jet内部使用了2阶段聚合。第一阶段为accumulate,第二阶段为combine。
为什么使用2阶段的聚合方式呢?
因为单阶段的聚合方式仅仅适用于batch,在流式聚合中,单阶段的方式违背了数学定理CA(commutative associative)。

一、Jet的集群优化

聚合场景会消耗大量的集群算力,为了优化集群的负载,Jet内部会对聚合算子自动优化。

  1. 如果代码是下面这么写
stage.rebalance().groupingKey(keyFn).aggregate(...)

jet会修改两阶段聚合算子,移除本地聚合使用分片转换成分布式聚合。
2. 如果代码是下面这么写

stage.rebalance().aggregate(...)

jet会直接在两阶段前面增加一个rebalance节点。

二、Jet默认的四个聚合算子

Jet内置了一些默认的聚合处理器,提供了以下四个默认接口
例如AggregateOperation1聚合算子:

Interface AggregateOperation1<T,A,R>
T - the type of the stream item 比如计算Integer累加,这个T就是Integer
A - the type of the accumulator 比如自定了类DoubleAccumulator,那么这个A就是DoubleAccumulator
R - the type of the aggregation result 比如结果是Double,这这R就是Double

另外三个聚合算子:

Interface AggregateOperation<A,R>
Interface AggregateOperation2<T0,T1,A,R>
Interface AggregateOperation3<T0,T1,T2,A,R>

自定义的聚合算子可以用此类构建:AggregateOperationBuilder,上面四个默认算子内部也是用这个类来辅助构建的。
这个AggregateOperationBuilder类有几个内部类,比如Arity1,定义如下:

    public static class Arity1<T0, A, R> {
        private final SupplierEx<A> createFn;
        private final BiConsumerEx<? super A, ? super T0> accumulateFn0;
        private BiConsumerEx<? super A, ? super A> combineFn;
        private BiConsumerEx<? super A, ? super A> deductFn;
        private FunctionEx<? super A, ? extends R> exportFn;

三、自定义聚合写法

主要关注数据库CDC领域的流式聚合写法,请看下面的说明:
我们可以利用AggregateOperation接口的链式能力,构建自定义算子。
第一步,用withCreate往链中添加目标类
比如:我自定义了一个数据库CDC的聚合类InsertAccumulator,就可以按下面的格式把它添加到链中:

AggregateOperation.withCreate(InsertAccumulator::new)

第二步,用andAccumulate往链中添加两阶段聚合中第一阶段的Fn

public <T> AggregateOperationBuilder.Arity1<T, A, Void> andAccumulate(BiConsumerEx<? super A, ? super T> accumulateFn)
accumulateFn:待添加的accumulate算子
返回值仍然是Arity1,方便链式调用

它还有一个变体

public <T0> AggregateOperationBuilder.Arity1<T0, A, Void> andAccumulate0(BiConsumerEx<? super A, ? super T0> accumulateFn0)

第三步,用andCombine往链中添加两阶段聚合中第二阶段的Fn(加和)

public AggregateOperationBuilder.Arity1<T0, A, R> andCombine(BiConsumerEx<? super A, ? super A> combineFn)
combineFn:待添加的combine算子
返回值仍然是Arity1,方便链式调用

第四步,用andDeduct往链中添加两阶段聚合中第二阶段的Fn(减)

public AggregateOperationBuilder.Arity1<T0, A, R> andDeduct(BiConsumerEx<? super A, ? super A> deductFn) 
deductFn:待添加的减算子
返回值仍然是Arity1,方便链式调用

第五步,用andExportFinish结束链式调用
注意,这个andExportFinish是一个复合函数,它是为了流式计算而生,它会在每轮结束前emit一个结果。这样在流式数据中,相当于结果数据也是流式的。

public <R_NEW> AggregateOperation1<T0, A, R_NEW> andExportFinish(FunctionEx<? super A, ? extends R_NEW> exportFinishFn)
exportFinishFn:结束&弹出结果

这个方法会新建实现类AggregateOperation1Impl

四、AggregateOperation1Impl内部实现解析

AggregateOperation1Impl的类定义如下:

public class AggregateOperation1Impl<T0, A, R>
        extends AggregateOperationImpl<A, R>
        implements AggregateOperation1<T0, A, R> {
    public AggregateOperation1Impl(
            @Nonnull SupplierEx<A> createFn,
            @Nonnull BiConsumerEx<? super A, ? super T0> accumulateFn,
            @Nullable BiConsumerEx<? super A, ? super A> combineFn,
            @Nullable BiConsumerEx<? super A, ? super A> deductFn,
            @Nonnull FunctionEx<? super A, ? extends R> exportFn,
            @Nonnull FunctionEx<? super A, ? extends R> finishFn
    )
    ...

显而易见,这个类只是Jet内部对聚合算子的一种实现而已。那么推而广之,如果在数据库CDC的场景这些方法不够用的话,也可以自定义接口、实现类。

五、自定义接口和实现类

 类似资料: