项目地址: https://github.com/NetEase/chatofpomelo-websocket.git
分支:tutorial-protobuf
如果发现有错误的地方,请在评论中及时指出,谢谢
channel、connector、dictionary、master、monitor、proxy、remote、server中包含start
this.server = this.app.components.__server__;
this.session = this.app.components.__session__;
this.connection = this.app.components.__connection__;
把server、session和connection模块绑定到connector上。
读取 handler 里面的接口和dictionary.json,给this.abbrs
和this.dict
赋值
this.master.start(cb);
调用master/master.js的start方法
this.monitor.start(cb);
调用monitor/monitor.js的start方法
pro.start = function(cb) {
this.opts.port = this.app.getCurServer().port;
this.remote = genRemote(this.app, this.opts); // require('pomelo-rpc').server
this.remote.start();
};
对this.remote
进行require('pomelo-rpc').server
初始化赋值,然后调用this.remote.start
方法,即require('pomelo-rpc').server
的start方法。
this.server.start();
调用server/server.js的start方法
var Server = function (app, opts) {
this.app = app;
this.masterInfo = app.getMaster(); // 获取master信息
this.registered = {};
this.modules = [];
opts = opts || {};
opts.port = this.masterInfo.port;
opts.env = this.app.get(Constants.RESERVED.ENV);
this.closeWatcher = opts.closeWatcher;
this.masterConsole = admin.createMasterConsole(opts); // require('pomelo-admin');
};
对this.masterConsole
进行赋值,调用require('pomelo-admin');
的createMasterConsole方法,即为pomelo-admin/lib/consoleService.js
,此处的masterConsole为pomelo-admin的上层入口,也是上层代理,通过masterConsole调用pomelo-admin中的方法。
Server.prototype.start = function (cb) {
moduleUtil.registerDefaultModules(true, this.app, this.closeWatcher);
moduleUtil.loadModules(this, this.masterConsole);
var self = this;
// start master console
this.masterConsole.start(function (err) {
if (err) {
process.exit(0);
}
moduleUtil.startModules(self.modules, function (err) {
if (err) {
utils.invokeCallback(cb, err);
return;
}
if (self.app.get(Constants.RESERVED.MODE) !== Constants.RESERVED.STAND_ALONE) {
starter.runServers(self.app);
}
utils.invokeCallback(cb);
});
});
this.masterConsole.on('error', function (err) {});
this.masterConsole.on('reconnect', function (info) {
self.app.addServers([info]);
});
// monitor servers disconnect event
this.masterConsole.on('disconnect', function (id, type, info, reason) {});
// monitor servers register event
this.masterConsole.on('register', function (record) {
starter.bindCpu(record.id, record.pid, record.host);
});
this.masterConsole.on('admin-log', function (log, error) {});
};
registerDefaultModules 方法把 modules/masterwatcher(master观察者)、admin.modules.watchServer(admin观察服务) 、modules/console 添加到app.modules 中。
如果开启了systemMonitor,则注册相应的模块。
pro.registerDefaultModules = function (isMaster, app, closeWatcher) {
if (!closeWatcher) {
if (isMaster) {
app.registerAdmin(require('../modules/masterwatcher'), {app: app});
} else {
app.registerAdmin(require('../modules/monitorwatcher'), {app: app});
}
}
app.registerAdmin(admin.modules.watchServer, {app: app});
app.registerAdmin(require('../modules/console'), {app: app, starter: starter});
if (app.enabled('systemMonitor')) {
if (os.platform() !== Constants.PLATFORM.WIN) {
app.registerAdmin(admin.modules.systemInfo);
app.registerAdmin(admin.modules.nodeInfo);
}
app.registerAdmin(admin.modules.monitorLog, {path: pathUtil.getLogPath(app.getBase())});
app.registerAdmin(admin.modules.scripts, {app: app, path: pathUtil.getScriptPath(app.getBase())});
if (os.platform() !== Constants.PLATFORM.WIN) {
app.registerAdmin(admin.modules.profiler);
}
}
};
loadModules 方法根据app.get(Constants.KEYWORDS.MODULE);
获取相应模块,此处为masterwatcher、watchServer,console模块,如果record.module
的type类型为function则初始化此方法,把相应组件注册到consoleService中,并保存到self.modules
中。
pro.loadModules = function (self, consoleService) {
// load app register modules
var _modules = self.app.get(Constants.KEYWORDS.MODULE);
if (!_modules) {
return;
}
var modules = [];
for (var m in _modules) {
modules.push(_modules[m]);
}
/**
modules
[
{
moduleId: '__masterwatcher__',
module: [Function] { moduleId: '__masterwatcher__' },
opts: { app: [Object] }
},
{
moduleId: 'watchServer',
module: [Function] { moduleId: 'watchServer' },
opts: { app: [Object] }
},
{
moduleId: '__console__',
module: [Function] { moduleId: '__console__' },
opts: { app: [Object], starter: [Object] }
}
]
*/
var record, moduleId, module;
for (var i = 0, l = modules.length; i < l; i++) {
record = modules[i];
if (typeof record.module === 'function') {
module = record.module(record.opts, consoleService);
} else {
module = record.module;
}
moduleId = record.moduleId || module.moduleId;
if (!moduleId) {
logger.warn('ignore an unknown module.');
continue;
}
consoleService.register(moduleId, module);
self.modules.push(module);
}
};
consoleService.register(moduleId, module);
则把相应模块保存到consoleService的this.modules中。
ConsoleService.prototype.register = function(moduleId, module) {
this.modules[moduleId] = registerRecord(this, moduleId, module);
};
以上加载操作完成后,则调用this.materConsole.start
方法。
等this.materConsole.start
执行完毕后,调用moduleUtil.startModules(self.modules)
,此为调用this.modules
中所有对象的start方法(master启动时,start没有有效操作),最后调用starter.runServers(self.app);
启动所有服务器。
在master/master.js
中对this.master进行赋值操作,this.masterConsole = admin.createMasterConsole(opts);
同时初始化consoleService,在初始化consoleService的同时,对consoleService 的 this.agent = new MasterAgent(this, opts);
进行初始化操作。
即:
master(this.masterConsole)——>consoleService(this.agent)——>masterAgent
所以,在调用master的start时,实则是在调用consoleService的start方法。
ConsoleService.prototype.start = function (cb) {
if (this.master) {
var self = this;
this.agent.listen(this.port, function (err) {
if (!!err) {
utils.invokeCallback(cb, err);
return;
}
exportEvent(self, self.agent, 'register');
exportEvent(self, self.agent, 'disconnect');
exportEvent(self, self.agent, 'reconnect');
process.nextTick(function () {
utils.invokeCallback(cb);
});
});
}
exportEvent(this, this.agent, 'error');
for (var mid in this.modules) {
this.enable(mid);
}
};
而consoleService.start又在调用this.agent.listen的方法,即为masterAgent.js的listen方法。
MasterAgent.prototype.listen = function (port, cb) {
this.state = ST_STARTED;
this.server = new MqttServer(); // 初始化mqttServer对象
this.server.listen(port); // 调用mqttServer.listen方法
var self = this;
this.server.once('listening', function () {
setImmediate(function () {
cb();
});
});
this.server.on('connection', function (socket) { // this.server监听connection
// var id, type, info, registered, username;
var masterSocket = new MasterSocket();
masterSocket['agent'] = self; // 对masterSocket的agent赋值,值为masterAgent本身
masterSocket['socket'] = socket; // 对masterSocket的socket赋值,值为socket
self.sockets[socket.id] = socket; // 根据socket.id保存socket到sockets对象中
socket.on('register', function (msg) { // 监听register,在masterSocket中进行注册
// register a new connection
masterSocket.onRegister(msg);
});
// message from monitor
socket.on('monitor', function (msg) { // 监听monitor,在masterSocket中进行监控
masterSocket.onMonitor(msg);
});
// message from client
socket.on('client', function (msg) {
masterSocket.onClient(msg);
});
});
};
首先初始化this.server = new MqttServer();
然后又调用了masterAgent的listen方法:this.server.listen(port);
从这可以看出,所以这个链路又成了:
master(this.masterConsole)——>consoleService(this.agent)——>masterAgent——>mqttServer
对this.server设置了监听this.server.on('connection', function (socket) {})
当触发监听时,把初始化masterSocket,为socket监听提供masterSocket.onRegister(msg);
、masterSocket.onMonitor(msg);
方法
MqttServer.prototype.listen = function (port) {
this.inited = true;
var self = this;
this.server = new net.Server(); // 创建server连接
this.server.listen(port);
this.server.on('listening', this.emit.bind(this, 'listening'));
this.server.on('connection', function (stream) { // 当this.server被连接时触发
var socket = MqttCon(stream);
socket['id'] = curId++;
socket.on('connect', function (pkg) { // socket对connect进行监听
socket.connack({ // ack握手
returnCode: 0
});
});
socket.on('publish', function (pkg) { // socket对pulish进行监听
var topic = pkg.topic;
var msg = pkg.payload.toString();
msg = JSON.parse(msg);
socket.emit(topic, msg); // 根据topic进行相应的触发提交
});
socket.on('pingreq', function () {
socket.pingresp();
});
socket.send = function (topic, msg) { // 发送消息
socket.publish({ // 调用socket.publish,发布消息,--> mqttClient on('publish')
topic: topic,
payload: JSON.stringify(msg)
});
};
self.emit('connection', socket); // 触发masterAgent的connection监听
});
};
由此可以看出,此处为master服务的最底层,初始化this.server,监听port端口,监听connection字段,并为socket提供一系列的监听:connect、publish、pingreq,以及send方法,分别提供连接握手,发布消息,发送消息,已经触发上层的connection监听。
var Monitor = function (app, opts) {
opts = opts || {};
this.app = app;
this.serverInfo = app.getCurServer();
this.masterInfo = app.getMaster();
this.modules = [];
this.closeWatcher = opts.closeWatcher;
this.monitorConsole = admin.createMonitorConsole({
id: this.serverInfo.id,
type: this.app.getServerType(),
host: this.masterInfo.host,
port: this.masterInfo.port,
info: this.serverInfo,
env: this.app.get(Constants.RESERVED.ENV),
authServer: app.get('adminAuthServerMonitor') // auth server function
});
};
对this.monitorConsole进行赋值,调用require(‘pomelo-admin’);的createMonitorConsole方法,即为pomelo-admin/lib/consoleService.js,此处的monitorConsole为pomelo-admin的上层入口,也是上层代理,通过monitorConsole调用pomelo-admin中的方法。
Monitor.prototype.start = function (cb) {
moduleUtil.registerDefaultModules(false, this.app, this.closeWatcher);
this.startConsole(cb);
};
Monitor.prototype.startConsole = function (cb) {
moduleUtil.loadModules(this, this.monitorConsole);
var self = this;
this.monitorConsole.start(function (err) {
if (err) {
utils.invokeCallback(cb, err);
return;
}
moduleUtil.startModules(self.modules, function (err) {
utils.invokeCallback(cb, err);
return;
});
});
this.monitorConsole.on('error', function (err) {
if (!!err) {
logger.error('monitorConsole encounters with error: %j', err.stack);
return;
}
});
};
调用moduleUtil.registerDefaultModules
,因为为monitor进程,所以此时要注册require('../modules/monitorwatcher')
到this.set(Constants.KEYWORDS.MODULE)
中
app.registerAdmin(require('../modules/monitorwatcher'), {app: app});
其余的如masterwatcher、admin.modules.watchServer等,在master进程启动时已经加载过了。
调用moduleUtil.loadModules(this, this.monitorConsole);
把相应的moduleId和module注册到consoleService中,并初始化,然后把module保存到monitor的modules中,注意:monitor要比master多一个monitorwatcher。
以上加载操作完成后,则调用this.monitorConsole.start
方法。
在master时,这个调用没有什么有效执行,但是在monitor中比master多了一个monitorwatcher,在这个对象里面有一个start方法,后续这里有有效执行。
在monitor/monitor.js
中对this.monitor进行赋值操作,this.monitorConsole = admin.createMonitorConsole(opts);
同时初始化consoleService,在初始化consoleService的同时,对consoleService 的 this.agent = new MonitorAgent(this, opts);
进行初始化操作。
即:
monitor(this.monitorConsole)——>consoleService(this.agent)——>monitorAgent
所以,在调用monitor的start时,实则是在调用consoleService的start方法。
ConsoleService.prototype.start = function (cb) {
this.agent.connect(this.port, this.host, cb);
exportEvent(this, this.agent, 'close');
exportEvent(this, this.agent, 'error');
for (var mid in this.modules) {
this.enable(mid);
}
};
而consoleService.start又在调用this.agent.connect的方法,即为monitorAgent.js的connect方法。
MonitorAgent.prototype.connect = function (port, host, cb) {
this.socket = new MqttClient(this.opts); // 初始化mqttClient对象
this.socket.connect(host, port); // 调用mqttClient的connect方法
var self = this;
this.socket.on('register', function (msg) { // this.socket监听register,改变状态
if (msg && msg.code === protocol.PRO_OK) {
self.state = ST_REGISTERED;
cb();
}
});
this.socket.on('monitor', function (msg) { // this.socket监听monitor
msg = protocol.parse(msg);
if (msg.command) { // 调用相应command命令
// a command from master
self.consoleService.command(msg.command, msg.moduleId, msg.body, function (err, res) {
//notify should not have a callback
});
} else {
var respId = msg.respId;
if (respId) { // 调用cb返回数据
// a response from monitor
var respCb = self.callbacks[respId];
delete self.callbacks[respId];
respCb(msg.error, msg.body);
return;
}
// request from master
self.consoleService.execute(msg.moduleId, 'monitorHandler', msg.body, function (err, res) {
if (protocol.isRequest(msg)) {
var resp = protocol.composeResponse(msg, err, res);
if (resp) {
self.doSend('monitor', resp);
}
}
});
}
});
this.socket.on('connect', function () { // this.socket监听connect
self.state = ST_CONNECTED;
var req = {
id: self.id,
type: 'monitor',
serverType: self.type,
pid: process.pid,
info: self.info
};
var authServer = self.consoleService.authServer;
var env = self.consoleService.env;
authServer(req, env, function (token) {
req['token'] = token;
self.doSend('register', req); // 发送注册字段
});
});
this.socket.on('reconnect_ok', function (msg) {
if (msg && msg.code === protocol.PRO_OK) {
self.state = ST_REGISTERED;
}
});
};
首先初始化this.socket = new MqttClient(this.opts);
然后又调用了MqttClient的connect方法:this.socket.connect(host, port);
从这可以看出,所以这个链路又成了:
monitor(this.monitorConsole)——>consoleService(this.agent)——>monitorAgent——>mqttClient
this.socket对register、monitor、connect进行监听,当触发时,进行相应操作。
MqttClient.prototype.connect = function (host, port, cb) {
var self = this;
this.closed = false;
var stream = net.createConnection(this.port, this.host); // 创建简介
this.socket = MqttCon(stream);
this.socket.connect({ // 调用socket.connect
clientId: this.clientId
});
this.socket.on('connack', function () { // 监听ack握手
self.connected = true;
self.setupKeepAlive();
if (self.connectedTimes++ == 1) {
self.emit('connect'); // 触发connect
cb();
} else {
self.emit('reconnect');
}
});
this.socket.on('publish', function (pkg) { // 监听publish字段
var topic = pkg.topic;
var msg = pkg.payload.toString();
msg = JSON.parse(msg);
self.emit(topic, msg); // 触发相应的topic监听 --> monitorAgent on('topic')
});
this.socket.on('pingresp', function () {
self.lastPong = Date.now();
});
}
由此可以看出,此处为monitor服务的最底层,初始化this.socket,调用this.socket.connect方法
监听connack字段,触发connect的监听,监听publish字段,触发相应的topic监听
master的层级为:
master——>/master/master(this.masterConsole)——>consoleService(this.agent)——>masterAgent——>mqttServer
monitor的层级为:
monitor——>/monitor/monitor(this.monitorConsole)——>consoleService(this.agent)——>monitorAgent——>mqttClient
根据层级,我们可以看出,master和monitor都是创建了一个代理,提供所需功能,masterAgent和monitorAgent是一个层级,mqttServer和mqttClient是一个层级,他们之间的通信都是基于mqttServer和mqttClient所实现的。
a: 在master启动时,mqttServer创建了net.server服务,进行监听。
b: 在monitor启动时,mqttClient创建连接触发mqttServer中的self.emit('connection', socket);
进而触发masterAgent中的connection监听,为socket创建更多的监听。
c: 进行ACK握手,self.emit('connect');
触发monitorAgent的connect监听。
d: monitorAgent的connect监听,执行self.doSend('register', req);
MonitorAgent.prototype.doSend = function (topic, msg) {
this.socket.send(topic, msg);
}
MqttClient.prototype.send = function (topic, msg) {
this.socket.publish({ // --> mqttServer on('publish')
topic: topic,
payload: JSON.stringify(msg)
})
}
MqttServer.js
socket.on('publish', function (pkg) {
var topic = pkg.topic;
var msg = pkg.payload.toString();
msg = JSON.parse(msg);
socket.emit(topic, msg); // --> masterAgent on('topic')
});
e: monitorAgent的doSend方法,实则是在调用mqttClient中的send方法,this.socket.publish()
从而触发mqttServer中的socket.on('publish')
监听,最终触发了masterAgent响应的topic监听。
monitorAgent.doSend——>masterAgent 的topic监听。
MasterSocket.prototype.onRegister = function (msg) {
var self = this;
var serverId = msg.id;
var serverType = msg.type;
var socket = this.socket;
// 根据类型判断
if (serverType == Constants.TYPE_CLIENT) {
this.id = serverId;
this.type = serverType;
this.info = 'client';
this.agent.doAuthUser(msg, socket, function (err) {
self.username = msg.username;
self.registered = true;
});
return;
} // end of if(serverType === 'client')
if (serverType == Constants.TYPE_MONITOR) {
this.id = serverId;
this.type = msg.serverType;
this.info = msg.info;
this.agent.doAuthServer(msg, socket, function (err) { //验证服务器权限
self.registered = true;
});
this.repushQosMessage(serverId);
return;
} // end of if(serverType === 'monitor')
this.agent.doSend(socket, 'register', {
code: protocol.PRO_FAIL,
msg: 'unknown auth master type'
});
socket.disconnect();
}
MasterAgent.prototype.doAuthServer = function (msg, socket, cb) {
var self = this;
var authServer = self.consoleService.authServer;
var env = self.consoleService.env;
authServer(msg, env, function (status) {
var record = addConnection(self, msg.id, msg.serverType, msg.pid, msg.info, socket); // 保存连接信息(pid, type,socket等)
doSend(socket, 'register', { // 调用mqttServer中的socket.send
code: protocol.PRO_OK,
msg: 'ok'
});
msg.info = msg.info || {}
msg.info.pid = msg.pid;
self.emit('register', msg.info); // --> master on('register')
cb(null);
});
};
f: 这次register的操作,对于monitorAgent改变了的state状态,对于masterAgent在masterSocket中进行了赋值操作。
当monitor启动完成后,调用了moduleUtil.startModules方法,实际有效调用为monitorwatcher中的start方法。
var subscribeRequest = function (self, agent, id, cb) {
var msg = {action: 'subscribe', id: id};
// 此方法最终调到MASTER_WATCHER模块的subscribe方法
agent.request(Constants.KEYWORDS.MASTER_WATCHER, msg, function (err, servers) { // --> monitorAgent.request
var res = [];
for (var id in servers) {
res.push(servers[id]);
}
addServers(self, res);
utils.invokeCallback(cb);
});
};
MonitorAgent.prototype.request = function (moduleId, msg, cb) {
var reqId = this.reqId++;
this.callbacks[reqId] = cb;
this.doSend('monitor', protocol.composeRequest(reqId, moduleId, msg)); // --> masterAgent on('monitor')
};
MasterSocket.prototype.onMonitor = function (msg) {
var socket = this.socket;
var self = this;
var type = this.type;
msg = protocol.parse(msg);
var respId = msg.respId;
if (respId) {
// a response from monitor
var cb = self.agent.callbacks[respId];
var id = this.id;
if (self.agent.msgMap[id]) {
delete self.agent.msgMap[id][respId];
}
delete self.agent.callbacks[respId];
return cb(msg.error, msg.body);
}
// a request or a notify from monitor
self.agent.consoleService.execute(msg.moduleId, 'masterHandler', msg.body, function (err, res) { // 调用相应moduleId模块的masterHandler方法
if (protocol.isRequest(msg)) {
var resp = protocol.composeResponse(msg, err, res);
if (resp) {
self.agent.doSend(socket, 'monitor', resp);
}
}
});
}
Module.prototype.masterHandler = function (agent, msg, cb) {
var func = masterMethods[msg.action];
func(this, agent, msg, cb);
};
var subscribe = function (module, agent, msg, cb) {
module.watchdog.subscribe(msg.id);
utils.invokeCallback(cb, null, module.watchdog.query());
};
monitorwatcher中的start方法经过这一系列的操作,最终在watchdog中进行订阅。
在start启动完成之后,后续调用afterStart方法
Application.afterStart = function (cb) {
var afterFun = this.lifecycleCbs[Constants.LIFECYCLE.AFTER_STARTUP];
var self = this;
appUtil.optComponents(this.loaded, Constants.RESERVED.AFTER_START, function (err) {
self.state = STATE_STARTED;
var id = self.getServerId();
if (!!afterFun) {
afterFun.call(null, self, function () {
utils.invokeCallback(cb, err);
});
} else {
utils.invokeCallback(cb, err);
}
var usedTime = Date.now() - self.startTime;
self.event.emit(events.START_SERVER, id); // 触发monitorwatcher中的监听
});
};
此时的appUtil.optComponents(this.loaded, Constants.RESERVED.AFTER_START)
没有有效调用,因为此时的master和monitor中没有afterStart方法。
对于self.event.emit(events.START_SERVER, id);
a: 先调用了monitorwatcher中的监听
b: 调用monitorAgent中的notify
c: 调用mqttClient中的send
d: 触发mqttServer中的publish监听
e: 触发masterAgent中的monitor监听
f: 调用masterwatcher中的record方法
g: 调用watchdog中的record方法
根据服务器数量,判断所有服务器是否全部启动完成,进而调用startOver。