pomelo 框架第二部分 master/monitor服务

松越
2023-12-01

pomelo 框架第二部分

项目地址: https://github.com/NetEase/chatofpomelo-websocket.git
分支:tutorial-protobuf
如果发现有错误的地方,请在评论中及时指出,谢谢

一、pomelo调用各模块start接口

channel、connector、dictionary、master、monitor、proxy、remote、server中包含start

1.onnector.js

this.server = this.app.components.__server__;
this.session = this.app.components.__session__;
this.connection = this.app.components.__connection__;

把server、session和connection模块绑定到connector上。

2.dictionary.js

读取 handler 里面的接口dictionary.json,给this.abbrsthis.dict赋值

3.master.js

this.master.start(cb);

调用master/master.js的start方法

4.monitor.js

this.monitor.start(cb);

调用monitor/monitor.js的start方法

5.remote.js

pro.start = function(cb) {
  this.opts.port = this.app.getCurServer().port;
  this.remote = genRemote(this.app, this.opts); // require('pomelo-rpc').server
  this.remote.start();
};

this.remote进行require('pomelo-rpc').server初始化赋值,然后调用this.remote.start方法,即require('pomelo-rpc').server的start方法。

6.server.js

this.server.start();

调用server/server.js的start方法

二、master/master.js的start执行步骤

1.在初始化master.js时对master/master.js进行初始化操作。

var Server = function (app, opts) {
    this.app = app;
    this.masterInfo = app.getMaster(); // 获取master信息
    this.registered = {};
    this.modules = [];
    opts = opts || {};

    opts.port = this.masterInfo.port;
    opts.env = this.app.get(Constants.RESERVED.ENV);
    this.closeWatcher = opts.closeWatcher;
    this.masterConsole = admin.createMasterConsole(opts); // require('pomelo-admin');
};

this.masterConsole进行赋值,调用require('pomelo-admin');的createMasterConsole方法,即为pomelo-admin/lib/consoleService.js,此处的masterConsole为pomelo-admin的上层入口,也是上层代理,通过masterConsole调用pomelo-admin中的方法。

2.master/master.js中的start方法。

Server.prototype.start = function (cb) {
    moduleUtil.registerDefaultModules(true, this.app, this.closeWatcher);
    moduleUtil.loadModules(this, this.masterConsole);

    var self = this;
    // start master console
    this.masterConsole.start(function (err) {
        if (err) {
            process.exit(0);
        }
        moduleUtil.startModules(self.modules, function (err) {
            if (err) {
                utils.invokeCallback(cb, err);
                return;
            }

            if (self.app.get(Constants.RESERVED.MODE) !== Constants.RESERVED.STAND_ALONE) {
                starter.runServers(self.app);
            }
            utils.invokeCallback(cb);
        });
    });

    this.masterConsole.on('error', function (err) {});

    this.masterConsole.on('reconnect', function (info) {
        self.app.addServers([info]);
    });
    
    // monitor servers disconnect event
    this.masterConsole.on('disconnect', function (id, type, info, reason) {});
    
    // monitor servers register event
    this.masterConsole.on('register', function (record) {
        starter.bindCpu(record.id, record.pid, record.host);
    });

    this.masterConsole.on('admin-log', function (log, error) {});
};

3.moduleUtil.registerDefaultModules方法

registerDefaultModules 方法把 modules/masterwatcher(master观察者)admin.modules.watchServer(admin观察服务)modules/console 添加到app.modules 中。
如果开启了systemMonitor,则注册相应的模块。

pro.registerDefaultModules = function (isMaster, app, closeWatcher) {
    if (!closeWatcher) {
        if (isMaster) {
            app.registerAdmin(require('../modules/masterwatcher'), {app: app});
        } else {
            app.registerAdmin(require('../modules/monitorwatcher'), {app: app});
        }
    }
    app.registerAdmin(admin.modules.watchServer, {app: app});
    app.registerAdmin(require('../modules/console'), {app: app, starter: starter});
    if (app.enabled('systemMonitor')) {
        if (os.platform() !== Constants.PLATFORM.WIN) {
            app.registerAdmin(admin.modules.systemInfo);
            app.registerAdmin(admin.modules.nodeInfo);
        }
        app.registerAdmin(admin.modules.monitorLog, {path: pathUtil.getLogPath(app.getBase())});
        app.registerAdmin(admin.modules.scripts, {app: app, path: pathUtil.getScriptPath(app.getBase())});
        if (os.platform() !== Constants.PLATFORM.WIN) {
            app.registerAdmin(admin.modules.profiler);
        }
    }
};

4.moduleUtil.loadModules方法

loadModules 方法根据app.get(Constants.KEYWORDS.MODULE);获取相应模块,此处为masterwatcher、watchServer,console模块,如果record.module的type类型为function则初始化此方法,把相应组件注册到consoleService中,并保存到self.modules中。

pro.loadModules = function (self, consoleService) {
    // load app register modules
    var _modules = self.app.get(Constants.KEYWORDS.MODULE);

    if (!_modules) {
        return;
    }

    var modules = [];
    for (var m in _modules) {
        modules.push(_modules[m]);
    }
/**    
modules
[
    {
        moduleId: '__masterwatcher__',
        module: [Function] { moduleId: '__masterwatcher__' },
        opts: { app: [Object] }
    },
    {
        moduleId: 'watchServer',
        module: [Function] { moduleId: 'watchServer' },
        opts: { app: [Object] }
    },
    {
        moduleId: '__console__',
        module: [Function] { moduleId: '__console__' },
        opts: { app: [Object], starter: [Object] }
    }
]    
*/

    var record, moduleId, module;
    for (var i = 0, l = modules.length; i < l; i++) {
        record = modules[i];
        if (typeof record.module === 'function') {
            module = record.module(record.opts, consoleService);
        } else {
            module = record.module;
        }

        moduleId = record.moduleId || module.moduleId;

        if (!moduleId) {
            logger.warn('ignore an unknown module.');
            continue;
        }

        consoleService.register(moduleId, module);
        self.modules.push(module);
    }
};

5.pomelo-admin/consoleService.register方法

consoleService.register(moduleId, module); 则把相应模块保存到consoleService的this.modules中。

ConsoleService.prototype.register = function(moduleId, module) {
	this.modules[moduleId] = registerRecord(this, moduleId, module);
};

6.调用this.materConsole.start方法

以上加载操作完成后,则调用this.materConsole.start方法。

7.启动所有服务器

this.materConsole.start执行完毕后,调用moduleUtil.startModules(self.modules),此为调用this.modules中所有对象的start方法(master启动时,start没有有效操作),最后调用starter.runServers(self.app);启动所有服务器。

三、master——pomelo-admin/consoleService.js

1.初始化操作的顺序

master/master.js中对this.master进行赋值操作,this.masterConsole = admin.createMasterConsole(opts);同时初始化consoleService,在初始化consoleService的同时,对consoleService 的 this.agent = new MasterAgent(this, opts); 进行初始化操作。

即:
master(this.masterConsole)——>consoleService(this.agent)——>masterAgent
所以,在调用master的start时,实则是在调用consoleService的start方法。

ConsoleService.prototype.start = function (cb) {
    if (this.master) {
        var self = this;
        this.agent.listen(this.port, function (err) {
            if (!!err) {
                utils.invokeCallback(cb, err);
                return;
            }
            exportEvent(self, self.agent, 'register');
            exportEvent(self, self.agent, 'disconnect');
            exportEvent(self, self.agent, 'reconnect');
            process.nextTick(function () {
                utils.invokeCallback(cb);
            });
        });
    } 
    exportEvent(this, this.agent, 'error');
    for (var mid in this.modules) {
        this.enable(mid);
    }
};

而consoleService.start又在调用this.agent.listen的方法,即为masterAgent.js的listen方法。

2.masterAgent.listen方法

