当前位置: 首页 > 工具软件 > Hazelcast > 使用案例 >

Hazelcast Jet Pipeline详解

祁乐邦
2023-12-01

前言

pipeline只有两种stage:stream和batch,主要看它的数据源是哪种,如果是StreamSource那就用StreamStage,如果是BatchSource那就用BatchStage。也可以通过 addTimestamps来把batch模拟成无边界流。

1. 避免事件乱序

为了避免乱序可以如下配置:

Pipeline p = Pipeline.create();
p.setPreserveOrder(true);

这样配置后,Jet会给这些消息打上同样的partitioning key。会影响性能。如果你无意中改了partitioning key就会影响事件的顺序。比如你从带有分区的kafka消费,然后用了groupingKey(这个key不是kafka的分区key),那么你就潜在的打乱了事件的顺序。

2.多数据源

如下:

Pipeline p = Pipeline.create();

BatchSource<String> leftSource = TestSources.items("the", "quick", "brown", "fox");
BatchSource<String> rightSource = TestSources.items("jumps", "over", "the", "lazy", "dog");

BatchStage<String> left = p.readFrom(leftSource);
BatchStage<String> right = p.readFrom(rightSource);

left.merge(right)
    .writeTo(Sinks.logger());

3. 转换Transforms

Jet在数据源和目标数据中间有一层转换层,分为两类:
无状态转换:map,filter,flatMap,hashJoin
有状态转换:aggregate,rollingAggregate,distinct,window

4. 分布式内存数据结构

IMap: Hazelcast内置的,支持索引、查询、持久化,可以用在Jet的跑批和流式。
可以通过配置event journal来支持流式处理(支持Exactly-Once):

hazelcast:
  map:
    name_of_map:
      event-journal:
        enabled: true
        capacity: 100000
        time-to-live-seconds: 10

然后代码如下:

IMap<String, User> userCache = jet.getMap("usersCache")
Pipeline p = Pipeline.create();
p.readFrom(Sources.mapJournal(userCache, START_FROM_OLDEST))
 .withIngestionTimestamps()
 .writeTo(Sinks.logger()));

默认数据源只会输出ADDED和UPDATED事件。
详细的例子: IMap Change Stream
注意,IMap是一个分布式对象,下面的写法是有潜在bug的:

IMap<String, User> userCache = jet.getMap("users");
User user = userCache.get("user-id");
user.setAccessCount(user.getAccessCount() + 1);
userCache.put("user-id", user);

为了保证一致性,下面的写法才对:

static class IncrementEntryProcessor implements EntryProcessor<String, User, User> {
    @Override
    public User process(Entry<String, User> entry) {
        User value = entry.getValue();
        value.setAccessCount(value.getAccessCount() + 1);
        return entry.setValue(value);
    }
}

IMap<String, User> userCache = jet.getMap("users");
userCache.executeOnEntry("user-id", new IncrementEntryProcessor());

5. CDC

如下配置监听Mysql的binlog:

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
    MySqlCdcSources.mysql("customers")
            .setDatabaseAddress("127.0.0.1")
            .setDatabasePort(3306)
            .setDatabaseUser("debezium")
            .setDatabasePassword("dbz")
            .setClusterName("dbserver1")
            .setDatabaseWhitelist("inventory")
            .setTableWhitelist("inventory.customers")
            .build())
    .withNativeTimestamps(0)
    .writeTo(Sinks.logger());

CDC只能支持At-Least-Once。数据源部分间歇性保存保存WAL offset,当发生故障或者重启的时候,offset之后的所有事件都会被重放。所以不是Exactly-Once。
因为数据库配置binlog的大小的原因,如果想回放很久以前的CDC可能发生数据丢失。所以配置合理的话,还是可以做到At-Least-Once的。

6. Pipeline的序列化

标准的Pipeline写法是用lambda,因为Pipeline是要序列化传到计算集群的,所以这些表达式都要支持序列化,又因为java.util.function是没有继承Serializable接口,因此Hazelcast的工程师实现了等同的ExXXX系列的接口,比如java.util.function.Function等同于com.hazelcast.function.FunctionEx。
除了表达式要支持Serializable,引用到的参数类也要支持Serializable。或者在buildPipeline方法内部声明内部变量。
还有一种情况,像DateTimeFormatter这种不可序列化的对象也是不能用的,但是我们可以用JDK预置的DateTimeFormatter.ISO_LOCAL_TIME在集群的目标机器上直接生成jvm对象。这个原理是,Jet集群支持java的static final,所以本质上你也可以自己创建static final变量。
最后一种情况是mapUsingService():

Pipeline p = Pipeline.create();
BatchStage<Long> src = p.readFrom(Sources.list("input"));
ServiceFactory<?, DateTimeFormatter> serviceFactory = nonSharedService(
        pctx -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
                              .withZone(ZoneId.systemDefault()));
// 这里
src.mapUsingService(serviceFactory,
        (formatter, tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp)));

但是默认的Java序列化Serializable性能并不高,看下面的benchmark:

Strategy                                        Number of Bytes  Overhead %
java.io.Serializable                                        162         523
java.io.Externalizable                                       87         234
com.hazelcast.nio.serialization.Portable                    104         300
com.hazelcast.nio.serialization.StreamSerializer             26           0

7. spring集成

Hazelcast增加了一个新的注解@SpringAware,它的作用是:
可以操作bean属性
可以监听callback,例如ApplicationContextAware, BeanNameAware
可以操作bean的post-processing注解,例如InitializingBean, @PostConstruct
例如:

@SpringAware
public class SourceContext {

    @Resource(name = "my-source-map")
    IMap<String, String> sourceMap;
}

@SpringAware
public class SinkContext {

    @Resource(name = "my-sink-map")
    IMap<String, String> sinkMap;
}

还有一种方式是用JetSpringServiceFactories,但是需要配置xml:

<hz:spring-aware/>

代码如下:

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(Sources.list("list"))
        .mapUsingService(JetSpringServiceFactories.bean("my-bean"), (myBean, item) -> myBean.enrich(item))
        .writeTo(Sinks.logger());

8. springboot集成

Hazelcast提供的springboot starter 暂时找不到了。

 类似资料: