当前位置: 首页 > 工具软件 > ActiveJ > 使用案例 >

ActiveJ学习 RPC(10)——类Memcached应用程序

潘璞瑜
2023-12-01

2021SC@SDUSC

前言

在上一章我们实现了通过RPC实现远程键值存储的相关调用。而这一章我们会提到一个nosql数据库——Memcached。为什么会在RPC中实现一个类Memcached的应用程序?是因为ActiveJ作为一个追求极速高效的后端框架,尽可能在每一方面都能达到最高速的状态,而Memcached通过在事先规划好的内存空间中临时绶存数据库中的各类数据,以达到减少业务对数据库的直接高并发访问,从而达到提升数据库的访问性能,加速网站集群动态应用服务的能力。这无疑让ActiveJ有了实现一个类Memcached应用程序也加快速度的想法。这也是本章的开端。

Memcached

首先我们来介绍一下Memcached:
Memcached:是一个免费开源的、高性能的、具有分布式内存对象的缓存系统,它通过减轻数据库负载加速动态Web应用。我们可以把Memcached看成一个存储内容的内存条,数据库的内容本来都是放在磁盘之中,调用速度很慢,但如果放在Memcached之中后,数据库的内容不再需要进磁盘,而是在Memcached中寻找,无疑能大大加速动态web应用的加载能力,大大提速,下面列举一些Memcached的优缺点:

1.本质上就是一个内存key-value缓存;
2.协议简单,使用的是基于文本行的协议;
3.不支持数据的持久化,服务器关闭之后数据全部丢失;
4.Memcached简洁而强大,便于快速开发,上手较为容易;
5.互不通信的Memcached之间具有分布特征 ;
6,没有安全机制。

Memcached的本质也是一个远程key-value的调用,结合上一章所学,我们可以很好地把利用RPC完成key-value存储,利用ActiveJ本身快速的序列化器等,甚至可以实现一个比Memcached更快的应用程序!

Memcached client and server

下面我们就来试着创建一个基于 RPC 通信协议和 ActiveJ 技术的类似 memcached 的客户端-服务器应用程序。
我们先来看看MemcacheMultiServerModule 类:

public class MemcacheMultiServerModule extends AbstractModule {
	private MemcacheMultiServerModule() {}

	public static MemcacheMultiServerModule create() {
		return new MemcacheMultiServerModule();
	}

	@Provides
	@Worker
	Eventloop eventloop() {
		return Eventloop.create();
	}

	@Provides
	@Worker
	InetSocketAddress port(@WorkerId int workerId) {
		return new InetSocketAddress("localhost", 9000 + workerId);
	}

	@Provides
	@Worker
	RingBuffer ringBuffer(Config config) {
		return RingBuffer.create(
				config.get(ofInteger(), "memcache.buffers"),
				config.get(ofMemSize(), "memcache.bufferCapacity").toInt());
	}

	@Provides
	@Worker
	RpcServer server(Eventloop eventloop, RingBuffer storage, InetSocketAddress address) {
		return RpcServer.create(eventloop)
				.withHandler(GetRequest.class,
						request -> Promise.of(new GetResponse(storage.get(request.getKey()))))
				.withHandler(PutRequest.class,
						request -> {
							Slice slice = request.getData();
							System.out.println("Server on port #" + address.getPort() + " accepted message!");
							storage.put(request.getKey(), slice.array(), slice.offset(), slice.length());
							return Promise.of(PutResponse.INSTANCE);
						})
				.withSerializerBuilder(SerializerBuilder.create()
						.with(Slice.class, ctx -> new SerializerDefSlice()))
				.withMessageTypes(MESSAGE_TYPES)
				.withListenAddresses(address);
	}
}

我们可以观察到,MemcacheMultiServerModule 导出了一个RpcServer来处理put和get请求。MemcacheMultiServerModule 用于在单个JVM上启动多个服务器,其中包括了集成的workPools。
然后我们来具体的看一下创建的服务器MemcacheLikeServer :

public class MemcacheLikeServer extends Launcher {
	@Inject
	WorkerPool.Instances<RpcServer> instances;

	@Provides
	WorkerPool workerPool(WorkerPools workerPools) {
		return workerPools.createPool(3);
	}

	@Provides
	Config config() {
		return Config.create()
				.with("memcache.buffers", "4")
				.with("memcache.bufferCapacity", "64mb");
	}

	@Override
	protected Module getModule() {
		return ModuleBuilder.create()
				.install(ServiceGraphModule.create())
				.install(MemcacheMultiServerModule.create())
				.install(WorkerPoolModule.create())
				.build();
	}

	@Override
	protected void run() throws Exception {
		awaitShutdown();
	}

	public static void main(String[] args) throws Exception {
		MemcacheLikeServer server = new MemcacheLikeServer();
		server.launch(args);
	}
}

其实对比上一个键值存储的服务器,我们能找到很多相似的地方:如服务器重写的run()方法指明服务器同样都在等待客户端发送相关信息,否则发生阻塞;同样重写了getModule()方法提供ServiceGraphModule等等。不同在于,这次的服务器声明了config()方法,用于配置指定缓冲区的数量以及容量实现memcached。Config也将用于设置MemcacheMultiServerModule 去处理即将到来的请求所需的一切。
然后是创建客户端的MemcacheLikeClient 类:

public class MemcacheLikeClient extends Launcher {
	@Provides
	Eventloop eventloop() {
		return Eventloop.create();
	}

	@Provides
	RawMemcacheClientAdapter rawMemcacheClientAdapter(RawMemcacheClient client) {
		return new RawMemcacheClientAdapter(client);
	}

	@Provides
	Config config() {
		return Config.create()
				.with("protocol.compression", "false")
				.with("client.addresses", "localhost:9000, localhost:9001, localhost:9002");
	}

	@Inject
	RawMemcacheClientAdapter client;

	@Inject
	Eventloop eventloop;

	@Override
	protected Module getModule() {
		return ModuleBuilder.create()
				.install(ServiceGraphModule.create())
				.install(MemcacheClientModule.create())
				.install(ConfigModule.create()
						.withEffectiveConfigLogger())
				.build();
	}

	@Override
	protected void run() throws ExecutionException, InterruptedException {
		String message = "Hello, Memcached Server";

		System.out.println();
		CompletableFuture<Void> future = eventloop.submit(() ->
				sequence(
						() -> Promises.all(range(0, 25).mapToObj(i ->
								client.put(i, message))),
						() -> Promises.all(range(0, 25).mapToObj(i ->
								client.get(i).whenResult(res -> System.out.println(i + " : " + res))))));
		future.get();
		System.out.println();
	}

	public static void main(String[] args) throws Exception {
		Launcher client = new MemcacheLikeClient();
		client.launch(args);
	}
}

我们在MemcacheLikeClient 中使用的是一致性哈希策略来实现服务器分片之间的请求协议,因此在客户端中,我们需要提供每个分片对应的端口地址,即在config中定义的“9001,9002,9003”.
并且在eventloop中,请求put放入一个bytes[i]数组的当前i中的信息,并通过get从对应的单元返回。
此时客户端将对三个分片异步执行操作,我们可以收到一个无序的输出块。

Testing

在完成了源码的阅读和理解之后,我们来正式启动并运行MemcacheLikeServer启动服务器:

17:02:11.415 [main] INFO memcached.MemcacheLikeServer - === RUNNING APPLICATION

同样地,服务器阻塞等待客户端的消息。

紧接着我们打开MemcacheLikeClient 启动客户端:
首先看到的是服务器同时连接了三个客户端:

17:22:52.954 [eventloop: pool-2-thread-1] INFO io.activej.rpc.server.RpcServer - Client connected on RpcServerConnection{address=/127.0.0.1,active=1, successes=0, failures=0}
17:22:52.954 [eventloop: pool-2-thread-3] INFO io.activej.rpc.server.RpcServer - Client connected on RpcServerConnection{address=/127.0.0.1,active=1, successes=0, failures=0}
17:22:52.954 [eventloop: pool-2-thread-2] INFO io.activej.rpc.server.RpcServer - Client connected on RpcServerConnection{address=/127.0.0.1,active=1, successes=0, failures=0}

然后客户端无序地向服务器发送数据:

1 : Hello, Memcached Server
2 : Hello, Memcached Server
7 : Hello, Memcached Server
9 : Hello, Memcached Server
12 : Hello, Memcached Server
16 : Hello, Memcached Server
20 : Hello, Memcached Server
21 : Hello, Memcached Server
23 : Hello, Memcached Server
24 : Hello, Memcached Server
5 : Hello, Memcached Server
10 : Hello, Memcached Server
14 : Hello, Memcached Server
17 : Hello, Memcached Server
19 : Hello, Memcached Server
0 : Hello, Memcached Server
3 : Hello, Memcached Server
4 : Hello, Memcached Server
6 : Hello, Memcached Server
8 : Hello, Memcached Server
11 : Hello, Memcached Server
13 : Hello, Memcached Server
15 : Hello, Memcached Server
18 : Hello, Memcached Server
22 : Hello, Memcached Server

服务端接收到相关数据:

Server on port #9001 accepted message!
Server on port #9002 accepted message!
Server on port #9000 accepted message!
Server on port #9002 accepted message!
Server on port #9000 accepted message!
Server on port #9000 accepted message!
Server on port #9000 accepted message!
Server on port #9001 accepted message!
Server on port #9000 accepted message!
Server on port #9000 accepted message!
Server on port #9000 accepted message!
Server on port #9002 accepted message!
Server on port #9001 accepted message!
Server on port #9002 accepted message!
Server on port #9002 accepted message!
Server on port #9001 accepted message!
Server on port #9002 accepted message!
Server on port #9002 accepted message!
Server on port #9002 accepted message!
Server on port #9002 accepted message!
Server on port #9002 accepted message!
Server on port #9000 accepted message!
Server on port #9001 accepted message!
Server on port #9000 accepted message!
Server on port #9000 accepted message!

这里我对比了一些服务端和客户端发送的编号进行对应,发现并没有什么规律,上述的步全程都是由异步调用完成的,所以才会导致整个是一个无序的状态。

接收所有信息后,服务器再同时关闭与客户端的三个连接:

17:22:52.996 [eventloop: pool-2-thread-1] INFO io.activej.rpc.server.RpcServer - Client disconnected on RpcServerConnection{address=/127.0.0.1,active=0, successes=20, failures=0}
17:22:52.996 [eventloop: pool-2-thread-2] INFO io.activej.rpc.server.RpcServer - Client disconnected on RpcServerConnection{address=/127.0.0.1,active=0, successes=10, failures=0}
17:22:52.996 [eventloop: pool-2-thread-3] INFO io.activej.rpc.server.RpcServer - Client disconnected on RpcServerConnection{address=/127.0.0.1,active=0, successes=20, failures=0}

至此我们完成了对类Memcached服务器和客户端的应用和调试,也看到了ActiveJ提供的异步工作流的完整体现。

总结

RPC篇到这里就正式结束了,从一开始阅读源码一点点迭代不停抱怨怎么会那么困难,再到后面一点点解剖理解,揭示RPC的面纱之后,再到真正动手试着实现利用RPC的远程调用。最后的测试环节融会了之前的一切之后得出的结果实在是让人开心。而在之后,也将开启ActiveJ的fileSystem部分进行解析。

 类似资料: