2021SC@SDUSC
在上一章我们解析了ActiveJ FileSystem中ActiveFS的众多方法接口以及其对应在LocalActiveFS中的实现,让我们对ActiveJ FileSystem能干的事有了大体的了解。而接下来的章节,我们将根据具体的实例试着理解FileSystem的运行机制。本次通过upload和download两种方式来探究ActiveJ FileSystem的众多功能例。
我们通过继承SimpleTcpServerLauncher实现ServerSetupExample ,可以允许简单地设置应用程序,且ServerSetupExample 只需要覆盖几个启动器方法。
public class ServerSetupExample extends SimpleTcpServerLauncher {
private Path storage;
@Override
protected void onInit(Injector injector) throws Exception {
storage = Files.createTempDirectory("server_storage");
}
@Override
protected Config createConfig() {
return super.createConfig()
.with("activefs.path", storage.toString())
.with("activefs.listenAddresses", "6732");
}
@Override
protected void run() throws Exception {
awaitShutdown();
}
public static void main(String[] args) throws Exception {
Launcher launcher = new ServerSetupExample();
launcher.launch(args);
}
}
重写onInit()方法,本方法在应用程序启动之前就进行运行,负责进行初始化storage。
重写createConfig ()方法,配置相关的参数保证服务器能正常运行。
重写run()方法,和之前RPC中的一样,等待客户端的消息传达,否则阻塞。
可以看到FileUploadExample 类同样是继承了Launcher启动器:
public final class FileUploadExample extends Launcher {
private static final int SERVER_PORT = 6732;
private static final String FILE_NAME = "example.txt";
private static final String EXAMPLE_TEXT = "example text";
private Path clientFile;
@Override
protected void onInit(Injector injector) throws Exception {
clientFile = Files.createTempFile("example", ".txt");
Files.write(clientFile, EXAMPLE_TEXT.getBytes());
}
@Inject
private RemoteActiveFs client;
@Inject
private Eventloop eventloop;
@Provides
Eventloop eventloop() {
return Eventloop.create();
}
@Provides
RemoteActiveFs remoteFsClient(Eventloop eventloop) {
return RemoteActiveFs.create(eventloop, new InetSocketAddress(SERVER_PORT));
}
@Override
protected Module getModule() {
return ServiceGraphModule.create();
}
//[START EXAMPLE]
@Override
protected void run() throws Exception {
ExecutorService executor = newSingleThreadExecutor();
CompletableFuture<Void> future = eventloop.submit(() ->
// consumer result here is a marker of it being successfully uploaded
ChannelFileReader.open(executor, clientFile)
.then(cfr -> cfr.streamTo(client.upload(FILE_NAME, EXAMPLE_TEXT.length())))
.whenResult(() -> System.out.printf("%nFile '%s' successfully uploaded%n%n", FILE_NAME))
);
try {
future.get();
} finally {
executor.shutdown();
}
}
//[END EXAMPLE]
public static void main(String[] args) throws Exception {
FileUploadExample example = new FileUploadExample();
example.launch(args);
}
}
这里我们覆盖了onInit()、getModule()以及run()方法,在这里我们创建了一个example.txt,并且我们可以看到run中同样是使用了CompletableFuture类进行eventloop的submit操作,将文件上传给了服务器。
通过调用之前说过的upload方法,以流的形式上传。
同样可以看到FileDownloadExample 也是继承了Launcher :
public final class FileDownloadExample extends Launcher {
private static final int SERVER_PORT = 6732;
private static final String REQUIRED_FILE = "example.txt";
private static final String DOWNLOADED_FILE = "downloaded_example.txt";
private Path clientStorage;
@Override
protected void onInit(Injector injector) throws Exception {
clientStorage = Files.createTempDirectory("client_storage");
}
@Inject
private RemoteActiveFs client;
@Inject
private Eventloop eventloop;
@Provides
Eventloop eventloop() {
return Eventloop.create();
}
@Provides
RemoteActiveFs remoteFsClient(Eventloop eventloop) {
return RemoteActiveFs.create(eventloop, new InetSocketAddress(SERVER_PORT));
}
@Override
protected Module getModule() {
return ServiceGraphModule.create();
}
//[START EXAMPLE]
@Override
protected void run() throws Exception {
ExecutorService executor = newSingleThreadExecutor();
CompletableFuture<Void> future = eventloop.submit(() ->
ChannelSupplier.ofPromise(client.download(REQUIRED_FILE))
.streamTo(ChannelFileWriter.open(executor, clientStorage.resolve(DOWNLOADED_FILE)))
.whenResult(() -> System.out.printf("%nFile '%s' successfully downloaded to '%s'%n%n",
REQUIRED_FILE, clientStorage))
);
try {
future.get();
} finally {
executor.shutdown();
}
}
//[END EXAMPLE]
public static void main(String[] args) throws Exception {
FileDownloadExample example = new FileDownloadExample();
example.launch(args);
}
}
这里我们覆盖了onInit()、getModule()以及run()方法,在这里我们想要从服务器中得到我们不久前上传的example.txt,并且我们可以看到run中同样是使用了CompletableFuture类进行eventloop的submit操作,向服务器发送一个下载文件的请求,并指明下载到的文件名称。
上面的逻辑都很清晰,如何上传如何沟通下载,其实都很明了,重点是是否太过简单了?当我们想要拓展实现ActiveFs功能的时候怎么办呢?这时候我们就可以用到ActiveJ提供的Decorator装饰器模式。下面我们结合着具体的实例DecoratedActiveFsExample来看一看。
public class DecoratedActiveFsExample extends ServerSetupExample {
//[START OVERRIDE]
@Override
protected Module getOverrideModule() {
return new AbstractModule() {
@Eager
@Provides
ActiveFsServer activeFsServer(Eventloop eventloop, @Named("decorated") ActiveFs decoratedFs, Config config) {
return ActiveFsServer.create(eventloop, decoratedFs)
.withInitializer(ofActiveFsServer(config.getChild("activefs")));
}
@Provides
@Named("decorated")
ActiveFs decoratedActiveFs(ActiveFs fs) {
return new LoggingActiveFs(fs);
}
};
}
//[END OVERRIDE]
public static void main(String[] args) throws Exception {
Launcher launcher = new DecoratedActiveFsExample();
launcher.launch(args);
}
//[START WRAPPER]
private static final class LoggingActiveFs extends ForwardingActiveFs {
private static final Logger logger = LoggerFactory.getLogger(LoggingActiveFs.class);
public LoggingActiveFs(ActiveFs peer) {
super(peer);
}
@Override
public Promise<ChannelConsumer<ByteBuf>> upload(@NotNull String name, long size) {
return super.upload(name)
.map(consumer -> {
logger.info("Starting upload of file: {}. File size is {} bytes", name, size);
return consumer
.withAcknowledgement(ack -> ack
.whenResult(() -> logger.info("Upload of file {} finished", name)));
});
}
@Override
public Promise<ChannelSupplier<ByteBuf>> download(@NotNull String name, long offset, long limit) {
return super.download(name, offset, limit)
.map(supplier -> {
logger.info("Starting downloading file: {}", name);
return supplier
.withEndOfStream(eos -> eos
.whenResult(() -> logger.info("Download of file {} finished", name)));
});
}
}
//[END WRAPPER]
}
重点为其对于getOverrideModule方法的重写,我们需要覆盖要传递修饰的绑定,而不再是原始绑定。这里DecoratedActiveFsExample 通过为文件上传和下载添加额外的日志记录来实现装饰"ActiveFs"。
DecoratedActiveFsExample 将装饰统一包装在LoggingActiveFs 中,我们可以请求一个原始ActiveFs并返回一个被装饰的DecoratedActiveFs。
在DecoratedActiveFsExample ,我们重写了我们想要修饰(下载、上传)的方法,并在上传/下载开始和结束时添加自定义的日志消息。于是我们可以试着运行,并看到了以下信息:
INFO Starting upload of file: example.txt. File size is 12 bytes
INFO Upload of file example.txt finished
INFO Starting downloading file: example.txt
INFO Download of file example.txt finished
本次就结合着FileSystem给的例子对其中的方法进行了理解,明白FileSystem是如何运行的,其本质和之前学的RPC服务器和客户端的连接其实相差不大,只是此时已经改为了通过channelfilereader等的文件流传输。也通过拓展明白了装饰的作用,为之后实际应用FileSystem做一些拓展功能做好了铺垫。