com.hazelcast.jet.core.processor.Processors(简称P)
这个类实现了核心的P,这里的P对应的是Jet引擎内部的DAG图的节点。
简单来说,这些P处理的是聚合(SUM/AVG/MAX/MIN),更复杂的情况是,可能是在任意的Key上做聚合,甚至是基于事件发生的时间窗上的聚合。因此Jet开发了两类P节点:
Jet区分steaming和batch非常简单,仅仅在P的complete方法上,返回false就是steaming的,返回true就是batch的。
所有的入站数据最好是预先分好区,groupby好,这样才不会在Jet集群中产生大量跨节点的数据传输。这种方式最省内存。
-----------------
| upstream vertex |
-----------------
|
| partitioned-distributed
V
-----------
| aggregate |
-----------
第一个阶段只做accumulate,第二个阶段才做combine和finish动作。第一阶段完全在单节点本地,数据的分区,groupby只发生在第二阶段,因为第一阶段已经对数据做了聚合,到第二阶段数据传输量极小。
-----------------
| upstream vertex |
-----------------
|
| partitioned-local
V
------------
| accumulate |
------------
|
| partitioned-distributed
V
----------------
| combine/finish |
----------------
没有分区,groupby的情况下,第一阶段数据就在节点本地accumulate,然后第二阶段汇集到一个单一combine/finish节点上。
-----------------
| upstream vertex |
-----------------
|
| local, non-partitioned
V
------------
| accumulate |
------------
|
| distributed, all-to-one
V
----------------
| combine/finish | localParallelism = 1
----------------
public static <A,R> SupplierEx<Processor> aggregateP(@Nonnull
AggregateOperation<A,R> aggrOp)
Type Parameters:
A - type of accumulator returned from aggrOp.createAccumulatorFn()
R - type of the finished result returned from aggrOp.finishAccumulationFn()
Parameters:
aggrOp - the aggregate operation to perform
消费入站数据,返回结果R,如果R是null,这个P就表示没有任何动作。
不写快照,重启任务会丢失数据,只适合做批处理。
——————————————————————————————————————
public static <A,R> SupplierEx<Processor> accumulateP(@Nonnull
AggregateOperation<A,R> aggrOp)
消费入站数据,返回结果A。
不写快照,重启任务会丢失数据,只适合做批处理。
——————————————————————————————————————
public static <A,R> SupplierEx<Processor> combineP(@Nonnull
AggregateOperation<A,R> aggrOp)
配合accumulateP使用,消费入站数据,返回结果R,如果R是null,这个P就表示没有任何动作。
不写快照,重启任务会丢失数据,只适合做批处理。
——————————————————————————————————————
public static <K,A,R,OUT> SupplierEx<Processor> aggregateByKeyP(List<FunctionEx<?,? extends K>> keyFns,
AggregateOperation<A,R> aggrOp,
BiFunctionEx<? super K,? super R,OUT> mapToOutputFn)
Type Parameters:
K - type of key
A - type of accumulator returned from aggrOp.createAccumulatorFn()
R - type of the result returned from aggrOp.finishAccumulationFn()
OUT - type of the item to emit
Parameters:
keyFns - functions that compute the grouping key
aggrOp - the aggregate operation
mapToOutputFn - function that takes the key and the aggregation result and returns the output item
可支持多路入站数据,每路的类型必须不同,每路要有自己的key抽取函数。aggOP里面的accumulate函数每路数据都要提供。
不写快照,重启任务会丢失数据,只适合做批处理。
————————————————————————————————————
public static <K,A> SupplierEx<Processor> accumulateByKeyP(List<FunctionEx<?,? extends K>> getKeyFns,
AggregateOperation<A,?> aggrOp)
两阶段P的第一阶段P,根据分区,groupby应用accumulate,每个groupby的key输出一个Map.Entry<K, A>。
可支持多路入站数据,每路的类型必须不同,每路要有自己的key抽取函数。aggOP里面的accumulate函数每路数据都要提供。
不写快照,重启任务会丢失数据,只适合做批处理。
————————————————————————————————————
public static <K,A,R,OUT> SupplierEx<Processor> combineByKeyP(AggregateOperation<A,R> aggrOp,
BiFunctionEx<? super K,? super R,OUT> mapToOutputFn)
两阶段P的第二阶段P,从多路accumulateByKeyP接收数据并应用combine,应用函数mapToOutputFn,每个key算出一个R。
不写快照,重启任务会丢失数据,只适合做批处理。
————————————————————————————————————
public static <K,A,R,OUT> SupplierEx<Processor> aggregateToSlidingWindowP(
List<FunctionEx<?,? extends K>> keyFns,
List<ToLongFunctionEx<?>> timestampFns,
TimestampKind timestampKind,
SlidingWindowPolicy winPolicy,
long earlyResultsPeriod,
AggregateOperation<A,? extends R> aggrOp,
KeyedWindowResultFunction<? super K,? super R,? extends OUT> mapToOutputFn)
暂时用不到
————————————————————————————————————
public static <K,A> SupplierEx<Processor> accumulateByFrameP(
List<FunctionEx<?,? extends K>> keyFns,
List<ToLongFunctionEx<?>> timestampFns,
TimestampKind timestampKind,
SlidingWindowPolicy winPolicy,
AggregateOperation<A,?> aggrOp)
暂时用不到
—————————————————————————————————————
public static <K,A,R,OUT> SupplierEx<Processor> combineToSlidingWindowP(SlidingWindowPolicy winPolicy,
AggregateOperation<A,? extends R> aggrOp,
KeyedWindowResultFunction<? super K,? super R,? extends OUT> mapToOutputFn)
暂时用不到
—————————————————————————————————————
public static <K,A,R,OUT> SupplierEx<Processor> aggregateToSessionWindowP(
long sessionTimeout,
long earlyResultsPeriod,
List<ToLongFunctionEx<?>> timestampFns,
List<FunctionEx<?,? extends K>> keyFns,
AggregateOperation<A,? extends R> aggrOp,
KeyedWindowResultFunction<? super K,? super R,? extends OUT> mapToOutputFn)
暂时用不到
————————————————————————————————————
public static <T> SupplierEx<Processor> insertWatermarksP(@Nonnull
EventTimePolicy<? super T> eventTimePolicy)
流处理的API来了!!!
在流里面插入watermark,wartermark由eventTimePolicy给出。
这个P会丢弃迟到的(late)数据。
最晚emit水印的数据值会被写入快照。重启所有节点,会从快照继续往后执行。
听起来视乎破坏了单调性原则,但是Jet会合并watermark,可以保证从快照继续往后执行。副作用就是,重启前被认为是迟到的(late)的event,重启后不会被认为是迟到的了。
——————————————————————————————————————
public static <T,R> SupplierEx<Processor> mapP(@Nonnull
FunctionEx<? super T,? extends R> mapFn)
Type Parameters:
T - type of received item
R - type of emitted item
Parameters:
mapFn - a stateless mapping function
这个函数是无状态的,效果等同于java流式计算中的filter
————————————————————————————————————————
public static <T> SupplierEx<Processor> filterP(@Nonnull
PredicateEx<? super T> filterFn)
这个函数是无状态的,根据Predicate条件过滤数据。
————————————————————————————————————————
public static <T,R> SupplierEx<Processor> flatMapP(
FunctionEx<? super T,? extends Traverser<? extends R>> flatMapFn)
这个函数是无状态的,把数据转换成Traverser。
Traverser不能为null,可以是empty。
————————————————————————————————————————
public static <T,K,S,R> SupplierEx<Processor> mapStatefulP(
long ttl,
FunctionEx<? super T,? extends K> keyFn,
ToLongFunctionEx<? super T> timestampFn,
Supplier<? extends S> createFn,
TriFunction<? super S,? super K,? super T,? extends R> statefulMapFn,
TriFunction<? super S,? super K,? super Long,? extends R> onEvictFn)
有状态的映射,createFn函数返回状态对象s,每个group key就有一个对应的对象s,s对象会有快照,重启不会丢。所以对象s必须可序列化。如果MapFn把输入映射为了null,相当于过滤掉了这个输入数据。对象s的timestamp和watermark两个时间参数(watermark - ttl ??? timestamp)可以控制对象s是否要被丢失或者用watermark更新timestamp。
————————————————————————————————
public static <T,K,S,R> SupplierEx<Processor> flatMapStatefulP(
long ttl,
FunctionEx<? super T,? extends K> keyFn,
ToLongFunctionEx<? super T> timestampFn,
Supplier<? extends S> createFn,
TriFunction<? super S,? super K,? super T,? extends Traverser<R>> statefulFlatMapFn,
TriFunction<? super S,? super K,? super Long,? extends Traverser<R>> onEvictFn)
有状态的映射,createFn函数返回状态对象s,每个group key就有一个对应的对象s,s对象会有快照,重启不会丢。所以对象s必须可序列化。对象s的timestamp和watermark两个时间参数(watermark - ttl ??? timestamp)可以控制对象s是否要被丢失或者用watermark更新timestamp。
注意这个函数跟上一个函数的区别,请跟无状态的两个对应函数对比。
————————————————————————————————————
public static <C,S,T,R> ProcessorSupplier mapUsingServiceP(ServiceFactory<C,S> serviceFactory,
BiFunctionEx<? super S,? super T,? extends R> mapFn)
Type Parameters:
C - type of context object
S - type of service object
T - type of received item
R - type of emitted item
对每个输入数据应用无状态函数mapFn,其中参数S是serviceFactory生成的。
————————————————————————————————————
public static <C,S,T,K,R> ProcessorSupplier mapUsingServiceAsyncP(
ServiceFactory<C,S> serviceFactory,
int maxConcurrentOps,
boolean preserveOrder,
FunctionEx<T,K> extractKeyFn,
BiFunctionEx<? super S,? super T,CompletableFuture<R>> mapAsyncFn)
Type Parameters:
C - type of context object
S - type of service object
T - type of received item
K - type of key
R - type of result item
Parameters:
serviceFactory - the service factory
maxConcurrentOps - maximum number of concurrent async operations per processor
preserveOrder - whether the async responses are ordered or not
extractKeyFn - a function to extract snapshot keys. Used only if preserveOrder==false
mapAsyncFn - a stateless mapping function
future可以为null,future不为null它里面的结果也可以为null,如果是null代码此数据被过滤掉了。extractKeyFn是用来生产快照key的。
快照key的限制:如果接收到的数据跨越了分片,需要保证key相同,如果是round-robin那就随意了,可以用Object::hashCode
——————————————————————————————————
public static <C,S,T> ProcessorSupplier filterUsingServiceP(ServiceFactory<C,S> serviceFactory,
BiPredicateEx<? super S,? super T> filterFn)
serviceFactory生成一个S,然后传递给Predicate函数,过滤数据。
————————————————————————————————————
public static <C,S,T,R> ProcessorSupplier flatMapUsingServiceP(
ServiceFactory<C,S> serviceFactory,
BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)
把输入数据转换成Traverser对象,Traverser对象结尾不为null,serviceFactory生成一个S,传递给flatMapFn。
转换过程中有S对象可以用本地存储,所以虽然有快照,但是不能保证一致性,可能出异常情况。
————————————————————————————————————
public static <T> SupplierEx<Processor> sortP(Comparator<T> comparator)
把输入数据按PriorityQueue顺序排序,并在complete阶段发射出去。
————————————————————————————————————
public static SupplierEx<Processor> noopP()
不用解释了吧,有进没出的P
没有中间状态存储,所有item跟其他item无关。比如下面的算子:
map
filter
flatMap
merge
mapUsingIMap
mapUsingRelicateMap
mapUsingService
mapUsingServiceAsync
mapUsingServiceAsyncBatched
mapUsingPython
hashJoin
有状态的transforms累计数据,因此输出结果依赖之前的item。
在Jet中窗口只能用于流式数据(带时间),一次性的聚合操作只适合batch。
aggregate
聚合是分布式流式数据处理的基石。它对流式的item数据执行sum或者avg等算子。AggregateOperations提供了一系列的聚合方法,比如:
groupingKey
略
rollingAggregate
不等接收所有的item,每接收一个item就输出一个结果。所以它可以在流式处理中使用。
window
把一个无界的数据流分成一块块。
tumblingWindow
略
slidingWindow
略
sessionWindow
略
Early Results
略
distinct
略过重复
sort
只支持batch
mapStateful
是map transform的扩展,它可以保存一个可变的中间状态。可以根据这个基础算子写出任意复杂的检查模式的状态机。
比如有个流中有TRANSACTION_START和TRANSACTION_END两个item数据。这种情况用window不行,因为不确定window大小。sessionWindow也不行,因为它默认是等待到最后一次性emit所有items。
public class TransactionEvent {
long timestamp();
String transactionId();
EventType type();
}
public enum EventType {
TRANSACTION_START,
TRANSACTION_END
}
可以这么写代码:
p.readFrom(KafkaSources.kafka(.., "transaction-events"))
.withNativeTimestamps(0)
.groupingKey(event -> event.getTransactionId())
.mapStateful(MINUTES.toMillis(10),
() -> new TransactionEvent[2],
(state, id, event) -> {
if (event.type() == TRANSACTION_START) {
state[0] = event;
} else if (event.type() == TRANSACTION_END) {
state[1] = event;
}
if (state[0] != null && state[1] != null) {
// we have both start and end events
long duration = state[1].timestamp() - state[0].timestamp();
return MapUtil.entry(event.transactionId(), duration);
}
// we don't have both events, do nothing for now.
return null;
},
(state, id, currentWatermark) ->
// if we have not received both events after 10 minutes,
// we will emit a timeout entry
(state[0] == null || state[1] == null)
? MapUtil.entry(id, TIMED_OUT)
: null
).writeTo(Sinks.logger());