MasterAgent.prototype.listen = function (port, cb) {

    this.state = ST_STARTED;
    this.server = new MqttServer(); // 初始化mqttServer对象
    this.server.listen(port); // 调用mqttServer.listen方法

    var self = this;

    this.server.once('listening', function () {
        setImmediate(function () {
            cb();
        });
    });

    this.server.on('connection', function (socket) { // this.server监听connection
    
        // var id, type, info, registered, username;
        var masterSocket = new MasterSocket();
        masterSocket['agent'] = self; // 对masterSocket的agent赋值,值为masterAgent本身
        masterSocket['socket'] = socket; // 对masterSocket的socket赋值,值为socket

        self.sockets[socket.id] = socket; // 根据socket.id保存socket到sockets对象中

        socket.on('register', function (msg) { // 监听register,在masterSocket中进行注册
            // register a new connection
            masterSocket.onRegister(msg);
        });

        // message from monitor
        socket.on('monitor', function (msg) { // 监听monitor,在masterSocket中进行监控
            masterSocket.onMonitor(msg);
        });
        
        // message from client
        socket.on('client', function (msg) {
            masterSocket.onClient(msg);
        });
    });
};

首先初始化this.server = new MqttServer();然后又调用了masterAgent的listen方法:this.server.listen(port);
从这可以看出,所以这个链路又成了:
master(this.masterConsole)——>consoleService(this.agent)——>masterAgent——>mqttServer
对this.server设置了监听this.server.on('connection', function (socket) {})当触发监听时,把初始化masterSocket,为socket监听提供masterSocket.onRegister(msg);masterSocket.onMonitor(msg);方法

3.mqttServer.listen方法

MqttServer.prototype.listen = function (port) {
    this.inited = true;
    var self = this;

    this.server = new net.Server(); // 创建server连接
    this.server.listen(port);

    this.server.on('listening', this.emit.bind(this, 'listening'));

    this.server.on('connection', function (stream) { // 当this.server被连接时触发
        var socket = MqttCon(stream);
        socket['id'] = curId++;

        socket.on('connect', function (pkg) { // socket对connect进行监听
            socket.connack({ // ack握手
                returnCode: 0
            });
        });

        socket.on('publish', function (pkg) { // socket对pulish进行监听
            var topic = pkg.topic;
            var msg = pkg.payload.toString();
            msg = JSON.parse(msg);
            socket.emit(topic, msg); // 根据topic进行相应的触发提交
        });

        socket.on('pingreq', function () {
            socket.pingresp();
        });

        socket.send = function (topic, msg) { // 发送消息
            socket.publish({ // 调用socket.publish,发布消息,--> mqttClient on('publish')
                topic: topic,
                payload: JSON.stringify(msg)
            });
        };

        self.emit('connection', socket); // 触发masterAgent的connection监听
    });
};

由此可以看出,此处为master服务的最底层,初始化this.server,监听port端口,监听connection字段,并为socket提供一系列的监听:connect、publish、pingreq,以及send方法,分别提供连接握手,发布消息,发送消息,已经触发上层的connection监听。

四、monitor/monitor的start执行步骤

1.在初始化monitor.js时对monitor/monitor.js进行初始化操作。

var Monitor = function (app, opts) {
    opts = opts || {};
    this.app = app;
    this.serverInfo = app.getCurServer();
    this.masterInfo = app.getMaster();
    this.modules = [];
    this.closeWatcher = opts.closeWatcher;

    this.monitorConsole = admin.createMonitorConsole({
        id: this.serverInfo.id,
        type: this.app.getServerType(),
        host: this.masterInfo.host,
        port: this.masterInfo.port,
        info: this.serverInfo,
        env: this.app.get(Constants.RESERVED.ENV),
        authServer: app.get('adminAuthServerMonitor') // auth server function
    });
};

对this.monitorConsole进行赋值,调用require(‘pomelo-admin’);的createMonitorConsole方法,即为pomelo-admin/lib/consoleService.js,此处的monitorConsole为pomelo-admin的上层入口,也是上层代理,通过monitorConsole调用pomelo-admin中的方法。

2.monitor/monitor.js中的start方法。

Monitor.prototype.start = function (cb) {
    moduleUtil.registerDefaultModules(false, this.app, this.closeWatcher);
    this.startConsole(cb);
};

Monitor.prototype.startConsole = function (cb) {
    moduleUtil.loadModules(this, this.monitorConsole);

    var self = this;
    this.monitorConsole.start(function (err) {
        if (err) {
            utils.invokeCallback(cb, err);
            return;
        }
        moduleUtil.startModules(self.modules, function (err) {
            utils.invokeCallback(cb, err);
            return;
        });
    });

    this.monitorConsole.on('error', function (err) {
        if (!!err) {
            logger.error('monitorConsole encounters with error: %j', err.stack);
            return;
        }
    });
};

调用moduleUtil.registerDefaultModules,因为为monitor进程,所以此时要注册require('../modules/monitorwatcher')this.set(Constants.KEYWORDS.MODULE)

app.registerAdmin(require('../modules/monitorwatcher'), {app: app});

其余的如masterwatcher、admin.modules.watchServer等,在master进程启动时已经加载过了。
调用moduleUtil.loadModules(this, this.monitorConsole);把相应的moduleId和module注册到consoleService中,并初始化,然后把module保存到monitor的modules中,注意:monitor要比master多一个monitorwatcher。

3. 调用this.monitorConsole.start方法

以上加载操作完成后,则调用this.monitorConsole.start方法。

4.调用moduleUtil.startModules方法

在master时,这个调用没有什么有效执行,但是在monitor中比master多了一个monitorwatcher,在这个对象里面有一个start方法,后续这里有有效执行。

五、monitor——pomelo-admin/consoleService.js

1.初始化操作的顺序

monitor/monitor.js中对this.monitor进行赋值操作,this.monitorConsole = admin.createMonitorConsole(opts);同时初始化consoleService,在初始化consoleService的同时,对consoleService 的 this.agent = new MonitorAgent(this, opts); 进行初始化操作。

即:
monitor(this.monitorConsole)——>consoleService(this.agent)——>monitorAgent
所以,在调用monitor的start时,实则是在调用consoleService的start方法。

ConsoleService.prototype.start = function (cb) {
   	this.agent.connect(this.port, this.host, cb);
    exportEvent(this, this.agent, 'close');
   
    exportEvent(this, this.agent, 'error');

    for (var mid in this.modules) {
        this.enable(mid);
    }
};

而consoleService.start又在调用this.agent.connect的方法,即为monitorAgent.js的connect方法。

2.MonitorAgent.connect方法

MonitorAgent.prototype.connect = function (port, host, cb) {

    this.socket = new MqttClient(this.opts); // 初始化mqttClient对象
    this.socket.connect(host, port); // 调用mqttClient的connect方法

    var self = this;
    this.socket.on('register', function (msg) { // this.socket监听register,改变状态
        if (msg && msg.code === protocol.PRO_OK) {
            self.state = ST_REGISTERED;
            cb();
        }
    });

    this.socket.on('monitor', function (msg) { // this.socket监听monitor

        msg = protocol.parse(msg);

        if (msg.command) { // 调用相应command命令
            // a command from master
            self.consoleService.command(msg.command, msg.moduleId, msg.body, function (err, res) {
                //notify should not have a callback
            });
        } else {
            var respId = msg.respId;
            if (respId) { // 调用cb返回数据
                // a response from monitor
                var respCb = self.callbacks[respId];
                delete self.callbacks[respId];
                respCb(msg.error, msg.body);
                return;
            }

            // request from master
            self.consoleService.execute(msg.moduleId, 'monitorHandler', msg.body, function (err, res) {
                if (protocol.isRequest(msg)) {
                    var resp = protocol.composeResponse(msg, err, res);
                    if (resp) {
                        self.doSend('monitor', resp);
                    }
                }
            });
        }
    });

    this.socket.on('connect', function () { // this.socket监听connect
    
        self.state = ST_CONNECTED;
        var req = {
            id: self.id,
            type: 'monitor',
            serverType: self.type,
            pid: process.pid,
            info: self.info
        };
        var authServer = self.consoleService.authServer;
        var env = self.consoleService.env;
        authServer(req, env, function (token) {
            req['token'] = token;
            self.doSend('register', req); // 发送注册字段
        });
    });

    this.socket.on('reconnect_ok', function (msg) {
        if (msg && msg.code === protocol.PRO_OK) {
            self.state = ST_REGISTERED;
        }
    });
};

首先初始化this.socket = new MqttClient(this.opts);然后又调用了MqttClient的connect方法:this.socket.connect(host, port);
从这可以看出,所以这个链路又成了:
monitor(this.monitorConsole)——>consoleService(this.agent)——>monitorAgent——>mqttClient
this.socket对register、monitor、connect进行监听,当触发时,进行相应操作。

3.mqttClient.connect方法

MqttClient.prototype.connect = function (host, port, cb) {

    var self = this;
    this.closed = false;

    var stream = net.createConnection(this.port, this.host); // 创建简介
    this.socket = MqttCon(stream);

    this.socket.connect({ // 调用socket.connect
        clientId: this.clientId
    });

    this.socket.on('connack', function () { // 监听ack握手
        self.connected = true;

        self.setupKeepAlive();

        if (self.connectedTimes++ == 1) {
            self.emit('connect'); // 触发connect
            cb();
        } else {
            self.emit('reconnect');
        }
    });

    this.socket.on('publish', function (pkg) { // 监听publish字段
        var topic = pkg.topic;
        var msg = pkg.payload.toString();
        msg = JSON.parse(msg);

        self.emit(topic, msg); // 触发相应的topic监听 --> monitorAgent on('topic')
    });

    this.socket.on('pingresp', function () {
        self.lastPong = Date.now();
    });
}

由此可以看出,此处为monitor服务的最底层,初始化this.socket,调用this.socket.connect方法
监听connack字段,触发connect的监听,监听publish字段,触发相应的topic监听

六、master与monitor

1.层级

master的层级为:

master——>/master/master(this.masterConsole)——>consoleService(this.agent)——>masterAgent——>mqttServer

monitor的层级为:

monitor——>/monitor/monitor(this.monitorConsole)——>consoleService(this.agent)——>monitorAgent——>mqttClient

根据层级,我们可以看出,master和monitor都是创建了一个代理,提供所需功能,masterAgent和monitorAgent是一个层级,mqttServer和mqttClient是一个层级,他们之间的通信都是基于mqttServer和mqttClient所实现的。

2.master与monitor启动

a: 在master启动时,mqttServer创建了net.server服务,进行监听。
b: 在monitor启动时,mqttClient创建连接触发mqttServer中的self.emit('connection', socket);进而触发masterAgent中的connection监听,为socket创建更多的监听。
c: 进行ACK握手,self.emit('connect');触发monitorAgent的connect监听。
d: monitorAgent的connect监听,执行self.doSend('register', req);

MonitorAgent.prototype.doSend = function (topic, msg) {
    this.socket.send(topic, msg);
}
MqttClient.prototype.send = function (topic, msg) {
    this.socket.publish({ // --> mqttServer on('publish')
        topic: topic,
        payload: JSON.stringify(msg)
    })
}
MqttServer.js
socket.on('publish', function (pkg) {
    var topic = pkg.topic;
    var msg = pkg.payload.toString();
    msg = JSON.parse(msg);

    socket.emit(topic, msg); // --> masterAgent on('topic')
});

e: monitorAgent的doSend方法,实则是在调用mqttClient中的send方法,this.socket.publish()从而触发mqttServer中的socket.on('publish')监听,最终触发了masterAgent响应的topic监听。
monitorAgent.doSend——>masterAgent 的topic监听。

MasterSocket.prototype.onRegister = function (msg) {
    var self = this;
    var serverId = msg.id;
    var serverType = msg.type;
    var socket = this.socket;
	 // 根据类型判断
    if (serverType == Constants.TYPE_CLIENT) {
        this.id = serverId;
        this.type = serverType;
        this.info = 'client';
        this.agent.doAuthUser(msg, socket, function (err) {
            self.username = msg.username;
            self.registered = true;
        });
        return;
    } // end of if(serverType === 'client')

    if (serverType == Constants.TYPE_MONITOR) {
        this.id = serverId;
        this.type = msg.serverType;
        this.info = msg.info;
        this.agent.doAuthServer(msg, socket, function (err) { //验证服务器权限
            self.registered = true;
        });

        this.repushQosMessage(serverId);
        return;
    } // end of if(serverType === 'monitor') 

    this.agent.doSend(socket, 'register', {
        code: protocol.PRO_FAIL,
        msg: 'unknown auth master type'
    });

    socket.disconnect();
}
MasterAgent.prototype.doAuthServer = function (msg, socket, cb) {
    var self = this;
    var authServer = self.consoleService.authServer;
    var env = self.consoleService.env;
    authServer(msg, env, function (status) {
        var record = addConnection(self, msg.id, msg.serverType, msg.pid, msg.info, socket); // 保存连接信息(pid, type,socket等)

        doSend(socket, 'register', { // 调用mqttServer中的socket.send
            code: protocol.PRO_OK,
            msg: 'ok'
        });
        msg.info = msg.info || {}
        msg.info.pid = msg.pid;
        self.emit('register', msg.info); // --> master on('register')
        cb(null);
    });
};

