Jet没有依赖ZK等外部工具软件来实现容错,它内部实现了Chandy-Lamport 分布式snapshots。一旦一个计算节点fail了,Jet会在另外一个计算节点重启Job,从snapshot中恢复处理中的状态,然后从断点恢复。
当配置Job的时候,可以设置为Exactly-Once或者At-Least-Once,这个时候Jet会用IMap来做snapshot,内部的计算状态会存储到IMap中。但是仅仅保存计算的内部状态不足以保证计算一致性,还必须要覆盖Source和Sink。
Hazelcast每隔一段时间会"举旗",所有source 顶点会保存状态,然后往下游流中写一个barrier,然后回复执行。当barrier到达P的时候,它会停下来保存snapshot,然后把barrier传递给下游节点然后回复执行。当barrier到达Sink时候,此次快照过程就完成了。
更复杂的情况下,一个P可能从多个上游顶点接收到多个barrier,这个时候有两种处理方式:
比如某个P有x,y两个上游,当x流中出现了barrier,那么x流就不再消费任何item了,直到y流中出现同样的barrier,然后做snapshot,再把barrier传递到下游P,恢复数据处理。
比如某个P有x,y两个上游,当x流中出现了barrier,那么x流继续消费item(例如:x1,x2),直到y流中出现同样的barrier,然后做snapshot,再把barrier传递到下游P,恢复数据处理。
虽然x1,x2应该是在barrier之后,但它们在barrier之前被处理过一遍了。那么当Job重启后,因为x流的x1,x2是在barrier之后,所以它们还会被重放一次,因此x1,x2被处理了两遍。
快照被保存在IMap中,默认在集群中,数据会存储2份。如果多个节点宕机,那么报错的快照可能会丢失,Jet不会在快照丢失后继续运行Job。
可以开启配置
jobConfig.setSplitBrainProtection(true);
这样的话,当发生脑裂的时候,Jet只会在节点数大于1半的集群上运行Job。
企业版高端专属
如果想在流处理中应用配置:Exactly-Once或者At-Least-Once, 那你必须帮助Hazelcast实现下面两个接口:
当job在运行的时候,Hazelcast会定时去调用接口createSnapshotFn。
当job恢复的时候,它会做如下的动作:
StreamSource<Integer> faultTolerantSource = SourceBuilder
.stream("fault-tolerant-source", processorContext -> new int[1])
.<Integer>fillBufferFn((numToEmit, buffer) ->
buffer.add(numToEmit[0]++))
.createSnapshotFn(numToEmit -> numToEmit[0])
.restoreSnapshotFn(
(numToEmit, saved) -> numToEmit[0] = saved.get(0))
.build();
快照生成函数要返回当前emit对象的编号,快照还原函数要把对象编号恢复到当前的状态。这个source是非分布式的,所以可以安全的使用:saved.get(0)。
SourceProcessors里面有一个方法convenientSourceP,它利用类ConvenientSourceP返回了一个支持精确一致性的Source处理器P。
JobConfig jobConfig = new JobConfig();
...
job = hz.getJet().newJob(pipeline, jobConfig);