当前位置: 首页 > 工具软件 > ActiveJ > 使用案例 >

ActiveJ学习FileSystem

奚晟
2023-12-01

2021SC@SDUSC

前言

本章为新的开篇,主要介绍ActiveJ中的FileSystem文件系统。

ActiveJ FS

ActiveJ FS 是一个轻量级的异步 Java 库,它在常见文件操作之上提供了一个微小的抽象。它提供了上载、下载、追加、列出、复制、移动、删除和其他用于操作本地、远程或分布式存储的方法。

ActiveJ FS有是什么优势呢?

异步文件系统
轻量化设计
直观的 API,由众所周知的文件操作组成
支持原子文件上传
支持使用自定义二进制协议和 HTTP 协议的客户端-服务器通信
可作为分布式文件系统集群启动

ActiveJ FS 可以在哪些地方使用:
1.本地 ActiveJ FS设计用于处理位于单台计算机上的文件系统。非常适合为您的应用程序实现本地文件存储,例如数据库存储,备份数据存储,本地日志存储等。
2.用于对远程存储的文件进行操作的客户端/服务器实现。此实现支持两种协议:
1)基于 TCP 的自定义二进制协议,开销为零。类似于 FTP,但更简单、更高效。
2)允许定义 REST API 的HTTP 协议。它还提供第三方客户端(如浏览器)访问远程 ActiveJ FS 服务器。
3.Cluster ActiveJ FS专为分布式大数据工作负载而设计。它针对处理大型不可变数据集或仅追加文件进行了优化。Cluster ActiveJ FS被故意设置为不是POSIX编译器,并且仅适用于不可变文件。通过这种方式,整个系统简单,轻量级,并且可以灵活应对服务器或网络故障。此外,这种方法可以避免主从架构的开销和可变文件的同步。借助 ActiveJ FS,您可以创建分布式、可扩展的 P2P 文件系统,并内置对重新平衡、故障转移和可扩展性的支持。
4.ActiveJ FS 适配器支持筛选、挂载、转换文件名、添加和删除前缀、子目录等。

ActiveFS接口与LocalActiveFS具体实现功能

上面我们简单介绍了一下ActiveFS能干什么,下面我们结合着ActiveFS接口定义的方法和具体实现的功能,看一下ActiveJ FS和其他的文件系统有什么区别:
ActiveFS接口代表了一个简单的文件系统,具有基本的上传、追加、下载、复制、移动、删除、信息和列表操作,以及对一组文件的复合操作。
upload()上传方法:返回被写入(或发送)到该文件的bytebufs的消费者。此方法接受任意大小的文件,但上传的文件只有在上传成功后才会出现在文件系统中。上传的文件要么存在,是完整的,要么由于某些错误不存在。允许并发上传到相同的文件名。如果有多个并发上传,最后一个成功完成的将获胜,将覆盖其他上传的结果。

	public Promise<ChannelConsumer<ByteBuf>> upload(@NotNull String name) {
		return uploadImpl(name, identity())
				.whenComplete(toLogger(logger, TRACE, "upload", name, this));
	}

	@Override
	public Promise<ChannelConsumer<ByteBuf>> upload(@NotNull String name, long size) {
		return uploadImpl(name, ofFixedSize(size))
				.whenComplete(toLogger(logger, TRACE, "upload", name, size, this));
	}

append()插入方法:返回被追加到现有文件的bytebufs的消费者。如果文件不存在,并且指定offset为0,新文件将被创建,数据将被追加到文件的开头。如果在执行append时发生错误,所有修改都会保存在文件中。append支持覆盖从特定的offset开始的文件内容。每次收到bytebuf文件后都会追加修改文件,因此可以下载仍然追加的文件。允许并发追加到同一个文件名。如果正在进行多个并发追加,它们可能会覆盖彼此的更改。而对于异常情况:如果 offset大于文件大小,promise将以IllegalOffsetException异常结束。

