前面的文章已经讲过了pomelo框架是如何进行rpc调用的,不过主要涉及到的是客户端方面,例如rpc调用的消息格式等。。。那么这篇文章就来讲讲rpc的server部分究竟是怎么运行的。。。
在开始之前,我们先来看看pomelo的配置信息:
{"id":"connector-server-1", "host":"127.0.0.1", "port":4050, "clientPort": 3050, "frontend": true},
上面是一个connector服务器的定义,frontend属性表示这个服务器会接受客户的连接(如果没有这个属性,那么表示这个服务器不会与用户进行直接的连接),这里还有比较有意思的地方,那就是配置了两个端口。。。
clientport:用于接受用户的连接
port:用于pomelo内部服务器之间的连接,也就是用于rpc用的。。。(如果服务器配置了这个端口,那么pomelo在启动服务器的时候就会加载remote组件,用于rpc的监听)
(刚开始还被这两种类型的port给弄郁闷了。不知道他们干吗要用两个端口)
好了,那么现在进入正题,开始来看这个remote组件,首先来看看它是怎么创建的。。。
//这个是用于创建rpc server的,用于接收别的服务器的远程调用的消息,并执行相应的方法
module.exports = function(app, opts) {
opts = opts || {};
opts.cacheMsg = opts.cacheMsg || false;
opts.interval = opts.interval || 30;
return new Remote(app, opts); //创建remote组件
};
/**
* Remote component class
*
* @param {Object} app current application context
* @param {Object} opts construct parameters
*/
//这里其实没有什么意思吧,无非就是将一些数据保存下来而已
var Remote = function(app, opts) {
this.app = app;
this.opts = opts;
};
好像这部分进行创建的过程并没有太大的意思吧,无非是保存一些信息。。。。真正进行创建的过程则是在于remote组件的启动。。。好吧,我们来看看:
pro.start = function(cb) {
this.opts.port = this.app.getCurServer().port; //用于接受远程服务器连接的端口进行rpc
this.remote = genRemote(this.app, this.opts); //创建remotes服务器,用接受远程调用的信息,其实这里是gateway
this.remote.start(); //启动rpc server
process.nextTick(cb);
};
这里可以分为两个主要的部分,首先是创建pomelo自定义的rpc服务器,然后在启动它。。。
那么我们先来看看这个rpc服务器是怎么创建的吧:
//创建rpc服务器
var genRemote = function(app, opts) {
opts.paths = getRemotePaths(app); //执行远程rpc的源代码放的目录
opts.context = app;
return RemoteServer.create(opts);
};
这部分代码比较少,首先是获取执行远程调用的源代码文件路径,待会用于载入它们,在上面的一篇文章中就已经知道,远程调用分为sys空间和user空间,那么在这里也是一样的,用于执行远程调用的也分为这两个部分,那么如何获取path这部分就不看了,自己看看代码很容易明白的,我么着重来看看这个server的create过程。。
//创建gateway,用于接收远程rpc的连接
module.exports.create = function(opts) {
if(!opts || !opts.port || opts.port < 0 || !opts.paths) {
throw new Error('opts.port or opts.paths invalid.');
}
var services = loadRemoteServices(opts.paths, opts.context); //创建远程服务执行的方法,这里也分为sys空间和user空间
opts.services = services; //保存刚刚创建的service
var gateway = Gateway.create(opts); //创建gateway
return gateway;
};
这个过程也主要是分成了两个部分,首先是创建服务,接下来则是创建gateway,这里service的创建其实很简单,无非就是将路径下的源文件读进来,将他们里面定义的方法保存起来,用于执行远程的调用,那么这里gateway则是用于监听端口,接受远程调用发送过来的信息。。。
//将path下面的源文件读取进来,他们里面定义的方法将会用于执行远程的调用
var loadRemoteServices = function(paths, context) {
var res = {}, item, m;
for(var i=0, l=paths.length; i<l; i++) {
item = paths[i];
m = Loader.load(item.path, context);
if(m) {
createNamespace(item.namespace, res);
for(var s in m) { //这里可以理解为遍历读进来的源文件
res[item.namespace][s] = m[s]; //用于将方法保存起来,s就是源文件的名字,也就是所谓的service的名字,然后里面还要分method的名字
}
}
}
return res;
};
var createNamespace = function(namespace, proxies) {
proxies[namespace] = proxies[namespace] || {};
};
那么接下来来看这个gateway的创建过程吧:
module.exports.create = function(opts) {
if(!opts || !opts.services) {
throw new Error('opts and opts.services should not be empty.');
}
return new Gateway(opts);
};
var Gateway = function(opts) {
EventEmitter.call(this);
this.port = opts.port || 3050;
this.started = false;
this.stoped = false;
this.acceptorFactory = opts.acceptorFactory || defaultAcceptorFactory; //用于创建acceptor的工厂
this.services = opts.services;
var self = this;
this.acceptor = this.acceptorFactory.create(opts, function(msg, cb) { //创建acceptor,用于接受远程调用的信息
dispatcher.route(msg, self.services, cb);
});
};
util.inherits(Gateway, EventEmitter);
这里gateway的创建又创建了一个acceptor,这个是干吗用的,凭名字应该就已经很清楚了吧。。无非就是用于接收远程服务器发送过来的远程调用信息。。。
好吧,那么我们来看看这个acceptor是怎么搞的吧、、、、(这里分为几种,tcp的和websocket,就直接来看websocket的了)
module.exports.create = function(opts, cb) {
return new Acceptor(opts || {}, cb);
};
var Acceptor = function(opts, cb){
EventEmitter.call(this);
this.cacheMsg = opts.cacheMsg;
this.interval = opts.interval; // flush interval in ms
this._interval = null; // interval object
this.sockets = {};
this.msgQueues = {};
this.cb = cb; //回调函数,用于处理接收到的rpc消息
};
util.inherits(Acceptor, EventEmitter);
整个创建过程还是比较的简单的,这里需要注意的一个地方是,传进来了一个回调函数,用于处理acceptor接收到的消息这个回调函数的定义在前面就已经给出来了,它的作用就是调用dispatchor的route方法,根据接收到的消息里面的namespace,service,method,args等参数的信息,调用相应的函数来处理,然后将数据返回回去。。
我们来看看这个route函数吧:
module.exports.route = function(msg, services, cb) {
var namespace = services[msg.namespace];
if(!namespace) {
utils.invokeCallback(cb, new Error('no such namespace:' + msg.namespace));
return;
}
var service = namespace[msg.service];
if(!service) {
utils.invokeCallback(cb, new Error('no such service:' + msg.service));
return;
}
var method = service[msg.method]; //获取需要调用的发那个发
if(!method) {
utils.invokeCallback(cb, new Error('no such method:' + msg.method));
return;
}
var args = msg.args.slice(0);
args.push(cb);
method.apply(service, args); //用需要调用的方法来处理数据
};
好了,到这里为止,真个rpc服务器的创建就差不太多了。。那么接下来我们来看看remote组件的启动过程吧。。也可以了解一下整个remote组件的运行。。。
这里的启动其实主要就是gateway的启动。。来看看这个启动过程吧:
//启动acceptor,监听端口
pro.start = function() {
if(this.started) {
throw new Error('gateway already start.');
}
this.started = true;
var self = this;
this.acceptor.on('error', self.emit.bind(self, 'error'));
this.acceptor.on('closed', self.emit.bind(self, 'closed'));
this.acceptor.listen(this.port); //监听端口
};
这里其实主要就是调用acceptor来监听端口。。。那么主要的工作其实还是在acceptor里面完成的。。。
来看看它的listen过程。。。
//用于监听端口
pro.listen = function(port) {
//check status
if(!!this.inited) {
utils.invokeCallback(this.cb, new Error('already inited.'));
return;
}
this.inited = true;
var self = this;
this.server = sio.listen(port); //用websocket来监听端口
this.server.set('log level', 0);
this.server.server.on('error', function(err) {
self.emit('error', err);
});
//接收到了远程服务器的连接
this.server.sockets.on('connection', function(socket) {
self.sockets[socket.id] = socket;
//当接收到了数据之后执行的操作
socket.on('message', function(pkg) {
try {
if(pkg instanceof Array) {
processMsgs(socket, self, pkg);
} else {
processMsg(socket, self, pkg);
}
} catch(e) {
// socke.io would broken if uncaugth the exception
}
});
socket.on('disconnect', function(reason) {
delete self.sockets[socket.id];
delete self.msgQueues[socket.id];
});
});
if(this.cacheMsg) {
this._interval = setInterval(function() {
flush(self);
}, this.interval);
}
};
好了,到这里为止,整个rpc服务器的运行原理就看的差不多了吧。。。。还算是比较简单。。不过感觉有地方写得稍微有一点繁琐了。。。