当前位置: 首页 > 工具软件 > Hazelcast Jet > 使用案例 >

Hazelcast Jet Processor

娄弘
2023-12-01

前言

com.hazelcast.jet.core.processor.Processors(简称P)
这个类实现了核心的P,这里的P对应的是Jet引擎内部的DAG图的节点。
简单来说,这些P处理的是聚合(SUM/AVG/MAX/MIN),更复杂的情况是,可能是在任意的Key上做聚合,甚至是基于事件发生的时间窗上的聚合。因此Jet开发了两类P节点:

  • 单阶段P
  • 两阶段P
    注意:从下面接口文档可以看出,无论是单阶段还是两阶段,都只适用于跑批的场景,并且重启job会丢失数据。
    Jet内部默认传递给P的函数都是无状态的。

Jet的Streaming和batch

Jet区分steaming和batch非常简单,仅仅在P的complete方法上,返回false就是steaming的,返回true就是batch的。

单阶段聚合

所有的入站数据最好是预先分好区,groupby好,这样才不会在Jet集群中产生大量跨节点的数据传输。这种方式最省内存。

             -----------------
            | upstream vertex |
             -----------------
                     |
                     | partitioned-distributed
                     V
                -----------
               | aggregate |
                -----------

两阶段聚合

第一个阶段只做accumulate,第二个阶段才做combine和finish动作。第一阶段完全在单节点本地,数据的分区,groupby只发生在第二阶段,因为第一阶段已经对数据做了聚合,到第二阶段数据传输量极小。

            -----------------
           | upstream vertex |
            -----------------
                    |
                    | partitioned-local
                    V
              ------------
             | accumulate |
              ------------
                    |
                    | partitioned-distributed
                    V
             ----------------
            | combine/finish |
             ----------------

在Jet中,单阶段聚合是两阶段聚合的特例

没有分区,groupby的情况下,第一阶段数据就在节点本地accumulate,然后第二阶段汇集到一个单一combine/finish节点上。

            -----------------
           | upstream vertex |
            -----------------
                    |
                    | local, non-partitioned
                    V
              ------------
             | accumulate |
              ------------
                    |
                    | distributed, all-to-one
                    V
             ----------------
            | combine/finish | localParallelism = 1
             ----------------

Processors.java方法详解

public static <A,R> SupplierEx<Processor> aggregateP(@Nonnull
                                                              AggregateOperation<A,R> aggrOp)
                                                              Type Parameters:
A - type of accumulator returned from aggrOp.createAccumulatorFn()
R - type of the finished result returned from aggrOp.finishAccumulationFn()
Parameters:
aggrOp - the aggregate operation to perform

消费入站数据,返回结果R,如果R是null,这个P就表示没有任何动作。
不写快照,重启任务会丢失数据,只适合做批处理。
——————————————————————————————————————

public static <A,R> SupplierEx<Processor> accumulateP(@Nonnull
                                                               AggregateOperation<A,R> aggrOp)

消费入站数据,返回结果A。
不写快照,重启任务会丢失数据,只适合做批处理。
——————————————————————————————————————

public static <A,R> SupplierEx<Processor> combineP(@Nonnull
                                                            AggregateOperation<A,R> aggrOp)

配合accumulateP使用,消费入站数据,返回结果R,如果R是null,这个P就表示没有任何动作。
不写快照,重启任务会丢失数据,只适合做批处理。
——————————————————————————————————————

public static <K,A,R,OUT> SupplierEx<Processor> aggregateByKeyP(List<FunctionEx<?,? extends K>> keyFns,
                                                                AggregateOperation<A,R> aggrOp,
                                                                BiFunctionEx<? super K,? super R,OUT> mapToOutputFn)
Type Parameters:
K - type of key
A - type of accumulator returned from aggrOp.createAccumulatorFn()
R - type of the result returned from aggrOp.finishAccumulationFn()
OUT - type of the item to emit
Parameters:
keyFns - functions that compute the grouping key
aggrOp - the aggregate operation
mapToOutputFn - function that takes the key and the aggregation result and returns the output item

可支持多路入站数据,每路的类型必须不同,每路要有自己的key抽取函数。aggOP里面的accumulate函数每路数据都要提供。
不写快照,重启任务会丢失数据,只适合做批处理。
————————————————————————————————————

public static <K,A> SupplierEx<Processor> accumulateByKeyP(List<FunctionEx<?,? extends K>> getKeyFns,
                                                           AggregateOperation<A,?> aggrOp)

两阶段P的第一阶段P,根据分区,groupby应用accumulate,每个groupby的key输出一个Map.Entry<K, A>。
可支持多路入站数据,每路的类型必须不同,每路要有自己的key抽取函数。aggOP里面的accumulate函数每路数据都要提供。
不写快照,重启任务会丢失数据,只适合做批处理。
————————————————————————————————————

public static <K,A,R,OUT> SupplierEx<Processor> combineByKeyP(AggregateOperation<A,R> aggrOp,
                                                              BiFunctionEx<? super K,? super R,OUT> mapToOutputFn)

两阶段P的第二阶段P,从多路accumulateByKeyP接收数据并应用combine,应用函数mapToOutputFn,每个key算出一个R。
不写快照,重启任务会丢失数据,只适合做批处理。
————————————————————————————————————

public static <K,A,R,OUT> SupplierEx<Processor> aggregateToSlidingWindowP(
													List<FunctionEx<?,? extends K>> keyFns,
                                                    List<ToLongFunctionEx<?>> timestampFns,
                                                    TimestampKind timestampKind,
                                                    SlidingWindowPolicy winPolicy,
                                                    long earlyResultsPeriod,
                                                    AggregateOperation<A,? extends R> aggrOp,
                                                    KeyedWindowResultFunction<? super K,? super R,? extends OUT> mapToOutputFn)

暂时用不到
————————————————————————————————————

public static <K,A> SupplierEx<Processor> accumulateByFrameP(
											List<FunctionEx<?,? extends K>> keyFns,
                                            List<ToLongFunctionEx<?>> timestampFns,
                                            TimestampKind timestampKind,
                                            SlidingWindowPolicy winPolicy,
                                            AggregateOperation<A,?> aggrOp)

暂时用不到
—————————————————————————————————————

public static <K,A,R,OUT> SupplierEx<Processor> combineToSlidingWindowP(SlidingWindowPolicy winPolicy,
                                                                        AggregateOperation<A,? extends R> aggrOp,
                                                                        KeyedWindowResultFunction<? super K,? super R,? extends OUT> mapToOutputFn)

暂时用不到
—————————————————————————————————————

public static <K,A,R,OUT> SupplierEx<Processor> aggregateToSessionWindowP(
													long sessionTimeout,
                                                    long earlyResultsPeriod,
                                                    List<ToLongFunctionEx<?>> timestampFns,
                                                    List<FunctionEx<?,? extends K>> keyFns,
                                                    AggregateOperation<A,? extends R> aggrOp,
                                                    KeyedWindowResultFunction<? super K,? super R,? extends OUT> mapToOutputFn)

暂时用不到
————————————————————————————————————

public static <T> SupplierEx<Processor> insertWatermarksP(@Nonnull
                                                                   EventTimePolicy<? super T> eventTimePolicy)

流处理的API来了!!!
在流里面插入watermark,wartermark由eventTimePolicy给出。
这个P会丢弃迟到的(late)数据。
最晚emit水印的数据值会被写入快照。重启所有节点,会从快照继续往后执行。
听起来视乎破坏了单调性原则,但是Jet会合并watermark,可以保证从快照继续往后执行。副作用就是,重启前被认为是迟到的(late)的event,重启后不会被认为是迟到的了。
——————————————————————————————————————

public static <T,R> SupplierEx<Processor> mapP(@Nonnull
                                                        FunctionEx<? super T,? extends R> mapFn)
Type Parameters:
T - type of received item
R - type of emitted item
Parameters:
mapFn - a stateless mapping function

这个函数是无状态的,效果等同于java流式计算中的filter
————————————————————————————————————————

public static <T> SupplierEx<Processor> filterP(@Nonnull
                                                         PredicateEx<? super T> filterFn)

这个函数是无状态的,根据Predicate条件过滤数据。
————————————————————————————————————————

public static <T,R> SupplierEx<Processor> flatMapP(
                                            FunctionEx<? super T,? extends Traverser<? extends R>> flatMapFn)

这个函数是无状态的,把数据转换成Traverser。
Traverser不能为null,可以是empty。
————————————————————————————————————————

public static <T,K,S,R> SupplierEx<Processor> mapStatefulP(
												long ttl,
                                                FunctionEx<? super T,? extends K> keyFn,
                                                ToLongFunctionEx<? super T> timestampFn,
                                                Supplier<? extends S> createFn,
                                                TriFunction<? super S,? super K,? super T,? extends R> statefulMapFn,
                                                TriFunction<? super S,? super K,? super Long,? extends R> onEvictFn)

有状态的映射,createFn函数返回状态对象s,每个group key就有一个对应的对象s,s对象会有快照,重启不会丢。所以对象s必须可序列化。如果MapFn把输入映射为了null,相当于过滤掉了这个输入数据。对象s的timestamp和watermark两个时间参数(watermark - ttl ??? timestamp)可以控制对象s是否要被丢失或者用watermark更新timestamp。
————————————————————————————————

public static <T,K,S,R> SupplierEx<Processor> flatMapStatefulP(
												long ttl,
                                                FunctionEx<? super T,? extends K> keyFn,
                                                ToLongFunctionEx<? super T> timestampFn,
                                                Supplier<? extends S> createFn,
                                                TriFunction<? super S,? super K,? super T,? extends Traverser<R>> statefulFlatMapFn,
                                                TriFunction<? super S,? super K,? super Long,? extends Traverser<R>> onEvictFn)

有状态的映射,createFn函数返回状态对象s,每个group key就有一个对应的对象s,s对象会有快照,重启不会丢。所以对象s必须可序列化。对象s的timestamp和watermark两个时间参数(watermark - ttl ??? timestamp)可以控制对象s是否要被丢失或者用watermark更新timestamp。
注意这个函数跟上一个函数的区别,请跟无状态的两个对应函数对比。
————————————————————————————————————

public static <C,S,T,R> ProcessorSupplier mapUsingServiceP(ServiceFactory<C,S> serviceFactory,
                                                           BiFunctionEx<? super S,? super T,? extends R> mapFn)
Type Parameters:
C - type of context object
S - type of service object
T - type of received item
R - type of emitted item                                                           

对每个输入数据应用无状态函数mapFn,其中参数S是serviceFactory生成的。
————————————————————————————————————

public static <C,S,T,K,R> ProcessorSupplier mapUsingServiceAsyncP(
												ServiceFactory<C,S> serviceFactory,
                                                int maxConcurrentOps,
                                                boolean preserveOrder,
                                                FunctionEx<T,K> extractKeyFn,
                                                BiFunctionEx<? super S,? super T,CompletableFuture<R>> mapAsyncFn)
Type Parameters:
C - type of context object
S - type of service object
T - type of received item
K - type of key
R - type of result item
Parameters:
serviceFactory - the service factory
maxConcurrentOps - maximum number of concurrent async operations per processor
preserveOrder - whether the async responses are ordered or not
extractKeyFn - a function to extract snapshot keys. Used only if preserveOrder==false
mapAsyncFn - a stateless mapping function

future可以为null,future不为null它里面的结果也可以为null,如果是null代码此数据被过滤掉了。extractKeyFn是用来生产快照key的。
快照key的限制:如果接收到的数据跨越了分片,需要保证key相同,如果是round-robin那就随意了,可以用Object::hashCode
——————————————————————————————————

public static <C,S,T> ProcessorSupplier filterUsingServiceP(ServiceFactory<C,S> serviceFactory,
                                                            BiPredicateEx<? super S,? super T> filterFn)

serviceFactory生成一个S,然后传递给Predicate函数,过滤数据。
————————————————————————————————————

public static <C,S,T,R> ProcessorSupplier flatMapUsingServiceP(
											ServiceFactory<C,S> serviceFactory,
                                            BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)

把输入数据转换成Traverser对象,Traverser对象结尾不为null,serviceFactory生成一个S,传递给flatMapFn。
转换过程中有S对象可以用本地存储,所以虽然有快照,但是不能保证一致性,可能出异常情况。
————————————————————————————————————

public static <T> SupplierEx<Processor> sortP(Comparator<T> comparator)

把输入数据按PriorityQueue顺序排序,并在complete阶段发射出去。
————————————————————————————————————

public static SupplierEx<Processor> noopP()

不用解释了吧,有进没出的P

无状态transforms

没有中间状态存储,所有item跟其他item无关。比如下面的算子:
map
filter
flatMap
merge
mapUsingIMap
mapUsingRelicateMap
mapUsingService
mapUsingServiceAsync
mapUsingServiceAsyncBatched
mapUsingPython
hashJoin

有状态transforms

有状态的transforms累计数据,因此输出结果依赖之前的item。
在Jet中窗口只能用于流式数据(带时间),一次性的聚合操作只适合batch。
aggregate
聚合是分布式流式数据处理的基石。它对流式的item数据执行sum或者avg等算子。AggregateOperations提供了一系列的聚合方法,比如:

  • averagingLong()
  • averageDouble()
  • counting()
  • summingLong()
  • summingDouble()
  • maxBy()
  • minBy()
  • toList()
  • bottomN()
  • topN()
  • linearTrend()
  • allOf()

groupingKey

rollingAggregate
不等接收所有的item,每接收一个item就输出一个结果。所以它可以在流式处理中使用。
window
把一个无界的数据流分成一块块。
tumblingWindow

slidingWindow

sessionWindow

Early Results

distinct
略过重复
sort
只支持batch
mapStateful
是map transform的扩展,它可以保存一个可变的中间状态。可以根据这个基础算子写出任意复杂的检查模式的状态机。
比如有个流中有TRANSACTION_START和TRANSACTION_END两个item数据。这种情况用window不行,因为不确定window大小。sessionWindow也不行,因为它默认是等待到最后一次性emit所有items。

public class TransactionEvent {
    long timestamp();
    String transactionId();
    EventType type();
}

public enum EventType {
    TRANSACTION_START,
    TRANSACTION_END
}

可以这么写代码:

p.readFrom(KafkaSources.kafka(.., "transaction-events"))
 .withNativeTimestamps(0)
 .groupingKey(event -> event.getTransactionId())
 .mapStateful(MINUTES.toMillis(10),
   () -> new TransactionEvent[2],
   (state, id, event) -> {
        if (event.type() == TRANSACTION_START) {
            state[0] = event;
        } else if (event.type() == TRANSACTION_END) {
            state[1] = event;
        }
        if (state[0] != null && state[1] != null) {
            // we have both start and end events
            long duration = state[1].timestamp() - state[0].timestamp();
            return MapUtil.entry(event.transactionId(), duration);
        }
        // we don't have both events, do nothing for now.
        return null;
    },
    (state, id, currentWatermark) ->
        // if we have not received both events after 10 minutes,
        // we will emit a timeout entry
        (state[0] == null || state[1] == null)
            ? MapUtil.entry(id, TIMED_OUT)
            : null
 ).writeTo(Sinks.logger());
 类似资料:

相关阅读

相关文章

相关问答