public Promise<ChannelConsumer<ByteBuf>> append(@NotNull String name, long offset) {
		checkArgument(offset >= 0, "Offset cannot be less than 0");
		return execute(
				() -> {
					Path path = resolve(name);
					FileChannel channel;
					if (offset == 0) {
						channel = ensureTarget(null, path, () -> FileChannel.open(path, appendNewOptions));
						if (fsyncDirectories) {
							tryFsync(path.getParent());
						}
					} else {
						channel = FileChannel.open(path, appendOptions);
					}
					long size = channel.size();
					if (size < offset) {
						throw new IllegalOffsetException("Offset " + offset + " exceeds file size " + size);
					}
					return channel;
				})
				.then(translateScalarErrorsFn(name))
				.whenComplete(appendBeginPromise.recordStats())
				.map(channel -> {
					ChannelFileWriter writer = ChannelFileWriter.create(executor, channel)
							.withOffset(offset);
					if (fsyncUploads && !appendOptions.contains(SYNC)) {
						writer.withForceOnClose(true);
					}
					return writer
							.withAcknowledgement(ack -> ack
									.then(translateScalarErrorsFn(name))
									.whenComplete(appendFinishPromise.recordStats())
									.whenComplete(toLogger(logger, TRACE, "onAppendComplete", name, offset, this)));
				})
				.whenComplete(toLogger(logger, TRACE, "append", name, offset, this));
	}

download()下载方法:返回从文件中读取(或接收)的bytebufs的供应商。如果该文件不存在,将从服务器返回一个错误。如果文件正在追加,可以下载仍然追加的文件,例如不一致的文件。

public Promise<ChannelSupplier<ByteBuf>> download(@NotNull String name, long offset, long limit) {
		checkArgument(offset >= 0, "offset < 0");
		checkArgument(limit >= 0, "limit < 0");
		return execute(
				() -> {
					Path path = resolve(name);
					FileChannel channel = FileChannel.open(path, READ);
					long size = channel.size();
					if (size < offset) {
						throw new IllegalOffsetException("Offset " + offset + " exceeds file size " + size);
					}
					return channel;
				})
				.map(channel -> ChannelFileReader.create(executor, channel)
						.withBufferSize(readerBufferSize)
						.withOffset(offset)
						.withLimit(limit)
						.withEndOfStream(eos -> eos
								.then(translateScalarErrorsFn(name))
								.whenComplete(downloadFinishPromise.recordStats())
								.whenComplete(toLogger(logger, TRACE, "onDownloadComplete", name, offset, limit))))
				.then(translateScalarErrorsFn(name))
				.whenComplete(toLogger(logger, TRACE, "download", name, offset, limit, this))
				.whenComplete(downloadBeginPromise.recordStats());
	}

delete和deleteAll删除方法:
其中delete方法并不保证能删除文件,如果文件不存在的时候,也会返回promise完成成功。
deleteAll()是对每个文件调用delete视图删除,删除任何一个发生错误的时候,会使其异常完成。

public Promise<Void> delete(@NotNull String name) {
		return execute(() -> deleteImpl(singleton(name)))
				.then(translateScalarErrorsFn(name))
				.whenComplete(toLogger(logger, TRACE, "delete", name, this))
				.whenComplete(deletePromise.recordStats());
	}

	@Override
	public Promise<Void> deleteAll(Set<String> toDelete) {
		if (toDelete.isEmpty()) return Promise.complete();

		return execute(() -> deleteImpl(toDelete))
				.whenComplete(toLogger(logger, TRACE, "deleteAll", toDelete, this))
				.whenComplete(deleteAllPromise.recordStats());
	}

copy和copyAll方法:将文件从源位置复制到目标位置。源到目标的映射作为一个映射传递,其中键对应于源文件,值对应于目标文件。如果复制任何文件时发生错误,promise将异常完成。

	public Promise<Void> copy(@NotNull String name, @NotNull String target) {
		return execute(() -> forEachPair(singletonMap(name, target), this::doCopy))
				.then(translateScalarErrorsFn())
				.whenComplete(toLogger(logger, TRACE, "copy", name, target, this))
				.whenComplete(copyPromise.recordStats());
	}

	@Override
	public Promise<Void> copyAll(Map<String, String> sourceToTarget) {
		checkArgument(isBijection(sourceToTarget), "Targets must be unique");
		if (sourceToTarget.isEmpty()) return Promise.complete();

		return execute(() -> forEachPair(sourceToTarget, this::doCopy))
				.whenComplete(toLogger(logger, TRACE, "copyAll", sourceToTarget, this))
				.whenComplete(copyAllPromise.recordStats());
	}

