2021SC@SDUSC
本章为新的开篇,主要介绍ActiveJ中的FileSystem文件系统。
ActiveJ FS 是一个轻量级的异步 Java 库,它在常见文件操作之上提供了一个微小的抽象。它提供了上载、下载、追加、列出、复制、移动、删除和其他用于操作本地、远程或分布式存储的方法。
ActiveJ FS有是什么优势呢?
异步文件系统
轻量化设计
直观的 API,由众所周知的文件操作组成
支持原子文件上传
支持使用自定义二进制协议和 HTTP 协议的客户端-服务器通信
可作为分布式文件系统集群启动
ActiveJ FS 可以在哪些地方使用:
1.本地 ActiveJ FS设计用于处理位于单台计算机上的文件系统。非常适合为您的应用程序实现本地文件存储,例如数据库存储,备份数据存储,本地日志存储等。
2.用于对远程存储的文件进行操作的客户端/服务器实现。此实现支持两种协议:
1)基于 TCP 的自定义二进制协议,开销为零。类似于 FTP,但更简单、更高效。
2)允许定义 REST API 的HTTP 协议。它还提供第三方客户端(如浏览器)访问远程 ActiveJ FS 服务器。
3.Cluster ActiveJ FS专为分布式大数据工作负载而设计。它针对处理大型不可变数据集或仅追加文件进行了优化。Cluster ActiveJ FS被故意设置为不是POSIX编译器,并且仅适用于不可变文件。通过这种方式,整个系统简单,轻量级,并且可以灵活应对服务器或网络故障。此外,这种方法可以避免主从架构的开销和可变文件的同步。借助 ActiveJ FS,您可以创建分布式、可扩展的 P2P 文件系统,并内置对重新平衡、故障转移和可扩展性的支持。
4.ActiveJ FS 适配器支持筛选、挂载、转换文件名、添加和删除前缀、子目录等。
上面我们简单介绍了一下ActiveFS能干什么,下面我们结合着ActiveFS接口定义的方法和具体实现的功能,看一下ActiveJ FS和其他的文件系统有什么区别:
ActiveFS接口代表了一个简单的文件系统,具有基本的上传、追加、下载、复制、移动、删除、信息和列表操作,以及对一组文件的复合操作。
upload()上传方法:返回被写入(或发送)到该文件的bytebufs的消费者。此方法接受任意大小的文件,但上传的文件只有在上传成功后才会出现在文件系统中。上传的文件要么存在,是完整的,要么由于某些错误不存在。允许并发上传到相同的文件名。如果有多个并发上传,最后一个成功完成的将获胜,将覆盖其他上传的结果。
public Promise<ChannelConsumer<ByteBuf>> upload(@NotNull String name) {
return uploadImpl(name, identity())
.whenComplete(toLogger(logger, TRACE, "upload", name, this));
}
@Override
public Promise<ChannelConsumer<ByteBuf>> upload(@NotNull String name, long size) {
return uploadImpl(name, ofFixedSize(size))
.whenComplete(toLogger(logger, TRACE, "upload", name, size, this));
}
append()插入方法:返回被追加到现有文件的bytebufs的消费者。如果文件不存在,并且指定offset为0,新文件将被创建,数据将被追加到文件的开头。如果在执行append时发生错误,所有修改都会保存在文件中。append支持覆盖从特定的offset开始的文件内容。每次收到bytebuf文件后都会追加修改文件,因此可以下载仍然追加的文件。允许并发追加到同一个文件名。如果正在进行多个并发追加,它们可能会覆盖彼此的更改。而对于异常情况:如果 offset大于文件大小,promise将以IllegalOffsetException异常结束。
public Promise<ChannelConsumer<ByteBuf>> append(@NotNull String name, long offset) {
checkArgument(offset >= 0, "Offset cannot be less than 0");
return execute(
() -> {
Path path = resolve(name);
FileChannel channel;
if (offset == 0) {
channel = ensureTarget(null, path, () -> FileChannel.open(path, appendNewOptions));
if (fsyncDirectories) {
tryFsync(path.getParent());
}
} else {
channel = FileChannel.open(path, appendOptions);
}
long size = channel.size();
if (size < offset) {
throw new IllegalOffsetException("Offset " + offset + " exceeds file size " + size);
}
return channel;
})
.then(translateScalarErrorsFn(name))
.whenComplete(appendBeginPromise.recordStats())
.map(channel -> {
ChannelFileWriter writer = ChannelFileWriter.create(executor, channel)
.withOffset(offset);
if (fsyncUploads && !appendOptions.contains(SYNC)) {
writer.withForceOnClose(true);
}
return writer
.withAcknowledgement(ack -> ack
.then(translateScalarErrorsFn(name))
.whenComplete(appendFinishPromise.recordStats())
.whenComplete(toLogger(logger, TRACE, "onAppendComplete", name, offset, this)));
})
.whenComplete(toLogger(logger, TRACE, "append", name, offset, this));
}
download()下载方法:返回从文件中读取(或接收)的bytebufs的供应商。如果该文件不存在,将从服务器返回一个错误。如果文件正在追加,可以下载仍然追加的文件,例如不一致的文件。
public Promise<ChannelSupplier<ByteBuf>> download(@NotNull String name, long offset, long limit) {
checkArgument(offset >= 0, "offset < 0");
checkArgument(limit >= 0, "limit < 0");
return execute(
() -> {
Path path = resolve(name);
FileChannel channel = FileChannel.open(path, READ);
long size = channel.size();
if (size < offset) {
throw new IllegalOffsetException("Offset " + offset + " exceeds file size " + size);
}
return channel;
})
.map(channel -> ChannelFileReader.create(executor, channel)
.withBufferSize(readerBufferSize)
.withOffset(offset)
.withLimit(limit)
.withEndOfStream(eos -> eos
.then(translateScalarErrorsFn(name))
.whenComplete(downloadFinishPromise.recordStats())
.whenComplete(toLogger(logger, TRACE, "onDownloadComplete", name, offset, limit))))
.then(translateScalarErrorsFn(name))
.whenComplete(toLogger(logger, TRACE, "download", name, offset, limit, this))
.whenComplete(downloadBeginPromise.recordStats());
}
delete和deleteAll删除方法:
其中delete方法并不保证能删除文件,如果文件不存在的时候,也会返回promise完成成功。
deleteAll()是对每个文件调用delete视图删除,删除任何一个发生错误的时候,会使其异常完成。
public Promise<Void> delete(@NotNull String name) {
return execute(() -> deleteImpl(singleton(name)))
.then(translateScalarErrorsFn(name))
.whenComplete(toLogger(logger, TRACE, "delete", name, this))
.whenComplete(deletePromise.recordStats());
}
@Override
public Promise<Void> deleteAll(Set<String> toDelete) {
if (toDelete.isEmpty()) return Promise.complete();
return execute(() -> deleteImpl(toDelete))
.whenComplete(toLogger(logger, TRACE, "deleteAll", toDelete, this))
.whenComplete(deleteAllPromise.recordStats());
}
copy和copyAll方法:将文件从源位置复制到目标位置。源到目标的映射作为一个映射传递,其中键对应于源文件,值对应于目标文件。如果复制任何文件时发生错误,promise将异常完成。
public Promise<Void> copy(@NotNull String name, @NotNull String target) {
return execute(() -> forEachPair(singletonMap(name, target), this::doCopy))
.then(translateScalarErrorsFn())
.whenComplete(toLogger(logger, TRACE, "copy", name, target, this))
.whenComplete(copyPromise.recordStats());
}
@Override
public Promise<Void> copyAll(Map<String, String> sourceToTarget) {
checkArgument(isBijection(sourceToTarget), "Targets must be unique");
if (sourceToTarget.isEmpty()) return Promise.complete();
return execute(() -> forEachPair(sourceToTarget, this::doCopy))
.whenComplete(toLogger(logger, TRACE, "copyAll", sourceToTarget, this))
.whenComplete(copyAllPromise.recordStats());
}
move和moveAll方法:将文件从源位置移动到目标位置。实际上,调用copyAll与给定的map,然后调用deleteAll与源文件。源到目标的映射作为一个映射传递,其中键对应于源文件,值对应于目标文件。如果移动任何文件时发生错误,promise将异常完成。
public Promise<Void> move(@NotNull String name, @NotNull String target) {
return execute(() -> forEachPair(singletonMap(name, target), this::doMove))
.then(translateScalarErrorsFn())
.whenComplete(toLogger(logger, TRACE, "move", name, target, this))
.whenComplete(movePromise.recordStats());
}
@Override
public Promise<Void> moveAll(Map<String, String> sourceToTarget) {
checkArgument(isBijection(sourceToTarget), "Targets must be unique");
if (sourceToTarget.isEmpty()) return Promise.complete();
return execute(() -> forEachPair(sourceToTarget, this::doMove))
.whenComplete(toLogger(logger, TRACE, "moveAll", sourceToTarget, this))
.whenComplete(moveAllPromise.recordStats());
}
list()列举方法:列出String变量glob匹配的文件。
public Promise<Map<String, FileMetadata>> list(@NotNull String glob) {
if (glob.isEmpty()) return Promise.of(emptyMap());
return execute(
() -> {
String subdir = extractSubDir(glob);
Path subdirectory = resolve(subdir);
String subglob = glob.substring(subdir.length());
return LocalFileUtils.findMatching(tempDir, subglob, subdirectory).stream()
.collect(Collector.of(
(Supplier<Map<String, FileMetadata>>) HashMap::new,
uncheckedOf((map, path) -> {
FileMetadata metadata = toFileMetadata(path);
if (metadata != null) {
String filename = toRemoteName.apply(storage.relativize(path).toString());
map.put(filename, metadata);
}
}),
noMergeFunction())
);
})
.then(translateScalarErrorsFn())
.whenComplete(toLogger(logger, TRACE, "list", glob, this))
.whenComplete(listPromise.recordStats());
}
info()和infoAll方法:获取单个或多个文件的FileMetadata的快捷方式。
public Promise<@Nullable FileMetadata> info(@NotNull String name) {
return execute(() -> toFileMetadata(resolve(name)))
.whenComplete(toLogger(logger, TRACE, "info", name, this))
.whenComplete(infoPromise.recordStats());
}
public Promise<Map<String, @NotNull FileMetadata>> infoAll(@NotNull Set<String> names) {
if (names.isEmpty()) return Promise.of(emptyMap());
return execute(
() -> {
Map<String, FileMetadata> result = new HashMap<>();
for (String name : names) {
FileMetadata metadata = toFileMetadata(resolve(name));
if (metadata != null) {
result.put(name, metadata);
}
}
return result;
})
.whenComplete(toLogger(logger, TRACE, "infoAll", names, this))
.whenComplete(infoAllPromise.recordStats());
}
ping()方法:发送一个ping测试,由于该方法在本地ActiveFS中定义,所以总是可行的。
public Promise<Void> ping() {
return Promise.complete();
}
本章为ActiveJ FileSystem的先导,我们从本章能看到众多的Active文件系统中重写的方法,知道每一个方法所做的事帮助我们更好地理解FileSystem干了什么。下一章我们将使用这些方法中的部分,剖析FileSystem是如何运行的,并感受文件系统的强大能力。