pipeline只有两种stage:stream和batch,主要看它的数据源是哪种,如果是StreamSource那就用StreamStage,如果是BatchSource那就用BatchStage。也可以通过 addTimestamps来把batch模拟成无边界流。
为了避免乱序可以如下配置:
Pipeline p = Pipeline.create();
p.setPreserveOrder(true);
这样配置后,Jet会给这些消息打上同样的partitioning key。会影响性能。如果你无意中改了partitioning key就会影响事件的顺序。比如你从带有分区的kafka消费,然后用了groupingKey(这个key不是kafka的分区key),那么你就潜在的打乱了事件的顺序。
如下:
Pipeline p = Pipeline.create();
BatchSource<String> leftSource = TestSources.items("the", "quick", "brown", "fox");
BatchSource<String> rightSource = TestSources.items("jumps", "over", "the", "lazy", "dog");
BatchStage<String> left = p.readFrom(leftSource);
BatchStage<String> right = p.readFrom(rightSource);
left.merge(right)
.writeTo(Sinks.logger());
Jet在数据源和目标数据中间有一层转换层,分为两类:
无状态转换:map,filter,flatMap,hashJoin
有状态转换:aggregate,rollingAggregate,distinct,window
IMap: Hazelcast内置的,支持索引、查询、持久化,可以用在Jet的跑批和流式。
可以通过配置event journal来支持流式处理(支持Exactly-Once):
hazelcast:
map:
name_of_map:
event-journal:
enabled: true
capacity: 100000
time-to-live-seconds: 10
然后代码如下:
IMap<String, User> userCache = jet.getMap("usersCache")
Pipeline p = Pipeline.create();
p.readFrom(Sources.mapJournal(userCache, START_FROM_OLDEST))
.withIngestionTimestamps()
.writeTo(Sinks.logger()));
默认数据源只会输出ADDED和UPDATED事件。
详细的例子: IMap Change Stream
注意,IMap是一个分布式对象,下面的写法是有潜在bug的:
IMap<String, User> userCache = jet.getMap("users");
User user = userCache.get("user-id");
user.setAccessCount(user.getAccessCount() + 1);
userCache.put("user-id", user);
为了保证一致性,下面的写法才对:
static class IncrementEntryProcessor implements EntryProcessor<String, User, User> {
@Override
public User process(Entry<String, User> entry) {
User value = entry.getValue();
value.setAccessCount(value.getAccessCount() + 1);
return entry.setValue(value);
}
}
IMap<String, User> userCache = jet.getMap("users");
userCache.executeOnEntry("user-id", new IncrementEntryProcessor());
如下配置监听Mysql的binlog:
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
MySqlCdcSources.mysql("customers")
.setDatabaseAddress("127.0.0.1")
.setDatabasePort(3306)
.setDatabaseUser("debezium")
.setDatabasePassword("dbz")
.setClusterName("dbserver1")
.setDatabaseWhitelist("inventory")
.setTableWhitelist("inventory.customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
CDC只能支持At-Least-Once。数据源部分间歇性保存保存WAL offset,当发生故障或者重启的时候,offset之后的所有事件都会被重放。所以不是Exactly-Once。
因为数据库配置binlog的大小的原因,如果想回放很久以前的CDC可能发生数据丢失。所以配置合理的话,还是可以做到At-Least-Once的。
标准的Pipeline写法是用lambda,因为Pipeline是要序列化传到计算集群的,所以这些表达式都要支持序列化,又因为java.util.function是没有继承Serializable接口,因此Hazelcast的工程师实现了等同的ExXXX系列的接口,比如java.util.function.Function等同于com.hazelcast.function.FunctionEx。
除了表达式要支持Serializable,引用到的参数类也要支持Serializable。或者在buildPipeline方法内部声明内部变量。
还有一种情况,像DateTimeFormatter这种不可序列化的对象也是不能用的,但是我们可以用JDK预置的DateTimeFormatter.ISO_LOCAL_TIME在集群的目标机器上直接生成jvm对象。这个原理是,Jet集群支持java的static final,所以本质上你也可以自己创建static final变量。
最后一种情况是mapUsingService():
Pipeline p = Pipeline.create();
BatchStage<Long> src = p.readFrom(Sources.list("input"));
ServiceFactory<?, DateTimeFormatter> serviceFactory = nonSharedService(
pctx -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
.withZone(ZoneId.systemDefault()));
// 这里
src.mapUsingService(serviceFactory,
(formatter, tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp)));
但是默认的Java序列化Serializable性能并不高,看下面的benchmark:
Strategy Number of Bytes Overhead %
java.io.Serializable 162 523
java.io.Externalizable 87 234
com.hazelcast.nio.serialization.Portable 104 300
com.hazelcast.nio.serialization.StreamSerializer 26 0
Hazelcast增加了一个新的注解@SpringAware,它的作用是:
可以操作bean属性
可以监听callback,例如ApplicationContextAware, BeanNameAware
可以操作bean的post-processing注解,例如InitializingBean, @PostConstruct
例如:
@SpringAware
public class SourceContext {
@Resource(name = "my-source-map")
IMap<String, String> sourceMap;
}
@SpringAware
public class SinkContext {
@Resource(name = "my-sink-map")
IMap<String, String> sinkMap;
}
还有一种方式是用JetSpringServiceFactories,但是需要配置xml:
<hz:spring-aware/>
代码如下:
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(Sources.list("list"))
.mapUsingService(JetSpringServiceFactories.bean("my-bean"), (myBean, item) -> myBean.enrich(item))
.writeTo(Sinks.logger());
Hazelcast提供的springboot starter 暂时找不到了。