move和moveAll方法:将文件从源位置移动到目标位置。实际上,调用copyAll与给定的map,然后调用deleteAll与源文件。源到目标的映射作为一个映射传递,其中键对应于源文件,值对应于目标文件。如果移动任何文件时发生错误,promise将异常完成。

	public Promise<Void> move(@NotNull String name, @NotNull String target) {
		return execute(() -> forEachPair(singletonMap(name, target), this::doMove))
				.then(translateScalarErrorsFn())
				.whenComplete(toLogger(logger, TRACE, "move", name, target, this))
				.whenComplete(movePromise.recordStats());
	}

	@Override
	public Promise<Void> moveAll(Map<String, String> sourceToTarget) {
		checkArgument(isBijection(sourceToTarget), "Targets must be unique");
		if (sourceToTarget.isEmpty()) return Promise.complete();

		return execute(() -> forEachPair(sourceToTarget, this::doMove))
				.whenComplete(toLogger(logger, TRACE, "moveAll", sourceToTarget, this))
				.whenComplete(moveAllPromise.recordStats());
	}

list()列举方法:列出String变量glob匹配的文件。

	public Promise<Map<String, FileMetadata>> list(@NotNull String glob) {
		if (glob.isEmpty()) return Promise.of(emptyMap());

		return execute(
				() -> {
					String subdir = extractSubDir(glob);
					Path subdirectory = resolve(subdir);
					String subglob = glob.substring(subdir.length());

					return LocalFileUtils.findMatching(tempDir, subglob, subdirectory).stream()
							.collect(Collector.of(
									(Supplier<Map<String, FileMetadata>>) HashMap::new,
									uncheckedOf((map, path) -> {
										FileMetadata metadata = toFileMetadata(path);
										if (metadata != null) {
											String filename = toRemoteName.apply(storage.relativize(path).toString());
											map.put(filename, metadata);
										}
									}),
									noMergeFunction())
							);
				})
				.then(translateScalarErrorsFn())
				.whenComplete(toLogger(logger, TRACE, "list", glob, this))
				.whenComplete(listPromise.recordStats());
	}

info()和infoAll方法:获取单个或多个文件的FileMetadata的快捷方式。

	public Promise<@Nullable FileMetadata> info(@NotNull String name) {
		return execute(() -> toFileMetadata(resolve(name)))
				.whenComplete(toLogger(logger, TRACE, "info", name, this))
				.whenComplete(infoPromise.recordStats());
	}
	public Promise<Map<String, @NotNull FileMetadata>> infoAll(@NotNull Set<String> names) {
		if (names.isEmpty()) return Promise.of(emptyMap());

		return execute(
				() -> {
					Map<String, FileMetadata> result = new HashMap<>();
					for (String name : names) {
						FileMetadata metadata = toFileMetadata(resolve(name));
						if (metadata != null) {
							result.put(name, metadata);
						}
					}
					return result;
				})
				.whenComplete(toLogger(logger, TRACE, "infoAll", names, this))
				.whenComplete(infoAllPromise.recordStats());
	}

ping()方法:发送一个ping测试,由于该方法在本地ActiveFS中定义,所以总是可行的。

	public Promise<Void> ping() {
		return Promise.complete(); 
	}

小结

本章为ActiveJ FileSystem的先导,我们从本章能看到众多的Active文件系统中重写的方法,知道每一个方法所做的事帮助我们更好地理解FileSystem干了什么。下一章我们将使用这些方法中的部分,剖析FileSystem是如何运行的,并感受文件系统的强大能力。

 类似资料: