项目地址: https://github.com/NetEase/chatofpomelo-websocket.git
分支:tutorial-protobuf
如果发现有错误的地方,请在评论中及时指出,谢谢
如果此服务器配置包含port端口,则加载remote组件。
进而启动remote组件,方法如下:
pro.start = function (cb) {
this.opts.port = this.app.getCurServer().port;
this.remote = genRemote(this.app, this.opts); // 初始化remote
this.remote.start();
process.nextTick(cb);
};
var genRemote = function (app, opts) {
opts.paths = getRemotePaths(app); // 加载不同的remote路径
opts.context = app;
if (!!opts.rpcServer) { // 根据是否有rpcServer配置进行不同的初始化
return opts.rpcServer.create(opts);
} else {
return RemoteServer.create(opts);
}
};
var getRemotePaths = function (app) {
var paths = [];
var role;
// master server should not come here
if (app.isFrontend()) {
role = 'frontend';
} else {
role = 'backend';
}
var sysPath = pathUtil.getSysRemotePath(role), serverType = app.getServerType();
if (fs.existsSync(sysPath)) {
paths.push(pathUtil.remotePathRecord('sys', serverType, sysPath));
}
var userPath = pathUtil.getUserRemotePath(app.getBase(), serverType);
if (fs.existsSync(userPath)) {
paths.push(pathUtil.remotePathRecord('user', serverType, userPath));
}
return paths;
};
backend paths:
[
{
namespace: 'sys',
serverType: 'chat',
path: '/chatofpomelo-websocket/game-server/node_modules/pomelo/lib/common/remote/backend'
},
{
namespace: 'user',
serverType: 'chat',
path: '/chatofpomelo-websocket/game-server/app/servers/chat/remote'
}
]
frontend paths:
[
{
namespace: 'sys',
serverType: 'connector',
path: /chatofpomelo-websocket/game-server/node_modules/pomelo/lib/common/remote/frontend'
}
]
通过上面的代码,我们得知,remote根据服务器是否为前端服务器进行不同的路径匹配,remote的命名空间分为sys系统级和user用户级两种规则,后续调用remoteServer.create
方法为this.remote
进行初始化赋值,最后调用this.remote.start()
启动remote模块。
module.exports.create = function (opts) {
var services = loadRemoteServices(opts.paths, opts.context); // 根据paths加载相应路径的模块,并进行初始化操作
opts.services = services;
var gateway = Gateway.create(opts); // 初始化网关
return gateway;
};
rpc-server/server
初始化操作:
a: loadRemoteServices 加载了paths中相关的模块。
b: 对于sys 级的类型,加载pomelo/lib/common/remote
下的模块,backend/msgRemote.js、frontend/channelRemote.js、frontend/sessionRemote.js。
c: 对于user 级的类型,加载相应服务下的/remote
里面的自定义模块。
d: 初始化网关
var Gateway = function (opts) {
EventEmitter.call(this);
this.opts = opts || {};
this.port = opts.port || 3050;
this.started = false;
this.stoped = false;
this.acceptorFactory = opts.acceptorFactory || defaultAcceptorFactory;
this.services = opts.services;
var dispatcher = new Dispatcher(this.services); // 初始化调度
if (!!this.opts.reloadRemotes) {
watchServices(this, dispatcher);
}
this.acceptor = this.acceptorFactory.create(opts, function (tracer, msg, cb) { // 初始化接收器
dispatcher.route(tracer, msg, cb); // 调度路由,作为cb对接收器进行赋值cb
});
};
初始化接收器工厂,初始化调度,对于路由来说,通过调度将消息路由到适当的服务对象。
remote中调用start:this.remote.start();
实际上是调用gateway中的start方法。
gateway.js:
pro.start = function () {
this.started = true;
this.acceptor.listen(this.port);
};
gateway调用mqtt-acceptor的listen方法,创建net.Server服务,监听connection字段
mqtt-acceptor.js:
pro.listen = function (port) {
this.inited = true;
var self = this;
this.server = new net.Server();
this.server.listen(port);
this.server.on('connection', function (stream) { // 监听connection字段
var socket = MqttCon(stream);
socket['id'] = curId++;
socket.on('publish', function (pkg) { // 监听publish
pkg = pkg.payload.toString();
var isArray = false;
pkg = JSON.parse(pkg);
if (pkg instanceof Array) {
processMsgs(socket, self, pkg);
isArray = true;
} else {
processMsg(socket, self, pkg); // 发送消息
}
});
self.sockets[socket.id] = socket;
});
};
mqtt-server监听publis字段,发送消息。
var processMsg = function (socket, acceptor, pkg) {
var tracer = null;
acceptor.cb(tracer, pkg.msg, function () { // 调用dispatcher的route方法
var len = arguments.length;
var args = new Array(len);
for (var i = 0; i < len; i++) {
args[i] = arguments[i];
}
var resp = {id: pkg.id, resp: args};
doSend(socket, resp);
});
};
acceptor.cb为dispatcher中的route方法,在初始化acceptor(mqtt-acceptor.js)的时候赋值。
dispatcher.js:
pro.route = function (tracer, msg, cb) {
var namespace = this.services[msg.namespace];
var service = namespace[msg.service];
var method = service[msg.method];
var args = msg.args;
args.push(cb);
method.apply(service, args);
};
通过route调用相应的remote模块中的method方法
mqtt-server.js:
var doSend = function (socket, msg) {
socket.publish({
topic: 'rpc',
payload: JSON.stringify(msg)
});
}
调用socket.publish发布topic为rpc的的消息。
proxy.js:
var Component = function (app, opts) {
this.app = app;
this.opts = opts;
this.client = genRpcClient(this.app, opts);
this.app.event.on(events.ADD_SERVERS, this.addServers.bind(this));
};
genRpcClient初始化rpc-client为this.client
进行赋值操作。
在proxy初始化的过程中创建监听this.app.event.on(events.ADD_SERVERS, this.addServers.bind(this));
rpc-client/clinet:
var Client = function (opts) {
opts = opts || {};
this._context = opts.context;
this._routeContext = opts.routeContext;
this.router = opts.router || router.df;
this.routerType = opts.routerType;
this.rpcDebugLog = opts.rpcDebugLog;
if (this._context) {
opts.clientId = this._context.serverId;
}
this.opts = opts;
this.proxies = {};
this._station = createStation(opts); //初始化MailStation服务
this.state = STATE_INITED;
};
初始化pomelo-rpc/client。
moduleUtil.startModules(self.modules, function (err) {
utils.invokeCallback(cb, err);
return;
});
在monitor服务启动完成后调用moduleUtil.startModules
,其中有效操作为monitorwatcher中的start方法,调用subscribeRequest,从而调用app.addServers
,添加服务器。
Application.addServers = function (servers) {
var item, slist;
for (var i = 0, l = servers.length; i < l; i++) {
item = servers[i];
this.servers[item.id] = item;
slist = this.serverTypeMaps[item.serverType];
if (!slist) {
this.serverTypeMaps[item.serverType] = slist = [];
}
replaceServer(slist, item);
if (this.serverTypes.indexOf(item.serverType) < 0) {
this.serverTypes.push(item.serverType);
}
}
this.event.emit(events.ADD_SERVERS, servers);
};
Application.addServers
方法添加服务器,进而触发events.ADD_SERVERS
,proxy中的this.addServers.bind(this);
proxy.js:
pro.addServers = function (servers) {
//获取remote的路径,根据路径加载并初始化相应的模块
//根据path初始化加载模块,创建proxy,并保存到this.proxies中
genProxies(this.client, this.app, servers);
this.client.addServers(servers);
};
受到app.addServers的触发,调用proxy添加服务操作。
var genProxies = function (client, app, sinfos) {
var item;
for (var i = 0, l = sinfos.length; i < l; i++) {
item = sinfos[i];
if (hasProxy(client, item)) {
continue;
}
//getProxyRecords获取remote的路径
//client.addProxies根据路径加载并初始化相应的模块
//根据path初始化加载模块,创建proxy,并保存到this.proxies中
client.addProxies(getProxyRecords(app, item));
}
};
getProxyRecords:
[
{
namespace: 'sys',
serverType: 'chat',
path: '/chatofpomelo-websocket/game-server/node_modules/pomelo/lib/common/remote/backend'
},
{
namespace: 'user',
serverType: 'chat',
path: '/chatofpomelo-websocket/game-server/app/servers/chat/remote'
}
]
[
{
namespace: 'sys',
serverType: 'connector',
path: '/chatofpomelo-websocket/game-server/node_modules/pomelo/lib/common/remote/frontend'
}
]
getProxyRecords获取proxy的路径,其中包含sys系统级的服务,也包含user用户级的服务,调用rpc-client中的addProxies,添加并保存服务。
pomelo-rpc/client.js:
pro.addProxies = function (records) {
for (var i = 0, l = records.length; i < l; i++) {
this.addProxy(records[i]);
}
};
pro.addProxy = function (record) {
// generateProxy,根据path初始化加载模块,并创建proxy
// 根据route分配serverId,然后调用client.rpcInvoke
var proxy = generateProxy(this, record, this._context);
// 把代理添加到this.proxies中
insertProxy(this.proxies, record.namespace, record.serverType, proxy);
};
生成proxy,并将proxy保存到this.proxies中。
var generateProxy = function (client, record, context) {
var res, name;
var modules = Loader.load(record.path, context);
if (modules) {
res = {};
for (name in modules) {
res[name] = Proxy.create({
service: name,
origin: modules[name],
attach: record,
// proxyCB根据route分配serverId,然后调用client.rpcInvoke
proxyCB: proxyCB.bind(null, client)
});
}
}
return res;
};
generateProxy根据path信息初始化加载模块,并创建proxy。
var proxyCB = function (client, serviceName, methodName, args, attach, isToSpecifiedServer) {
var routeParam = args.shift();
var cb = args.pop();
var serverType = attach.serverType;
var msg = {
namespace: attach.namespace,
serverType: serverType,
service: serviceName,
method: methodName,
args: args
};
if (isToSpecifiedServer) {
rpcToSpecifiedServer(client, msg, serverType, routeParam, cb);
} else {
// app.js中自定义的route,根据route规则返回serverId
getRouteTarget(client, serverType, msg, routeParam, function (err, serverId) {
// 调用this._station.dispatch方法 ——> mailbox.send ——> socket.publish({topic: 'rpc',payload: msg});
client.rpcInvoke(serverId, msg, cb);
});
}
};
对于proxyCB方法,其中包含了getRouteTarget,此方法是用来获取目标路由的,其规则为app.js中自定义的route规则,或者为默认规则,并将分配的服务器serverId返回来。
最后根据serverId调用client.rpcInvoke发送数据。
以上操作为genProxies(this.client, this.app, servers);
的作用,创建proxy,并保存到this.proxies中。
pomelo-rpc/client.js:
pro.addServer = function (server) {
this._station.addServer(server);
};
mailstation.js:
pro.addServer = function (serverInfo) {
var id = serverInfo.id;
var type = serverInfo.serverType;
this.servers[id] = serverInfo;
this.onlines[id] = 1;
if (!this.serversMap[type]) {
this.serversMap[type] = [];
}
if (this.serversMap[type].indexOf(id) < 0) {
this.serversMap[type].push(id);
}
this.emit('addServer', id);
};
this.client.addServers(servers);
在mailstation服务中根据服务器信息保存服务器数据到mailstation.serversMap中。
pro.afterStart = function (cb) {
var self = this;
this.app.__defineGetter__('rpc', function () { // 绑定rpc属性
return self.client.proxies.user;
});
this.app.__defineGetter__('sysrpc', function () { // 绑定sysrpc属性
return self.client.proxies.sys;
});
this.app.set('rpcInvoke', this.client.rpcInvoke.bind(this.client), true); // 设置rpcInvoke属性
this.client.start(cb);
};
在app上绑定rpc、sysrpc、rpcInvoke属性,以及对应的调用。
根据此可以看出来,rpc分为三种模式,分别为rpc对应user,sysrpc对应sys,以及rpcInvoke。
根据上面第3点的内容,我们可以看出,rpc的最终指向都为this.client.rpcInvoke方法。
pomelo-rpc/client.js:
pro.rpcInvoke = function (serverId, msg, cb) {
var tracer = null;
this._station.dispatch(tracer, serverId, msg, this.opts, cb);
};
调用mailstation中的dispatch方法
mailstation.js
pro.dispatch = function (tracer, serverId, msg, opts, cb) {
var self = this;
var mailbox = this.mailboxes[serverId];
if (!mailbox) {
if (!lazyConnect(tracer, this, serverId, this.mailboxFactory, cb)) {
self.emit('error', constants.RPC_ERROR.NO_TRAGET_SERVER, tracer, serverId, msg, opts);
}
addToPending(tracer, this, serverId, arguments);
return;
}
var send = function (tracer, err, serverId, msg, opts) {
var mailbox = self.mailboxes[serverId];
mailbox.send(tracer, msg, opts, function (tracer_send, send_err, args) {
doFilter(tracer_send, null, serverId, msg, opts, self.afters, 0, 'after', function (tracer, err, serverId, msg, opts) {
utils.applyCallback(cb, args);
});
});
};
doFilter(tracer, null, serverId, msg, opts, this.befores, 0, 'before', send);
};
如果在this.mailboxes中没有此服务器信息,则通过懒加载的进行创建连接并保存到this.mailboxes中,然后通过过滤器过滤,调用mailbox中的send方法
var lazyConnect = function (tracer, station, serverId, factory, cb) {
var server = station.servers[serverId];
var online = station.onlines[serverId];
var mailbox = factory.create(server, station.opts);
station.connecting[serverId] = true;
station.mailboxes[serverId] = mailbox;
station.connect(tracer, serverId, cb);
return true;
};
为rpc服务创建connect连接,用以连接remote服务。
mqtt-mailbox.js:
MailBox.prototype.connect = function (tracer, cb) {
var self = this;
var stream = net.createConnection(this.port, this.host);
this.socket = MqttCon(stream);
this.socket.connect({
clientId: 'MQTT_RPC_' + Date.now()
}, function () {
self.connected = true;
if (self.bufferMsg) {
self._interval = setInterval(function () {
flush(self);
}, self.interval);
}
self.setupKeepAlive();
});
this.socket.on('publish', function (pkg) {
if (pkg.topic == Constants['TOPIC_HANDSHAKE']) {
upgradeHandshake(self, pkg.payload);
return cb();
}
pkg = Coder.decodeClient(pkg.payload);
processMsg(self, pkg);
});
};
通过net.createConnection(this.port, this.host);
与remote服务创建连接,监听publish,为publish提供processMsg方法。
MailBox.prototype.send = function (tracer, msg, opts, cb) {
var id = this.curId++;
this.requests[id] = cb;
var pkg = Coder.encodeClient(id, msg, this.servicesMap);
if (this.bufferMsg) {
enqueue(this, pkg);
} else {
doSend(this.socket, pkg);
}
};
var doSend = function (socket, msg) {
socket.publish({
topic: 'rpc',
payload: msg
});
}
通过doSend中的socket.publish发布信息,在pomelo-rpc/server端进行接收,即mqtt-acceptor.js。
对于rpc服务来说,proxy为rpc-client端,remote为rpc-server端,并且提供sys系统级和user用户级的服务。
pomelo为用户提供了两种调用rpc的方式:
一种是:
self.app.rpc.chat.chatRemote.add(session, uid, self.app.get('serverId'), rid, true, function(users){
next(null, { users: users });
});
另一种是:
self.app.rpcInvoke('chat-server-1',{
namespace : "user",
service : "chatRemote",
method : "add",
args : [uid, self.app.get('serverId'), rid, true]
}, function(users){
next(null, { users:users })
});
相对于第一种,第二种属于直接调用client.rpcInvoke
,由前文得知,第一种的底层逻辑也是调用client.rpcInvoke
。
self.app.rpcInvoke(serverId,msg,cb);
在调用proxy的afterStart方法时绑定rpcInvoke服务:this.app.set('rpcInvoke', this.client.rpcInvoke.bind(this.client), true);
实则调用mailstation服务中的dispatch方法:this._station.dispatch(tracer, serverId, msg, this.opts, cb);
mailbox.send(tracer, msg, opts, function (tracer_send, send_err, args) {
utils.applyCallback(cb, args);
});
调用最底层mailbox服务,实则调用mqtt-mailbox的send方法。
MailBox.prototype.send = function (tracer, msg, opts, cb) {
var id = this.curId++;
this.requests[id] = cb;
var pkg = {id: id,msg: msg};
doSend(this.socket, pkg);
};
var doSend = function (socket, msg) {
socket.publish({
topic: 'rpc',
payload: JSON.stringify(msg)
});
}
通过socket发布消息到指定服务器。
socket.on('publish', function (pkg) {
pkg = pkg.payload.toString();
pkg = JSON.parse(pkg);
processMsg(socket, self, pkg);
});
var processMsg = function (socket, acceptor, pkg) {
var tracer = null;
acceptor.cb(tracer, pkg.msg, function () {
var len = arguments.length;
var args = new Array(len);
for (var i = 0; i < len; i++) {
args[i] = arguments[i];
}
var resp = {id: pkg.id, resp: args};
doSend(socket, resp);
});
};
通过触发publish监听,解析rpc数据,调用processMsg方法,从而调用 acceptor.cb方法。
var Acceptor = function (opts, cb) {
this.cb = cb;
};
我们再看一下Acceptor.cb方法,我们发现此方法是在Acceptor初始化的时候赋值的。
var Gateway = function (opts) {ceptor = this.acceptorFactory.create(opts, function (tracer, msg, cb) {
dispatcher.route(tracer, msg, cb);
});
};
在往上查看,cb为dispatcher.route(tracer, msg, cb);
方法。
pro.route = function (tracer, msg, cb) {
var namespace = this.services[msg.namespace];
var service = namespace[msg.service];
var method = service[msg.method];
var args = msg.args;
args.push(cb);
method.apply(service, args);
};
在dispatcher.route
中把相应service服务中的方法绑定到了这里,即chatRomte.js
中的方法在这里进行调用。
ChatRemote.prototype.add = function(uid, sid, name, flag, cb) {
cb();
};
上面的dispatcher.route
中根据method进行方法匹配调用,进而调用相应的模块下的方法。执行完毕后cb放回结果。
以上操作,就完成了rpc调用的目的,在不同服务器间进行调用,现在又回到了mqtt-acceptor服务
var doSend = function (socket, msg) {
socket.publish({
topic: 'rpc',
payload: JSON.stringify(msg)
});
}
调用完成后,再次通过rpc目标服务器向原服务器进行socket.publish
发布消息。
this.socket.on('publish', function (pkg) {
pkg = pkg.payload.toString();
pkg = JSON.parse(pkg);
processMsg(self, pkg);
});
var processMsg = function (mailbox, pkg) {
var pkgId = pkg.id;
var cb = mailbox.requests[pkgId];
delete mailbox.requests[pkgId];
var tracer = null;
var sendErr = null;
var pkgResp = pkg.resp;
cb(tracer, sendErr, pkgResp);
};
通过监听publish,接收rpc返回的数据,解析,并通过cb进行返回。
self.app.rpcInvoke(serverId,msg,function(){
console.log('end');
});
以上所有为rpc完成的操作流程,最后通过回调,执行rpc后续操作。
经过一个完整的rpc调用流程,我们发现:
a: mqtt-mailbox与mqtt-acceptor服务分别为proxy与remote服务的最底层,进行rpc服务的最底层消息传递操作。
b: mailbox和acceptor分别为mqtt-mailbox与mqtt-acceptor服务的上一层,作为工厂方法,创建其底层实现。
c: 作为mailbox和acceptor的上一层mailstation与gateway,对mailbox和acceptor进行管理,并为上层提供统一接口。
d: 最上层为client与server层,对于client层来说,为代理层,隐藏具体rpc细节,为proxy服务提供接口,对于server层来说,为服务层,加载remote服务,提供远程服务业务逻辑实现。