Flink Source Code Reading: The FileSystem Connector

岳风畔
2023-12-01

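The code lives in the flink-table-runtime-blink module; for the user guide, see the official website.

This is currently the legacy implementation; it will be reimplemented on top of FLIP-95 (see FLINK-19336).

The entry class is FileSystemTableFactory. How factory discovery works is covered in an earlier post, so I won't repeat it here.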

Sink

Construct a FileSystemTableSink object, passing in the relevant properties:

public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
		Configuration conf = new Configuration();
		context.getTable().getOptions().forEach(conf::setString);

		return new FileSystemTableSink(
				context.getObjectIdentifier(), // table identifier
				context.isBounded(), // whether the stream is bounded
				context.getTable().getSchema(), // table schema
				getPath(conf), // file path
				context.getTable().getPartitionKeys(), // partition keys
				conf.get(PARTITION_DEFAULT_NAME), // default partition name
				context.getTable().getOptions()); // table options
	}

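FileSystemTableSink builds a DataStreamSink from the incoming DataStream.

consumeDataStream does the following:

  1. Constructs a RowDataPartitionComputer, which separates the indexes and types of the partition fields from those of the non-partition fields.
  2. Creates an EmptyMetaStoreFactory, a no-op metastore implementation.
  3. Generates the part-file prefix from a UUID.
  4. Builds the FileSystemFactory implementation.
  5. Branches on whether the stream is bounded.
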
public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataStream) {
		RowDataPartitionComputer computer = new RowDataPartitionComputer(
				defaultPartName,
				schema.getFieldNames(),
				schema.getFieldDataTypes(),
				partitionKeys.toArray(new String[0]));

		EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
		OutputFileConfig outputFileConfig = OutputFileConfig.builder()
				.withPartPrefix("part-" + UUID.randomUUID().toString())
				.build();
		FileSystemFactory fsFactory = FileSystem::get;

		if (isBounded) {
			FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
			builder.setPartitionComputer(computer);
			builder.setDynamicGrouped(dynamicGrouping);
			builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
			builder.setFormatFactory(createOutputFormatFactory());
			builder.setMetaStoreFactory(metaStoreFactory);
			builder.setFileSystemFactory(fsFactory);
			builder.setOverwrite(overwrite);
			builder.setStaticPartitions(staticPartitions);
			builder.setTempPath(toStagingPath());
			builder.setOutputFileConfig(outputFileConfig);
			return dataStream.writeUsingOutputFormat(builder.build())
					.setParallelism(dataStream.getParallelism());
		} else {
			Configuration conf = new Configuration();
			properties.forEach(conf::setString);
			Object writer = createWriter();
			TableBucketAssigner assigner = new TableBucketAssigner(computer);
			TableRollingPolicy rollingPolicy = new TableRollingPolicy(
					!(writer instanceof Encoder),
					conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
					conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());

			BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
			if (writer instanceof Encoder) {
				//noinspection unchecked
				bucketsBuilder = StreamingFileSink.forRowFormat(
						path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
						.withBucketAssigner(assigner)
						.withOutputFileConfig(outputFileConfig)
						.withRollingPolicy(rollingPolicy);
			} else {
				//noinspection unchecked
				bucketsBuilder = StreamingFileSink.forBulkFormat(
						path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
						.withBucketAssigner(assigner)
						.withOutputFileConfig(outputFileConfig)
						.withRollingPolicy(rollingPolicy);
			}
			return createStreamingSink(
					conf,
					path,
					partitionKeys,
					tableIdentifier,
					overwrite,
					dataStream,
					bucketsBuilder,
					metaStoreFactory,
					fsFactory,
					conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis());
		}
	}

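Streaming jobs are usually unbounded, so the else branch is taken:

  1. Create the writer object for the configured format; for Parquet, for example, it is a BulkWriter factory.
  2. Wrap the RowDataPartitionComputer in a TableBucketAssigner.
  3. Construct the TableRollingPolicy, which decides when part files are rolled; with a BulkWriter, files are rolled whenever a checkpoint is taken.
  4. Construct the BucketsBuilder.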
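
Steps 1 and 2 can be pictured with a small Flink-free sketch. `PartitionPathSketch` and its methods are hypothetical names, not Flink classes; the sketch only illustrates how partition columns are separated from data columns and how a bucket id such as `dt=2020-12-01/hour=11` could be derived from a row. The exact path layout and the lack of path escaping are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical, simplified stand-in for RowDataPartitionComputer + TableBucketAssigner.
class PartitionPathSketch {
    private final String defaultPartName;
    private final String[] partitionKeys;
    private final int[] partitionIndexes;
    private final int[] nonPartitionIndexes;

    PartitionPathSketch(String defaultPartName, String[] fieldNames, String[] partitionKeys) {
        this.defaultPartName = defaultPartName;
        this.partitionKeys = partitionKeys;
        List<String> names = Arrays.asList(fieldNames);
        List<String> keys = Arrays.asList(partitionKeys);
        // Positions of the partition columns, in partition-key order.
        this.partitionIndexes = keys.stream().mapToInt(names::indexOf).toArray();
        // Every remaining column is written into the data file itself.
        this.nonPartitionIndexes = IntStream.range(0, fieldNames.length)
                .filter(i -> !keys.contains(fieldNames[i]))
                .toArray();
    }

    // Builds the bucket id; null or empty values fall back to the default partition name.
    String bucketId(Object[] row) {
        List<String> segments = new ArrayList<>();
        for (int i = 0; i < partitionKeys.length; i++) {
            Object value = row[partitionIndexes[i]];
            String text = (value == null || value.toString().isEmpty())
                    ? defaultPartName : value.toString();
            segments.add(partitionKeys[i] + "=" + text);
        }
        return String.join("/", segments);
    }

    // Drops the partition columns, keeping only what goes into the file.
    Object[] projectNonPartitionColumns(Object[] row) {
        return Arrays.stream(nonPartitionIndexes).mapToObj(i -> row[i]).toArray();
    }
}
```

The projection mirrors the role that ProjectionEncoder and ProjectionBulkFactory play in the code above: the partition values end up in the directory name, so they do not have to be repeated inside each file.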

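createStreamingSink

  1. Wraps the BucketsBuilder in a StreamingFileWriter, an operator that extends AbstractStreamOperator.
  2. Appends that operator after inputStream; the main processing logic lives in it.
  3. If the table is partitioned and sink.partition-commit.policy.kind is configured, performs a commit step, such as registering the partition in the metastore or writing a _SUCCESS file. This is also done by adding another operator.
  4. Finally drops the records through a DiscardingSink function, since the operators above have already handled them.
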
public static DataStreamSink<RowData> createStreamingSink(
			Configuration conf,
			Path path,
			List<String> partitionKeys,
			ObjectIdentifier tableIdentifier,
			boolean overwrite,
			DataStream<RowData> inputStream,
			BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
			TableMetaStoreFactory msFactory,
			FileSystemFactory fsFactory,
			long rollingCheckInterval) {
		if (overwrite) {
			throw new IllegalStateException("Streaming mode not support overwrite.");
		}

		StreamingFileWriter fileWriter = new StreamingFileWriter(
				rollingCheckInterval,
				bucketsBuilder);
		DataStream<CommitMessage> writerStream = inputStream.transform(
				StreamingFileWriter.class.getSimpleName(),
				TypeExtractor.createTypeInfo(CommitMessage.class),
				fileWriter).setParallelism(inputStream.getParallelism());

		DataStream<?> returnStream = writerStream;

		// save committer when we don't need it.
		if (partitionKeys.size() > 0 && conf.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
			StreamingFileCommitter committer = new StreamingFileCommitter(
					path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf);
			returnStream = writerStream
					.transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, committer)
					.setParallelism(1)
					.setMaxParallelism(1);
		}
		//noinspection unchecked
		return returnStream.addSink(new DiscardingSink()).setParallelism(1);
	}

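PS: this code uses a Java 8 functional interface, which can be confusing the first time you see it. An interface with exactly one abstract method is a functional interface. It can be implemented in several ways: most commonly with an anonymous inner class, but also with a lambda or a method reference, as shown here: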

FileSystemFactory fsFactory = FileSystem::get;

// equivalent to an anonymous class
		FileSystemFactory anonymousFactory = new FileSystemFactory() {
			@Override
			public FileSystem create(URI fsUri) throws IOException {
				return FileSystem.get(fsUri);
			}
		};

// equivalent to a lambda
		FileSystemFactory lambdaFactory = uri -> FileSystem.get(uri);
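
For readers who want to try this without Flink on the classpath, here is a minimal self-contained sketch of the same idea. `SchemeFactory` and `schemeOf` are hypothetical stand-ins for FileSystemFactory and FileSystem.get; unlike the real interface, the stand-in declares no checked exception, which is a simplification.

```java
import java.net.URI;

class FunctionalInterfaceSketch {
    // Hypothetical stand-in for FileSystemFactory: exactly one abstract method.
    @FunctionalInterface
    interface SchemeFactory {
        String create(URI uri);
    }

    // Hypothetical stand-in for the static method FileSystem.get(URI).
    static String schemeOf(URI uri) {
        return uri.getScheme();
    }

    // Three equivalent implementations of the same functional interface.
    static final SchemeFactory BY_METHOD_REFERENCE = FunctionalInterfaceSketch::schemeOf;

    static final SchemeFactory BY_LAMBDA = uri -> schemeOf(uri);

    static final SchemeFactory BY_ANONYMOUS_CLASS = new SchemeFactory() {
        @Override
        public String create(URI uri) {
            return schemeOf(uri);
        }
    };
}
```

All three fields behave identically when `create` is called; the method reference is simply the shortest way to say "delegate to this existing method".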

Writing data to the file system

Records are processed in StreamingFileWriter#processElement:

public void processElement(StreamRecord<RowData> element) throws Exception {
		helper.onElement(
				element.getValue(),
				getProcessingTimeService().getCurrentProcessingTime(),
				element.hasTimestamp() ? element.getTimestamp() : null,
				currentWatermark);
	}

Before that, initializeState creates the Buckets through the BucketsBuilder and wraps them in a StreamingFileSinkHelper:

@Override
	public void initializeState(StateInitializationContext context) throws Exception {
		super.initializeState(context);
		buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());

		// Set listener before the initialization of Buckets.
		inactivePartitions = new HashSet<>();
		buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {
			@Override
			public void bucketCreated(Bucket<RowData, String> bucket) {
			}

			@Override
			public void bucketInactive(Bucket<RowData, String> bucket) {
				inactivePartitions.add(bucket.getBucketId());
			}
		});

		helper = new StreamingFileSinkHelper<>(
				buckets,
				context.isRestored(),
				context.getOperatorStateStore(),
				getRuntimeContext().getProcessingTimeService(),
				bucketCheckInterval);
		currentWatermark = Long.MIN_VALUE;
	}

Back in processElement: if you follow the call chain, you will find that the data is ultimately written to a file by Bucket#write:

void write(IN element, long currentTime) throws IOException {
		// if there is no in-progress part file, or the rolling policy asks for a roll, start a new one
		if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {

			if (LOG.isDebugEnabled()) {
				LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
						subtaskIndex, bucketId, element);
			}

			inProgressPart = rollPartFile(currentTime);
		}
		inProgressPart.write(element, currentTime);
	}
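
The roll-then-write pattern in Bucket#write can be reduced to a few lines. The following is a sketch under simplifying assumptions: `RollingBucketSketch` is a hypothetical class, part files are modeled as in-memory StringBuilders, and the only rolling rule is a size threshold checked before each write.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a bucket that rolls part files by size.
class RollingBucketSketch {
    private final long maxPartSize;
    private final List<StringBuilder> parts = new ArrayList<>();
    private StringBuilder inProgressPart; // null means no part file is open

    RollingBucketSketch(long maxPartSize) {
        this.maxPartSize = maxPartSize;
    }

    void write(String element) {
        // Same shape as Bucket#write: open a new part if none is open
        // or if the (simplified) rolling policy says the current one is too large.
        if (inProgressPart == null || inProgressPart.length() > maxPartSize) {
            inProgressPart = new StringBuilder();
            parts.add(inProgressPart);
        }
        inProgressPart.append(element);
    }

    int partCount() {
        return parts.size();
    }
}
```

Because the size is checked before the element is appended, a part can grow past the threshold by one element before the next write rolls it.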

The actual write is ultimately delegated to the write methods of third-party libraries such as Hadoop, Hive, Parquet, and ORC.

Checkpoint

Checkpointing is done in the snapshotState method; the main logic is in the Buckets class.

public void snapshotState(
			final long checkpointId,
			final ListState<byte[]> bucketStatesContainer,
			final ListState<Long> partCounterStateContainer) throws Exception {

		Preconditions.checkState(
			bucketWriter != null && bucketStateSerializer != null,
				"sink has not been initialized");

		LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
				subtaskIndex, checkpointId, maxPartCounter);

		bucketStatesContainer.clear();
		partCounterStateContainer.clear();

		snapshotActiveBuckets(checkpointId, bucketStatesContainer);
		partCounterStateContainer.add(maxPartCounter);
	}
	
private void snapshotActiveBuckets(
			final long checkpointId,
			final ListState<byte[]> bucketStatesContainer) throws Exception {

		for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
			final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId);

			final byte[] serializedBucketState = SimpleVersionedSerialization
					.writeVersionAndSerialize(bucketStateSerializer, bucketState);

			bucketStatesContainer.add(serializedBucketState);

			if (LOG.isDebugEnabled()) {
				LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, bucketState);
			}
		}
	}

Here every active Bucket is snapshotted:

BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
		prepareBucketForCheckpointing(checkpointId);

		InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
		long inProgressFileCreationTime = Long.MAX_VALUE;

		if (inProgressPart != null) {
			inProgressFileRecoverable = inProgressPart.persist();
			inProgressFileCreationTime = inProgressPart.getCreationTime();
			this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
		}

		// return the BucketState, which is then serialized into state
		return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressFileRecoverable, pendingFileRecoverablesPerCheckpoint);
	}
	
private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
		if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
			if (LOG.isDebugEnabled()) {
				LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.", subtaskIndex, bucketId);
			}
			closePartFile();
		}

		if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
			pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
			pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>(); // reset for the next checkpoint
		}
	}

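The core logic is in closePartFile: it closes the in-progress file and flushes its buffered data from memory to the file system, obtains a PendingFileRecoverable, and stores it in the pendingFileRecoverablesForCurrentCheckpoint list in preparation for the snapshot.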

private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
		InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
		if (inProgressPart != null) {
			pendingFileRecoverable = inProgressPart.closeForCommit();
			pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
			inProgressPart = null; // reset to null
		}
		return pendingFileRecoverable;
	}

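A file that is still being written is in the in-progress state and cannot be read yet. When it becomes readable downstream depends on when the file is committed. The previous step has already written the data to the file, but the file has not been formally committed. Recall the steps of a checkpoint (see my earlier post if you are not familiar with them): in the final step, the CheckpointCoordinator notifies every operator through its notifyCheckpointComplete method.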

public void notifyCheckpointComplete(long checkpointId) throws Exception {
		super.notifyCheckpointComplete(checkpointId);
		commitUpToCheckpoint(checkpointId);
	}

public void commitUpToCheckpoint(final long checkpointId) throws IOException {
		final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
				activeBuckets.entrySet().iterator();

		LOG.info("Subtask {} received completion notification for checkpoint with id={}.", subtaskIndex, checkpointId);

		while (activeBucketIt.hasNext()) {
			final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
			bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

			if (!bucket.isActive()) { // after the cleanup above, a bucket with no pending files and no open part file is no longer active
				// We've dealt with all the pending files and the writer for this bucket is not currently open.
				// Therefore this bucket is currently inactive and we can remove it from our state.
				activeBucketIt.remove();
				notifyBucketInactive(bucket);
			}
		}
	}

The files are committed in Bucket#onSuccessfulCompletionOfCheckpoint:

void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
		checkNotNull(bucketWriter);

		Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
				pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)
						.entrySet().iterator();

		while (it.hasNext()) {
			Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();

			for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : entry.getValue()) {
				bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
			}
			it.remove();
		}

		cleanupInProgressFileRecoverables(checkpointId);
	}
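
The bookkeeping across closePartFile, onReceptionOfCheckpoint, and onSuccessfulCompletionOfCheckpoint is easier to follow in isolation. Below is a minimal sketch, assuming part files are represented by plain strings; `PendingFilesSketch` is a hypothetical class, and "committing" is modeled as moving a name into a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of how pending files are tracked per checkpoint.
class PendingFilesSketch {
    private final List<String> pendingForCurrentCheckpoint = new ArrayList<>();
    private final NavigableMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    // A closed part file becomes "pending" until a checkpoint covers it.
    void closePartFile(String name) {
        pendingForCurrentCheckpoint.add(name);
    }

    // On snapshot, everything closed so far is attached to this checkpoint id.
    void onReceptionOfCheckpoint(long checkpointId) {
        if (!pendingForCurrentCheckpoint.isEmpty()) {
            pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingForCurrentCheckpoint));
            pendingForCurrentCheckpoint.clear();
        }
    }

    // On completion, commit the files of this checkpoint and of all earlier ones.
    void onSuccessfulCompletionOfCheckpoint(long checkpointId) {
        Map<Long, List<String>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> files : done.values()) {
            committed.addAll(files);
        }
        done.clear(); // headMap is a view, so this also removes the entries from the map
    }

    List<String> committedFiles() {
        return committed;
    }

    int checkpointsStillPending() {
        return pendingPerCheckpoint.size();
    }
}
```

The inclusive `headMap(checkpointId, true)` matters: a completion notification for checkpoint N also commits files from earlier checkpoints whose notifications were never delivered.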

The commit method renames the file so that it becomes readable downstream. For example, the Hadoop commit implementation:

@Override
		public void commit() throws IOException {
			final Path src = recoverable.tempFile();
			final Path dest = recoverable.targetFile();
			final long expectedLength = recoverable.offset();

			final FileStatus srcStatus;
			try {
				srcStatus = fs.getFileStatus(src);
			}
			catch (IOException e) {
				throw new IOException("Cannot clean commit: Staging file does not exist.");
			}

			if (srcStatus.getLen() != expectedLength) {
				// something was done to this file since the committer was created.
				// this is not the "clean" case
				throw new IOException("Cannot clean commit: File has trailing junk data.");
			}

			try {
				fs.rename(src, dest);
			}
			catch (IOException e) {
				throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
			}
		}
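
The same validate-then-rename idea can be tried on a local file system with plain java.nio. This is only a sketch: `RenameCommitSketch`, the file names, and the use of `Files.move` are assumptions for illustration and are not how Flink talks to Hadoop.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical local-file analogue of a rename-based commit.
class RenameCommitSketch {
    static void commit(Path tempFile, Path targetFile, long expectedLength) {
        try {
            if (!Files.exists(tempFile)) {
                throw new IOException("Staging file does not exist: " + tempFile);
            }
            // The length recorded at checkpoint time must still match.
            if (Files.size(tempFile) != expectedLength) {
                throw new IOException("Staging file has unexpected length: " + tempFile);
            }
            // The rename is what makes the file visible under its final name.
            Files.move(tempFile, targetFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes a small staging file, commits it, and reports whether only the target remains.
    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("commit-sketch");
            Path temp = dir.resolve(".part-0-0.inprogress");
            Path target = dir.resolve("part-0-0");
            byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
            Files.write(temp, data);
            commit(temp, target, data.length);
            return Files.exists(target) && !Files.exists(temp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Readers never observe a half-written file under the final name, because the data is complete before the rename happens.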

Finally, some state belonging to in-progress files is cleaned up:

private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
		Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
				inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false)
						.entrySet().iterator();

		while (it.hasNext()) {
			final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = it.next().getValue();

			// this check is redundant, as we only put entries in the inProgressFileRecoverablesPerCheckpoint map
			// list when the requiresCleanupOfInProgressFileRecoverableState() returns true, but having it makes
			// the code more readable.

			// returns false for every file system except S3
			final boolean successfullyDeleted = bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);
			if (LOG.isDebugEnabled() && successfullyDeleted) {
				LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
			}
			it.remove(); // drop the entry
		}
	}

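Partition commit

This part covers what triggers a partition commit and the commit policies.
The trigger is either process-time or partition-time.
The process-time trigger registers each partition that the current checkpoint needs to commit, together with the current system time, in the pendingPartitions map. At commit time it checks whether registration time + delay is earlier than the current system time to decide whether the partition should be committed.
With delay = 0 the partition is committed immediately, so late data may cause it to be committed too early. With delay equal to the partition's time span, the partitions pending from the previous checkpoint are committed after roughly checkpoint interval + delay.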

@Override
	public void addPartition(String partition) {
		if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
			this.pendingPartitions.putIfAbsent(partition, procTimeService.getCurrentProcessingTime());
		}
	}

	@Override
	public List<String> committablePartitions(long checkpointId) {
		List<String> needCommit = new ArrayList<>();
		long currentProcTime = procTimeService.getCurrentProcessingTime();
		Iterator<Map.Entry<String, Long>> iter = pendingPartitions.entrySet().iterator();
		while (iter.hasNext()) {
			Map.Entry<String, Long> entry = iter.next();
			long creationTime = entry.getValue();
			if (commitDelay == 0 || currentProcTime > creationTime + commitDelay) {
				needCommit.add(entry.getKey());
				iter.remove();
			}
		}
		return needCommit;
	}
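
To see the timing rule in action without a running job, the trigger can be rebuilt around an injectable clock. This is a sketch: `ProcTimeTriggerSketch` is a hypothetical class, and the `LongSupplier` clock replaces the ProcessingTimeService used above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical process-time commit trigger with a clock that tests can control.
class ProcTimeTriggerSketch {
    private final Map<String, Long> pendingPartitions = new LinkedHashMap<>();
    private final long commitDelay;
    private final LongSupplier clock;

    ProcTimeTriggerSketch(long commitDelay, LongSupplier clock) {
        this.commitDelay = commitDelay;
        this.clock = clock;
    }

    // Remember when the partition was first seen; later sightings keep the first time.
    void addPartition(String partition) {
        if (partition != null && !partition.trim().isEmpty()) {
            pendingPartitions.putIfAbsent(partition, clock.getAsLong());
        }
    }

    // Return and forget every partition whose delay has elapsed.
    List<String> committablePartitions() {
        List<String> needCommit = new ArrayList<>();
        long now = clock.getAsLong();
        Iterator<Map.Entry<String, Long>> iter = pendingPartitions.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, Long> entry = iter.next();
            if (commitDelay == 0 || now > entry.getValue() + commitDelay) {
                needCommit.add(entry.getKey());
                iter.remove();
            }
        }
        return needCommit;
    }
}
```

Note that the comparison is strict: a partition registered at time t with delay d is committable only once the clock reads more than t + d.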

The partition-time trigger decides whether to commit based on whether the watermark has passed partition time + delay.

@Override
	public void addPartition(String partition) {
		if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
			this.pendingPartitions.add(partition);
		}
	}

	@Override
	public List<String> committablePartitions(long checkpointId) {
		if (!watermarks.containsKey(checkpointId)) {
			throw new IllegalArgumentException(String.format(
					"Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
					checkpointId, watermarks));
		}

		long watermark = watermarks.get(checkpointId);
		watermarks.headMap(checkpointId, true).clear();

		List<String> needCommit = new ArrayList<>();
		Iterator<String> iter = pendingPartitions.iterator();
		while (iter.hasNext()) {
			String partition = iter.next();
			// derive the time from the path, e.g. partition='day=2020-12-01/hour=11/minute=11' becomes 2020-12-01 11:11:00
			LocalDateTime partTime = extractor.extract(
					partitionKeys, extractPartitionValues(new Path(partition)));
			if (watermark > toMills(partTime) + commitDelay) {
				needCommit.add(partition);
				iter.remove();
			}
		}
		return needCommit;
	}
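
The path-to-time conversion and the watermark comparison can be sketched as follows. The class name, the fixed `day`/`hour`/`minute` keys, and the choice of UTC for converting the partition time to milliseconds are all assumptions of this sketch, not the behavior of Flink's extractor.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical partition-time check for paths like "day=2020-12-01/hour=11/minute=11".
class PartitionTimeSketch {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Splits the path into ordered key/value pairs.
    static Map<String, String> parse(String partitionPath) {
        Map<String, String> spec = new LinkedHashMap<>();
        for (String segment : partitionPath.split("/")) {
            if (segment.isEmpty()) {
                continue;
            }
            String[] kv = segment.split("=", 2);
            spec.put(kv[0], kv[1]);
        }
        return spec;
    }

    // Assembles "yyyy-MM-dd HH:mm:00" from the assumed day/hour/minute keys.
    static LocalDateTime extract(String partitionPath) {
        Map<String, String> spec = parse(partitionPath);
        String text = spec.get("day") + " " + spec.get("hour") + ":" + spec.get("minute") + ":00";
        return LocalDateTime.parse(text, FORMAT);
    }

    // Committable once the watermark is strictly past partition time + delay.
    static boolean committable(String partitionPath, long watermarkMillis, long commitDelayMillis) {
        long partMillis = extract(partitionPath).toInstant(ZoneOffset.UTC).toEpochMilli();
        return watermarkMillis > partMillis + commitDelayMillis;
    }
}
```

Unlike the process-time trigger, this check depends only on event time, so a slow or stalled watermark delays the commit no matter how much wall-clock time has passed.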

Source

Reading data is simpler than writing it.

Create the FileSystemTableSource object:

public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
		Configuration conf = new Configuration();
		context.getTable().getOptions().forEach(conf::setString);

		return new FileSystemTableSource(
				context.getTable().getSchema(),
				getPath(conf),
				context.getTable().getPartitionKeys(),
				conf.get(PARTITION_DEFAULT_NAME),
				context.getTable().getProperties());
	}

Construct the source function, passing in the input format used to read the source data:

public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
   @SuppressWarnings("unchecked")
   TypeInformation<RowData> typeInfo =
         (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
   // Avoid using ContinuousFileMonitoringFunction
   InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
   DataStreamSource<RowData> source = execEnv.addSource(func, explainSource(), typeInfo);
   return source.name(explainSource());
}

The run method reads records in a loop and emits them to the downstream operators:

public void run(SourceContext<OUT> ctx) throws Exception {
		try {

			Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
			if (isRunning && format instanceof RichInputFormat) {
				((RichInputFormat) format).openInputFormat();
			}

			OUT nextElement = serializer.createInstance();
			while (isRunning) {
				format.open(splitIterator.next());

				// for each element we also check if cancel
				// was called by checking the isRunning flag

				while (isRunning && !format.reachedEnd()) {
					nextElement = format.nextRecord(nextElement);
					if (nextElement != null) {
						ctx.collect(nextElement);
					} else {
						break;
					}
				}
				format.close();
				completedSplitsCounter.inc();

				if (isRunning) {
					isRunning = splitIterator.hasNext();
				}
			}
		} finally {
			format.close();
			if (format instanceof RichInputFormat) {
				((RichInputFormat) format).closeInputFormat();
			}
			isRunning = false;
		}
	}
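
Stripped of metrics and cancellation handling, the loop is "open a split, drain it, close it, move on". A minimal sketch, assuming a split is just a list of strings and using a hypothetical `ListFormat` in place of a real InputFormat:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SplitLoopSketch {
    // Hypothetical, stripped-down input format: a split is a list of records.
    static class ListFormat {
        private Iterator<String> current;

        void open(List<String> split) {
            current = split.iterator();
        }

        boolean reachedEnd() {
            return !current.hasNext();
        }

        String nextRecord() {
            return current.next();
        }

        void close() {
            current = null;
        }
    }

    // Reads every split in turn and collects the records that would be emitted downstream.
    static List<String> run(Iterator<List<String>> splits) {
        List<String> collected = new ArrayList<>();
        ListFormat format = new ListFormat();
        boolean isRunning = splits.hasNext();
        while (isRunning) {
            format.open(splits.next());
            while (!format.reachedEnd()) {
                collected.add(format.nextRecord());
            }
            format.close();
            isRunning = splits.hasNext();
        }
        return collected;
    }
}
```

In the real function the collected records go to `ctx.collect` instead of a list, and the `isRunning` flag is also flipped by cancel().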