f: 这次register的操作,对于monitorAgent改变了的state状态,对于masterAgent在masterSocket中进行了赋值操作。

3.monitor启动完成后

当monitor启动完成后,调用了moduleUtil.startModules方法,实际有效调用为monitorwatcher中的start方法。

var subscribeRequest = function (self, agent, id, cb) {
    var msg = {action: 'subscribe', id: id};
    // 此方法最终调到MASTER_WATCHER模块的subscribe方法
    agent.request(Constants.KEYWORDS.MASTER_WATCHER, msg, function (err, servers) { // --> monitorAgent.request
        var res = [];
        for (var id in servers) {
            res.push(servers[id]);
        }
        addServers(self, res);
        utils.invokeCallback(cb);
    });
};
MonitorAgent.prototype.request = function (moduleId, msg, cb) {
    var reqId = this.reqId++;
    this.callbacks[reqId] = cb;
    this.doSend('monitor', protocol.composeRequest(reqId, moduleId, msg)); // --> masterAgent on('monitor')
};
MasterSocket.prototype.onMonitor = function (msg) {
    var socket = this.socket;

    var self = this;
    var type = this.type;
    msg = protocol.parse(msg);
    var respId = msg.respId;
    if (respId) {
        // a response from monitor
        var cb = self.agent.callbacks[respId];

        var id = this.id;
        if (self.agent.msgMap[id]) {
            delete self.agent.msgMap[id][respId];
        }
        delete self.agent.callbacks[respId];
        return cb(msg.error, msg.body);
    }

    // a request or a notify from monitor
    self.agent.consoleService.execute(msg.moduleId, 'masterHandler', msg.body, function (err, res) { // 调用相应moduleId模块的masterHandler方法
        if (protocol.isRequest(msg)) {
            var resp = protocol.composeResponse(msg, err, res);
            if (resp) {
                self.agent.doSend(socket, 'monitor', resp);
            }
        }
    });
}
Module.prototype.masterHandler = function (agent, msg, cb) {
    var func = masterMethods[msg.action];
    func(this, agent, msg, cb);
};
var subscribe = function (module, agent, msg, cb) {
    module.watchdog.subscribe(msg.id);
    utils.invokeCallback(cb, null, module.watchdog.query());
};

monitorwatcher中的start方法经过这一系列的操作,最终在watchdog中进行订阅。

七、afterStart

在start启动完成之后,后续调用afterStart方法

Application.afterStart = function (cb) {

    var afterFun = this.lifecycleCbs[Constants.LIFECYCLE.AFTER_STARTUP];
    var self = this;
    appUtil.optComponents(this.loaded, Constants.RESERVED.AFTER_START, function (err) {
        self.state = STATE_STARTED;
        var id = self.getServerId();
        if (!!afterFun) {
            afterFun.call(null, self, function () {
                utils.invokeCallback(cb, err);
            });
        } else {
            utils.invokeCallback(cb, err);
        }
        var usedTime = Date.now() - self.startTime;
        self.event.emit(events.START_SERVER, id); // 触发monitorwatcher中的监听
    });
};

此时的appUtil.optComponents(this.loaded, Constants.RESERVED.AFTER_START)没有有效调用,因为此时的master和monitor中没有afterStart方法。
对于self.event.emit(events.START_SERVER, id);
a: 先调用了monitorwatcher中的监听
b: 调用monitorAgent中的notify
c: 调用mqttClient中的send
d: 触发mqttServer中的publish监听
e: 触发masterAgent中的monitor监听
f: 调用masterwatcher中的record方法
g: 调用watchdog中的record方法
根据服务器数量,判断所有服务器是否全部启动完成,进而调用startOver。

 类似资料: