代码位于flink-table-runtime-blink模块,用户指南请参考官网。
目前是旧的实现方式,后续将会按FLIP-95重新实现(参见FLINK-19336)。
入口类FileSystemTableFactory,如何做Factory discover的可以参考之前的博文,这里就不赘述了。
构造FileSystemTableSink对象,传入相关属性参数
/**
 * Creates the {@link FileSystemTableSink} for the table described by {@code context}.
 *
 * <p>The table's string options are first copied into a {@link Configuration} so that the
 * path and the default partition name can be read through typed option accessors.
 *
 * @param context factory context carrying the table, its identifier and the boundedness flag
 * @return a sink configured from the table's schema, partition keys and options
 */
public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
    final Configuration tableConf = new Configuration();
    context.getTable().getOptions().forEach((key, value) -> tableConf.setString(key, value));

    final boolean bounded = context.isBounded();
    final String defaultPartitionName = tableConf.get(PARTITION_DEFAULT_NAME);

    return new FileSystemTableSink(
            // identifier of the table being written
            context.getObjectIdentifier(),
            // whether the input is bounded
            bounded,
            // table schema
            context.getTable().getSchema(),
            // target file path
            getPath(tableConf),
            // partition key columns
            context.getTable().getPartitionKeys(),
            // name used for the default partition
            defaultPartitionName,
            // raw string options
            context.getTable().getOptions());
}
FileSystemTableSink会根据DataStream构造DataStreamSink
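consumeDataStream主要做几件事情:构造分区计算器(RowDataPartitionComputer)和输出文件配置;有界流时通过FileSystemOutputFormat写出;无界流时基于StreamingFileSink构造BucketsBuilder,再交给createStreamingSink: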
/**
 * Attaches the file system sink to {@code dataStream}.
 *
 * <p>For bounded input the data is written through a {@code FileSystemOutputFormat};
 * otherwise a {@code StreamingFileSink} buckets builder is prepared and handed to
 * {@code createStreamingSink}.
 *
 * @param dataStream the stream of rows to write
 * @return the sink attached to the stream
 */
public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataStream) {
// Partition computer built from the default partition name, the schema's field
// names/types and the partition key columns.
RowDataPartitionComputer computer = new RowDataPartitionComputer(
defaultPartName,
schema.getFieldNames(),
schema.getFieldDataTypes(),
partitionKeys.toArray(new String[0]));
EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
// Every call uses a fresh random UUID in the part-file prefix.
OutputFileConfig outputFileConfig = OutputFileConfig.builder()
.withPartPrefix("part-" + UUID.randomUUID().toString())
.build();
FileSystemFactory fsFactory = FileSystem::get;
if (isBounded) {
// Bounded input: configure an output format and write with the input's parallelism.
FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(computer);
builder.setDynamicGrouped(dynamicGrouping);
builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
builder.setFormatFactory(createOutputFormatFactory());
builder.setMetaStoreFactory(metaStoreFactory);
builder.setFileSystemFactory(fsFactory);
builder.setOverwrite(overwrite);
builder.setStaticPartitions(staticPartitions);
builder.setTempPath(toStagingPath());
builder.setOutputFileConfig(outputFileConfig);
return dataStream.writeUsingOutputFormat(builder.build())
.setParallelism(dataStream.getParallelism());
} else {
// Unbounded input: typed access to the sink properties.
Configuration conf = new Configuration();
properties.forEach(conf::setString);
// The writer is either an Encoder (row format) or, in the other branch below,
// cast to a BulkWriter.Factory (bulk format).
Object writer = createWriter();
TableBucketAssigner assigner = new TableBucketAssigner(computer);
// NOTE(review): the first argument is true for non-Encoder writers; presumably it
// is a "roll on checkpoint" flag — confirm against TableRollingPolicy.
TableRollingPolicy rollingPolicy = new TableRollingPolicy(
!(writer instanceof Encoder),
conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
if (writer instanceof Encoder) {
//noinspection unchecked
bucketsBuilder = StreamingFileSink.forRowFormat(
path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
.withBucketAssigner(assigner)
.withOutputFileConfig(outputFileConfig)
.withRollingPolicy(rollingPolicy);
} else {
//noinspection unchecked
bucketsBuilder = StreamingFileSink.forBulkFormat(
path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
.withBucketAssigner(assigner)
.withOutputFileConfig(outputFileConfig)
.withRollingPolicy(rollingPolicy);
}
// Hand over to the streaming pipeline; the last argument is the rolling-policy
// check interval in milliseconds.
return createStreamingSink(
conf,
path,
partitionKeys,
tableIdentifier,
overwrite,
dataStream,
bucketsBuilder,
metaStoreFactory,
fsFactory,
conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis());
}
}
一般流式任务都是无界流,所以走else分支:
createStreamingSink
/**
 * Wires the streaming write pipeline: a {@link StreamingFileWriter} operator, optionally
 * followed by a single-parallelism {@link StreamingFileCommitter}, terminated by a
 * discarding sink.
 *
 * @param conf sink configuration; consulted for the partition commit policy kind
 * @param path base path passed to the committer
 * @param partitionKeys partition key columns; a committer is only added when non-empty
 * @param tableIdentifier identifier passed to the committer
 * @param overwrite must be {@code false}
 * @param inputStream rows to write
 * @param bucketsBuilder builder handed to the file writer
 * @param msFactory meta store factory passed to the committer
 * @param fsFactory file system factory passed to the committer
 * @param rollingCheckInterval interval handed to the file writer
 * @return the terminal sink of the pipeline
 * @throws IllegalStateException if {@code overwrite} is {@code true}
 */
public static DataStreamSink<RowData> createStreamingSink(
        Configuration conf,
        Path path,
        List<String> partitionKeys,
        ObjectIdentifier tableIdentifier,
        boolean overwrite,
        DataStream<RowData> inputStream,
        BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
        TableMetaStoreFactory msFactory,
        FileSystemFactory fsFactory,
        long rollingCheckInterval) {
    if (overwrite) {
        throw new IllegalStateException("Streaming mode not support overwrite.");
    }

    // Writer operator runs with the same parallelism as its input.
    final StreamingFileWriter writerOperator =
            new StreamingFileWriter(rollingCheckInterval, bucketsBuilder);
    final DataStream<CommitMessage> commitMessages = inputStream
            .transform(
                    StreamingFileWriter.class.getSimpleName(),
                    TypeExtractor.createTypeInfo(CommitMessage.class),
                    writerOperator)
            .setParallelism(inputStream.getParallelism());

    // The committer is skipped when there is nothing for it to do: no partition keys,
    // or no commit policy configured.
    final boolean committerRequired =
            !partitionKeys.isEmpty() && conf.contains(SINK_PARTITION_COMMIT_POLICY_KIND);

    final DataStream<?> tail;
    if (committerRequired) {
        final StreamingFileCommitter committerOperator = new StreamingFileCommitter(
                path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf);
        tail = commitMessages
                .transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, committerOperator)
                .setParallelism(1)
                .setMaxParallelism(1);
    } else {
        tail = commitMessages;
    }

    //noinspection unchecked
    return tail.addSink(new DiscardingSink()).setParallelism(1);
}
PS:这里有个Java 8函数式接口的写法,第一次接触的同学可能会有点蒙。如果一个接口只有一个抽象方法,那么它就是函数式接口,实现方式可以有很多种,最常见的是使用匿名内部类,也可以使用lambda表达式或方法引用来实现(这里的FileSystem::get是方法引用,而不是构造器引用)。如下,
// Method-reference form: FileSystem.get supplies the single abstract method.
FileSystemFactory fsFactory = FileSystem::get;
// Equivalent to an anonymous class
FileSystemFactory fileSystemFactory = new FileSystemFactory() {
public FileSystem create(URI fsUri) throws IOException {
return FileSystem.get(fsUri);
}
};
// Equivalent to a lambda
FileSystemFactory fileSystemFactory = uri -> FileSystem.get(uri);
数据处理在StreamingFileWriter#processElement
/**
 * Forwards one record to the sink helper together with the current processing time,
 * the record's timestamp (or {@code null} if it has none) and the current watermark.
 *
 * @param element the incoming stream record
 * @throws Exception propagated from the helper
 */
public void processElement(StreamRecord<RowData> element) throws Exception {
    final RowData row = element.getValue();
    final long processingTime = getProcessingTimeService().getCurrentProcessingTime();

    final Long eventTimestamp;
    if (element.hasTimestamp()) {
        eventTimestamp = element.getTimestamp();
    } else {
        eventTimestamp = null;
    }

    helper.onElement(row, processingTime, eventTimestamp, currentWatermark);
}
在此之前会在initializeState中通过BucketsBuilder创建Buckets,并封装到StreamingFileSinkHelper中
/**
 * Creates the buckets for this subtask, registers a life-cycle listener on them and
 * wraps them in a {@code StreamingFileSinkHelper}.
 *
 * @param context state initialization context; supplies the restored flag and the
 *     operator state store
 * @throws Exception propagated from the parent class, bucket creation or the helper
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
// Buckets are created per subtask, keyed by this subtask's index.
buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
// Set listener before the initialization of Buckets.
inactivePartitions = new HashSet<>();
buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {
@Override
public void bucketCreated(Bucket<RowData, String> bucket) {
}
// Remember the id of every bucket that becomes inactive.
@Override
public void bucketInactive(Bucket<RowData, String> bucket) {
inactivePartitions.add(bucket.getBucketId());
}
});
helper = new StreamingFileSinkHelper<>(
buckets,
context.isRestored(),
context.getOperatorStateStore(),
getRuntimeContext().getProcessingTimeService(),
bucketCheckInterval);
// No watermark has been seen yet.
currentWatermark = Long.MIN_VALUE;
}
回到processElement,跟进代码你会发现最终数据会由Bucket的write写入文件
/**
 * Writes {@code element} to the bucket's in-progress part file, rolling to a new part
 * file first when none is open or the rolling policy asks for a roll on this event.
 *
 * @param element the record to write
 * @param currentTime time passed to the part file when rolling and writing
 * @throws IOException if rolling or writing fails
 */
void write(IN element, long currentTime) throws IOException {
    // A new part file is needed when nothing is in progress or the policy says so.
    final boolean rollRequired =
            inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element);

    if (rollRequired) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                    subtaskIndex,
                    bucketId,
                    element);
        }
        inProgressPart = rollPartFile(currentTime);
    }

    inProgressPart.write(element, currentTime);
}
最终通过调用第三方包中write的方式写入文件系统,如 hadoop、hive、parquet、orc等
做checkpoint的是snapshotState方法,主要逻辑在Buckets类中
/**
 * Snapshots the sink state for a checkpoint: both state containers are cleared, then the
 * state of every active bucket and the maximum part counter are stored.
 *
 * @param checkpointId id of the checkpoint being taken
 * @param bucketStatesContainer receives the serialized bucket states
 * @param partCounterStateContainer receives the maximum part counter
 * @throws Exception if the sink is not initialized or a bucket cannot be snapshotted
 */
public void snapshotState(
        final long checkpointId,
        final ListState<byte[]> bucketStatesContainer,
        final ListState<Long> partCounterStateContainer) throws Exception {
    final boolean initialized = bucketWriter != null && bucketStateSerializer != null;
    Preconditions.checkState(initialized, "sink has not been initialized");

    LOG.info(
            "Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
            subtaskIndex,
            checkpointId,
            maxPartCounter);

    // Start from empty containers so only the current state is stored.
    bucketStatesContainer.clear();
    partCounterStateContainer.clear();

    snapshotActiveBuckets(checkpointId, bucketStatesContainer);
    partCounterStateContainer.add(maxPartCounter);
}
/**
 * Serializes the checkpoint state of every active bucket into {@code bucketStatesContainer}.
 *
 * @param checkpointId id of the checkpoint being taken
 * @param bucketStatesContainer receives one serialized entry per active bucket
 * @throws Exception if a bucket cannot be snapshotted or serialized
 */
private void snapshotActiveBuckets(
        final long checkpointId,
        final ListState<byte[]> bucketStatesContainer) throws Exception {
    for (Bucket<IN, BucketID> activeBucket : activeBuckets.values()) {
        final BucketState<BucketID> state = activeBucket.onReceptionOfCheckpoint(checkpointId);

        // Versioned serialization of the bucket state.
        final byte[] stateBytes =
                SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, state);
        bucketStatesContainer.add(stateBytes);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, state);
        }
    }
}
这里会对active状态的Bucket进行snapshot
/**
 * Prepares this bucket for the given checkpoint and captures its state.
 *
 * <p>If a part file is still in progress, it is persisted and its recoverable is
 * recorded under {@code checkpointId}; otherwise the state carries a {@code null}
 * recoverable and {@code Long.MAX_VALUE} as creation time.
 *
 * @param checkpointId id of the checkpoint being taken
 * @return the bucket state to be serialized into the checkpoint
 * @throws IOException if closing or persisting a part file fails
 */
BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
    prepareBucketForCheckpointing(checkpointId);

    final InProgressFileWriter.InProgressFileRecoverable recoverable;
    final long creationTime;
    if (inProgressPart == null) {
        recoverable = null;
        creationTime = Long.MAX_VALUE;
    } else {
        recoverable = inProgressPart.persist();
        creationTime = inProgressPart.getCreationTime();
        this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, recoverable);
    }

    // The returned state is what gets serialized.
    return new BucketState<>(
            bucketId,
            bucketPath,
            creationTime,
            recoverable,
            pendingFileRecoverablesPerCheckpoint);
}
/**
 * Closes the in-progress part file if the rolling policy requires a roll on checkpoint,
 * then moves the pending files collected so far under {@code checkpointId}.
 *
 * @param checkpointId id of the checkpoint being taken
 * @throws IOException if closing the part file fails
 */
private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
    final boolean rollOnCheckpoint =
            inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart);
    if (rollOnCheckpoint) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Subtask {} closing in-progress part file for bucket id={} on checkpoint.",
                    subtaskIndex,
                    bucketId);
        }
        closePartFile();
    }

    if (pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
        return;
    }

    pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
    // Start a fresh list for the next checkpoint.
    pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
}
核心逻辑在closePartFile中,将inprogress状态的文件关闭并由内存提交到文件系统中,得到pendingFileRecoverable对象并存储到pendingFileRecoverablesForCurrentCheckpoint列表里,为snapshot准备。
/**
 * Closes the current in-progress part file for commit, if there is one, and queues the
 * resulting recoverable for the current checkpoint.
 *
 * @return the pending-file recoverable, or {@code null} when no part file was in progress
 * @throws IOException if closing the part file fails
 */
private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
    if (inProgressPart == null) {
        return null;
    }

    final InProgressFileWriter.PendingFileRecoverable closedPart = inProgressPart.closeForCommit();
    pendingFileRecoverablesForCurrentCheckpoint.add(closedPart);
    // Nothing is in progress any more.
    inProgressPart = null;
    return closedPart;
}
写入中的文件处于in-progress状态,此时是不可以读取的;什么时候才可以被下游读取,取决于文件什么时候提交。上一步已经将数据写入文件了,但是还没有正式提交。我们知道checkpoint的几个步骤(不了解的可以参考之前的博文),在最后一步CheckpointCoordinator会调用各operator的notifyCheckpointComplete方法。
/**
 * Invoked once the checkpoint has completed: after delegating to the parent class,
 * commits everything up to and including {@code checkpointId}.
 *
 * @param checkpointId id of the completed checkpoint
 * @throws Exception propagated from the parent class or the commit
 */
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
commitUpToCheckpoint(checkpointId);
}
/**
 * Notifies every active bucket that {@code checkpointId} completed, and drops buckets
 * that are no longer active afterwards.
 *
 * @param checkpointId id of the completed checkpoint
 * @throws IOException if a bucket fails to commit its pending files
 */
public void commitUpToCheckpoint(final long checkpointId) throws IOException {
    final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> remaining =
            activeBuckets.entrySet().iterator();

    LOG.info(
            "Subtask {} received completion notification for checkpoint with id={}.",
            subtaskIndex,
            checkpointId);

    while (remaining.hasNext()) {
        final Bucket<IN, BucketID> current = remaining.next().getValue();
        current.onSuccessfulCompletionOfCheckpoint(checkpointId);

        if (current.isActive()) {
            continue;
        }

        // All pending files have been dealt with and the writer for this bucket is not
        // currently open, so the bucket is inactive and can be removed from our state.
        remaining.remove();
        notifyBucketInactive(current);
    }
}
文件的提交是在Bucket的onSuccessfulCompletionOfCheckpoint
/**
 * Commits all pending files recorded for checkpoints up to and including
 * {@code checkpointId}, removes them from the pending map, and then cleans up the
 * in-progress recoverables of earlier checkpoints.
 *
 * @param checkpointId id of the completed checkpoint
 * @throws IOException if recovering or committing a pending file fails
 */
void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
    checkNotNull(bucketWriter);

    // Inclusive head map: everything recorded at or before this checkpoint.
    for (Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> pending =
                    pendingFileRecoverablesPerCheckpoint
                            .headMap(checkpointId, true)
                            .entrySet()
                            .iterator();
            pending.hasNext(); ) {
        final Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> perCheckpoint =
                pending.next();
        for (InProgressFileWriter.PendingFileRecoverable toCommit : perCheckpoint.getValue()) {
            bucketWriter.recoverPendingFile(toCommit).commit();
        }
        pending.remove();
    }

    cleanupInProgressFileRecoverables(checkpointId);
}
在commit方法中对文件进行重命名,使其能够被下游读取,比如hadoop的commit实现
/**
 * Publishes the staged file by renaming it from its temporary location to its target
 * location.
 *
 * <p>Before renaming, the staged file's length is compared with the offset recorded in
 * the recoverable; a mismatch means the file was modified after the committer was
 * created and the commit is rejected.
 *
 * @throws IOException if the staged file's status cannot be read, if its length differs
 *     from the expected one, or if the rename throws
 */
@Override
public void commit() throws IOException {
    final Path src = recoverable.tempFile();
    final Path dest = recoverable.targetFile();
    final long expectedLength = recoverable.offset();

    final FileStatus srcStatus;
    try {
        srcStatus = fs.getFileStatus(src);
    }
    catch (IOException e) {
        // Keep the original failure as the cause: the status lookup can fail for reasons
        // other than a missing file, and dropping it makes the error undiagnosable.
        throw new IOException("Cannot clean commit: Staging file does not exist.", e);
    }

    if (srcStatus.getLen() != expectedLength) {
        // something was done to this file since the committer was created.
        // this is not the "clean" case
        throw new IOException("Cannot clean commit: File has trailing junk data.");
    }

    try {
        // NOTE(review): the value returned by rename is ignored here — confirm whether a
        // "false" result should also be treated as a failed commit.
        fs.rename(src, dest);
    }
    catch (IOException e) {
        throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
    }
}
最后会对InprogressFile的一些状态做清理工作
/**
 * Cleans up and forgets the in-progress file recoverables recorded for checkpoints
 * strictly before {@code checkpointId}.
 *
 * @param checkpointId id of the completed checkpoint (exclusive upper bound)
 * @throws IOException if the bucket writer fails during cleanup
 */
private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
    // Exclusive head map: the recoverable of the current checkpoint is kept.
    for (Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> stale =
                    inProgressFileRecoverablesPerCheckpoint
                            .headMap(checkpointId, false)
                            .entrySet()
                            .iterator();
            stale.hasNext(); ) {
        final InProgressFileWriter.InProgressFileRecoverable outdated = stale.next().getValue();

        // NOTE(review): the article states that only the S3 writer returns true from this
        // call — confirm against the bucket writer implementations.
        final boolean deleted = bucketWriter.cleanupInProgressFileRecoverable(outdated);
        if (LOG.isDebugEnabled() && deleted) {
            LOG.debug(
                    "Subtask {} successfully deleted incomplete part for bucket id={}.",
                    subtaskIndex,
                    bucketId);
        }

        // Forget the entry regardless of the cleanup result.
        stale.remove();
    }
}
分区提交的触发以及提交的策略。
触发条件分为process-time和partition-time。
process-time的原理是:将当前Checkpoint需要提交的分区和当前系统时间注册到pendingPartitions map中,在提交时判断注册时间+delay是否小于当前系统时间来确定是否需要提交分区,如果delay=0则直接提交。
所以如果delay=0立即提交,如果有数据延迟的话可能导致该分区过早的提交。如果delay=分区大小,那么就是在Checkpoint间隔+delay后提交上一次Checkpoint需要提交的分区。
/**
 * Registers a partition as pending, remembering the processing time at which it was
 * first seen. Null or blank partition names are ignored, and an already registered
 * partition keeps its original time.
 *
 * @param partition partition name to register
 */
@Override
public void addPartition(String partition) {
    if (StringUtils.isNullOrWhitespaceOnly(partition)) {
        return;
    }
    this.pendingPartitions.putIfAbsent(partition, procTimeService.getCurrentProcessingTime());
}
/**
 * Returns the pending partitions that are due for commit and removes them from the
 * pending set. A partition is due when the commit delay is zero, or when the current
 * processing time is later than its registration time plus the delay.
 *
 * @param checkpointId id of the checkpoint (not used by this trigger)
 * @return the partitions to commit now
 */
@Override
public List<String> committablePartitions(long checkpointId) {
    final List<String> due = new ArrayList<>();
    final long now = procTimeService.getCurrentProcessingTime();

    final Iterator<Map.Entry<String, Long>> pending = pendingPartitions.entrySet().iterator();
    while (pending.hasNext()) {
        final Map.Entry<String, Long> candidate = pending.next();
        final long registeredAt = candidate.getValue();

        final boolean delayElapsed = commitDelay == 0 || now > registeredAt + commitDelay;
        if (!delayElapsed) {
            continue;
        }

        due.add(candidate.getKey());
        pending.remove();
    }
    return due;
}
partition-time的原理是:基于watermark是否超过分区时间+delay来判断是否要提交。
/**
 * Registers a partition as pending. Null or blank partition names are ignored.
 *
 * @param partition partition name to register
 */
@Override
public void addPartition(String partition) {
    if (StringUtils.isNullOrWhitespaceOnly(partition)) {
        return;
    }
    this.pendingPartitions.add(partition);
}
/**
 * Returns the pending partitions whose partition time plus the commit delay lies before
 * the watermark recorded for {@code checkpointId}, removing them from the pending set.
 * Watermarks recorded up to and including this checkpoint are discarded.
 *
 * @param checkpointId id of the checkpoint whose watermark is used
 * @return the partitions to commit now
 * @throws IllegalArgumentException if no watermark was recorded for {@code checkpointId}
 */
@Override
public List<String> committablePartitions(long checkpointId) {
    if (!watermarks.containsKey(checkpointId)) {
        throw new IllegalArgumentException(String.format(
                "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                checkpointId, watermarks));
    }

    final long checkpointWatermark = watermarks.get(checkpointId);
    // Older watermarks are no longer needed.
    watermarks.headMap(checkpointId, true).clear();

    final List<String> due = new ArrayList<>();
    for (Iterator<String> pending = pendingPartitions.iterator(); pending.hasNext(); ) {
        final String candidate = pending.next();

        // Derive the partition's time from its path, e.g. a partition like
        // 'day=2020-12-01/hour=11/minute=11' would map to 2020-12-01 11:11:00
        // (depends on the configured extractor).
        final LocalDateTime partitionTime = extractor.extract(
                partitionKeys, extractPartitionValues(new Path(candidate)));

        if (checkpointWatermark > toMills(partitionTime) + commitDelay) {
            due.add(candidate);
            pending.remove();
        }
    }
    return due;
}
读取数据相对于写入数据要简单些。
创建FileSystemTableSource对象
/**
 * Creates the {@link FileSystemTableSource} for the table described by {@code context}.
 *
 * <p>The table's string options are first copied into a {@link Configuration} so that the
 * path and the default partition name can be read through typed option accessors. The
 * raw options are obtained through {@code getOptions()} throughout, matching
 * {@code createTableSink}.
 *
 * @param context factory context carrying the table
 * @return a source configured from the table's schema, partition keys and options
 */
public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
    Configuration conf = new Configuration();
    context.getTable().getOptions().forEach(conf::setString);

    return new FileSystemTableSource(
            context.getTable().getSchema(),
            getPath(conf),
            context.getTable().getPartitionKeys(),
            conf.get(PARTITION_DEFAULT_NAME),
            // Same accessor as used above to fill conf, instead of getProperties().
            context.getTable().getOptions());
}
构造source function,传入input format用于读取源数据。
/**
 * Builds the source stream: the table's input format is wrapped in an
 * {@link InputFormatSourceFunction} and registered with the environment under the name
 * returned by {@code explainSource()}.
 *
 * @param execEnv the execution environment to add the source to
 * @return the stream of rows produced by the source
 */
public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
    @SuppressWarnings("unchecked")
    TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>)
            TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());

    // Avoid using ContinuousFileMonitoringFunction
    InputFormatSourceFunction<RowData> sourceFunction =
            new InputFormatSourceFunction<>(getInputFormat(), producedTypeInfo);

    return execEnv
            .addSource(sourceFunction, explainSource(), producedTypeInfo)
            .name(explainSource());
}
在run方法中,循环读取数据,发送到下游算子
/**
 * Reads all input splits one after another and emits every record to {@code ctx}.
 *
 * <p>Reading stops when the split iterator is exhausted or when {@code isRunning} is
 * cleared. The format is closed in all cases.
 *
 * @param ctx source context that receives the records
 * @throws Exception propagated from the input format
 */
public void run(SourceContext<OUT> ctx) throws Exception {
try {
// Counts the splits that have been fully processed.
Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
if (isRunning && format instanceof RichInputFormat) {
((RichInputFormat) format).openInputFormat();
}
// Instance handed to nextRecord() as the reuse object.
OUT nextElement = serializer.createInstance();
// NOTE(review): next() is called without a preceding hasNext() on the first pass;
// presumably isRunning is initialised from splitIterator.hasNext() elsewhere — confirm.
while (isRunning) {
format.open(splitIterator.next());
// for each element we also check if cancel
// was called by checking the isRunning flag
while (isRunning && !format.reachedEnd()) {
nextElement = format.nextRecord(nextElement);
if (nextElement != null) {
ctx.collect(nextElement);
} else {
// A null record ends the current split.
break;
}
}
format.close();
completedSplitsCounter.inc();
// Continue only while more splits remain and the source was not cancelled.
if (isRunning) {
isRunning = splitIterator.hasNext();
}
}
} finally {
// Close again on every exit path; on normal completion this is a second close.
format.close();
if (format instanceof RichInputFormat) {
((RichInputFormat) format).closeInputFormat();
}
isRunning = false;
}
}