当前位置: 首页 > 工具软件 > Hazelcast Jet > 使用案例 >

Hazelcast Jet 之Chandy-Lamport

公孙俊弼
2023-12-01

前言

Jet没有依赖ZK等外部工具软件来实现容错,它内部实现了Chandy-Lamport 分布式snapshots。一旦一个计算节点fail了,Jet会在另外一个计算节点重启Job,从snapshot中恢复处理中的状态,然后从断点恢复。

一、Jet分布式计算一致性

当配置Job的时候,可以设置为Exactly-Once或者At-Least-Once,这个时候Jet会用IMap来做snapshot,内部的计算状态会存储到IMap中。但是仅仅保存计算的内部状态不足以保证计算一致性,还必须要覆盖Source和Sink。

  1. 对于Source,需要满足下列条件之一:
  • 可回放的Source: 例如IMap或者Kafka,Source会将偏移量保存到snapshot,重启的时候会从偏移量恢复。
  • 带Ack的Source: 处理完一个item发送一个Ack。用于JMS队列。
  1. 对于Sink,需要满足下列条件之一:
  • 带事务的Sink: 当快照完成后做两阶段事务提交,比如:JMS,JDBC,Kafka。
  • 幂等写: 有些写操作是多次执行,结果一致,编码的时候用这种写操作来保证一致性。

二、Jet分布式快照

Hazelcast每隔一段时间会"举旗",所有source 顶点会保存状态,然后往下游流中写一个barrier,然后回复执行。当barrier到达P的时候,它会停下来保存snapshot,然后把barrier传递给下游节点然后回复执行。当barrier到达Sink时候,此次快照过程就完成了。
更复杂的情况下,一个P可能从多个上游顶点接收到多个barrier,这个时候有两种处理方式:

1. Exactly-Once

比如某个P有x,y两个上游,当x流中出现了barrier,那么x流就不再消费任何item了,直到y流中出现同样的barrier,然后做snapshot,再把barrier传递到下游P,恢复数据处理。

2. At-Least-Once

比如某个P有x,y两个上游,当x流中出现了barrier,那么x流继续消费item(例如:x1,x2),直到y流中出现同样的barrier,然后做snapshot,再把barrier传递到下游P,恢复数据处理。

虽然x1,x2应该是在barrier之后,但它们在barrier之前被处理过一遍了。那么当Job重启后,因为x流的x1,x2是在barrier之后,所以它们还会被重放一次,因此x1,x2被处理了两遍。

三、快照存储

快照被保存在IMap中,默认在集群中,数据会存储2份。如果多个节点宕机,那么报错的快照可能会丢失,Jet不会在快照丢失后继续运行Job。

四、集群防脑裂

可以开启配置

jobConfig.setSplitBrainProtection(true);

这样的话,当发生脑裂的时候,Jet只会在节点数大于1半的集群上运行Job。

五、磁盘存储

企业版高端专属

六、数据源实现Exactly-Once

如果想在流处理中应用配置:Exactly-Once或者At-Least-Once, 那你必须帮助Hazelcast实现下面两个接口:

  • createSnapshotFn
  • restoreSnapshotFn

当job在运行的时候,Hazelcast会定时去调用接口createSnapshotFn。
当job恢复的时候,它会做如下的动作:

  • 调用createFn来创建你的上下文对象
  • 从存储中恢复最后的快照对象
  • 把上下文对象和快照对象传递给接口restoreSnapshotFn
  • 调用fillBufferFn,它必须emit和createSnapshotFn存储的同一个对象才行。
    仔细看您会发现接收的是快照对象的列表。因为每个节点的source对象都会产生快照对象,这是分布式系统的特点。当集群重启后,节点可能不一一对应了(比如宕机了几个节点),所以恢复的时候才需要把对象列表都传递所有Processor,您自己要定义逻辑,哪个Processor处理哪部分的快照数据,例如:
StreamSource<Integer> faultTolerantSource = SourceBuilder
  .stream("fault-tolerant-source", processorContext -> new int[1])
  .<Integer>fillBufferFn((numToEmit, buffer) ->
      buffer.add(numToEmit[0]++))
  .createSnapshotFn(numToEmit -> numToEmit[0])
  .restoreSnapshotFn(
      (numToEmit, saved) -> numToEmit[0] = saved.get(0))
  .build();

快照生成函数要返回当前emit对象的编号,快照还原函数要把对象编号恢复到当前的状态。这个source是非分布式的,所以可以安全的使用:saved.get(0)。

6.1 框架内部实现

SourceProcessors里面有一个方法convenientSourceP,它利用类ConvenientSourceP返回了一个支持精确一致性的Source处理器P。

七、Jet Job实现Exactly-Once

参考文档

JobConfig jobConfig = new JobConfig();
...
job = hz.getJet().newJob(pipeline, jobConfig);
 类似资料: