pomelo 框架第三部分 remote/proxy服务

姜嘉良
2023-12-01

pomelo 框架第三部分

项目地址: https://github.com/NetEase/chatofpomelo-websocket.git
分支:tutorial-protobuf
如果发现有错误的地方,请在评论中及时指出,谢谢

一、remote.js的start方法

1. remote初始化

如果此服务器配置包含port端口,则加载remote组件。
进而启动remote组件,方法如下:

pro.start = function (cb) {
    this.opts.port = this.app.getCurServer().port;
    this.remote = genRemote(this.app, this.opts); // 初始化remote
    this.remote.start();
    process.nextTick(cb);
};
var genRemote = function (app, opts) {
    opts.paths = getRemotePaths(app); // 加载不同的remote路径
    opts.context = app;
    if (!!opts.rpcServer) { // 根据是否有rpcServer配置进行不同的初始化
        return opts.rpcServer.create(opts);
    } else {
        return RemoteServer.create(opts);
    }
};
var getRemotePaths = function (app) {
    var paths = [];

    var role;
    // master server should not come here
    if (app.isFrontend()) {
        role = 'frontend';
    } else {
        role = 'backend';
    }

    var sysPath = pathUtil.getSysRemotePath(role), serverType = app.getServerType();
    if (fs.existsSync(sysPath)) {
        paths.push(pathUtil.remotePathRecord('sys', serverType, sysPath));
    }
    var userPath = pathUtil.getUserRemotePath(app.getBase(), serverType);
    if (fs.existsSync(userPath)) {
        paths.push(pathUtil.remotePathRecord('user', serverType, userPath));
    }

    return paths;
};

backend paths:

[
  {
    namespace: 'sys',
    serverType: 'chat',
    path: '/chatofpomelo-websocket/game-server/node_modules/pomelo/lib/common/remote/backend'
  },
  {
    namespace: 'user',
    serverType: 'chat',
    path: '/chatofpomelo-websocket/game-server/app/servers/chat/remote'
  }
]

frontend paths:

[
  {
    namespace: 'sys',
    serverType: 'connector',
    path: /chatofpomelo-websocket/game-server/node_modules/pomelo/lib/common/remote/frontend'
  }
]

通过上面的代码,我们得知,remote根据服务器是否为前端服务器进行不同的路径匹配,remote的命名空间分为sys系统级和user用户级两种规则,后续调用remoteServer.create方法为this.remote进行初始化赋值,最后调用this.remote.start()启动remote模块。

2.rpc-server/server.create

module.exports.create = function (opts) {
    var services = loadRemoteServices(opts.paths, opts.context); // 根据paths加载相应路径的模块,并进行初始化操作
    opts.services = services;
    var gateway = Gateway.create(opts); // 初始化网关
    return gateway;
};

rpc-server/server初始化操作:
a: loadRemoteServices 加载了paths中相关的模块。
b: 对于sys 级的类型,加载pomelo/lib/common/remote下的模块,backend/msgRemote.jsfrontend/channelRemote.jsfrontend/sessionRemote.js
c: 对于user 级的类型,加载相应服务下的/remote里面的自定义模块。
d: 初始化网关

3.初始化gateway

var Gateway = function (opts) {
    EventEmitter.call(this);
    this.opts = opts || {};
    this.port = opts.port || 3050;
    this.started = false;
    this.stoped = false;
    this.acceptorFactory = opts.acceptorFactory || defaultAcceptorFactory;
    this.services = opts.services;
    var dispatcher = new Dispatcher(this.services); // 初始化调度
    if (!!this.opts.reloadRemotes) {
        watchServices(this, dispatcher);
    }
    this.acceptor = this.acceptorFactory.create(opts, function (tracer, msg, cb) { // 初始化接收器
        dispatcher.route(tracer, msg, cb); // 调度路由,作为cb对接收器进行赋值cb
    });
};

初始化接收器工厂,初始化调度,对于路由来说,通过调度将消息路由到适当的服务对象。

4.调用start

remote中调用start:this.remote.start();实际上是调用gateway中的start方法。
gateway.js:

pro.start = function () {
    this.started = true;
    this.acceptor.listen(this.port);
};

gateway调用mqtt-acceptor的listen方法,创建net.Server服务,监听connection字段
mqtt-acceptor.js:

pro.listen = function (port) {
    this.inited = true;
    var self = this;
    this.server = new net.Server();
    this.server.listen(port);
    this.server.on('connection', function (stream) { // 监听connection字段
        var socket = MqttCon(stream);
        socket['id'] = curId++;

        socket.on('publish', function (pkg) { // 监听publish
            pkg = pkg.payload.toString();
            var isArray = false;
            pkg = JSON.parse(pkg);
            if (pkg instanceof Array) {
                processMsgs(socket, self, pkg);
                isArray = true;
            } else {
                processMsg(socket, self, pkg); // 发送消息
            }
        });
        self.sockets[socket.id] = socket;
    });
};

mqtt-server监听publis字段,发送消息。

var processMsg = function (socket, acceptor, pkg) {
    var tracer = null;
    acceptor.cb(tracer, pkg.msg, function () { // 调用dispatcher的route方法
        var len = arguments.length;
        var args = new Array(len);
        for (var i = 0; i < len; i++) {
            args[i] = arguments[i];
        }

        var resp = {id: pkg.id, resp: args};
        doSend(socket, resp);
    });
};

acceptor.cb为dispatcher中的route方法,在初始化acceptor(mqtt-acceptor.js)的时候赋值。
dispatcher.js:

pro.route = function (tracer, msg, cb) {
    var namespace = this.services[msg.namespace];
    var service = namespace[msg.service];
    var method = service[msg.method];
    var args = msg.args;
    args.push(cb);
    
    method.apply(service, args);
};

通过route调用相应的remote模块中的method方法
mqtt-server.js:

var doSend = function (socket, msg) {
    socket.publish({
        topic: 'rpc',
        payload: JSON.stringify(msg)
    });
}

调用socket.publish发布topic为rpc的的消息。

二、proxy.js的afterStart方法

1.初始化操作

proxy.js:

var Component = function (app, opts) {
    this.app = app;
    this.opts = opts;
    this.client = genRpcClient(this.app, opts);
    this.app.event.on(events.ADD_SERVERS, this.addServers.bind(this));
};

genRpcClient初始化rpc-client为this.client进行赋值操作。
在proxy初始化的过程中创建监听this.app.event.on(events.ADD_SERVERS, this.addServers.bind(this));

rpc-client/clinet:

var Client = function (opts) {
    opts = opts || {};
    this._context = opts.context;
    this._routeContext = opts.routeContext;
    this.router = opts.router || router.df;
    this.routerType = opts.routerType;
    this.rpcDebugLog = opts.rpcDebugLog;
    if (this._context) {
        opts.clientId = this._context.serverId;
    }
    this.opts = opts;
    this.proxies = {};
    this._station = createStation(opts); //初始化MailStation服务
    this.state = STATE_INITED;
};

初始化pomelo-rpc/client。

2.app添加服务器

moduleUtil.startModules(self.modules, function (err) {
    utils.invokeCallback(cb, err);
    return;
});

在monitor服务启动完成后调用moduleUtil.startModules,其中有效操作为monitorwatcher中的start方法,调用subscribeRequest,从而调用app.addServers,添加服务器。

Application.addServers = function (servers) {
    var item, slist;
    for (var i = 0, l = servers.length; i < l; i++) {
        item = servers[i];
        this.servers[item.id] = item;

        slist = this.serverTypeMaps[item.serverType];
        if (!slist) {
            this.serverTypeMaps[item.serverType] = slist = [];
        }
        replaceServer(slist, item);

        if (this.serverTypes.indexOf(item.serverType) < 0) {
            this.serverTypes.push(item.serverType);
        }
    }
    this.event.emit(events.ADD_SERVERS, servers);
};

Application.addServers方法添加服务器,进而触发events.ADD_SERVERS,proxy中的this.addServers.bind(this);

3.生成proxy服务

proxy.js:

pro.addServers = function (servers) {
    //获取remote的路径,根据路径加载并初始化相应的模块
    //根据path初始化加载模块,创建proxy,并保存到this.proxies中
    genProxies(this.client, this.app, servers);
    this.client.addServers(servers);
};

受到app.addServers的触发,调用proxy添加服务操作。

var genProxies = function (client, app, sinfos) {
    var item;
    for (var i = 0, l = sinfos.length; i < l; i++) {
        item = sinfos[i];
        if (hasProxy(client, item)) {
            continue;
        }
        //getProxyRecords获取remote的路径
        //client.addProxies根据路径加载并初始化相应的模块
        //根据path初始化加载模块,创建proxy,并保存到this.proxies中
        client.addProxies(getProxyRecords(app, item));
    }
};
getProxyRecords:
[
    {
        namespace: 'sys',
        serverType: 'chat',
        path: '/chatofpomelo-websocket/game-server/node_modules/pomelo/lib/common/remote/backend'
    },
    {
        namespace: 'user',
        serverType: 'chat',
        path: '/chatofpomelo-websocket/game-server/app/servers/chat/remote'
    }
]

[
    {
        namespace: 'sys',
        serverType: 'connector',
        path: '/chatofpomelo-websocket/game-server/node_modules/pomelo/lib/common/remote/frontend'
    }
]

getProxyRecords获取proxy的路径,其中包含sys系统级的服务,也包含user用户级的服务,调用rpc-client中的addProxies,添加并保存服务。

pomelo-rpc/client.js:

pro.addProxies = function (records) {
    for (var i = 0, l = records.length; i < l; i++) {
        this.addProxy(records[i]);
    }
};
pro.addProxy = function (record) {
    // generateProxy,根据path初始化加载模块,并创建proxy
    // 根据route分配serverId,然后调用client.rpcInvoke
    var proxy = generateProxy(this, record, this._context);
    // 把代理添加到this.proxies中
    insertProxy(this.proxies, record.namespace, record.serverType, proxy);
};

生成proxy,并将proxy保存到this.proxies中。

var generateProxy = function (client, record, context) {
    var res, name;
    var modules = Loader.load(record.path, context);
    if (modules) {
        res = {};
        for (name in modules) {
            res[name] = Proxy.create({
                service: name,
                origin: modules[name],
                attach: record,
                // proxyCB根据route分配serverId,然后调用client.rpcInvoke
                proxyCB: proxyCB.bind(null, client)
            });
        }
    }
    return res;
};

generateProxy根据path信息初始化加载模块,并创建proxy。

var proxyCB = function (client, serviceName, methodName, args, attach, isToSpecifiedServer) {
    var routeParam = args.shift();
    var cb = args.pop();
    var serverType = attach.serverType;
    var msg = {
        namespace: attach.namespace,
        serverType: serverType,
        service: serviceName,
        method: methodName,
        args: args
    };

    if (isToSpecifiedServer) {
        rpcToSpecifiedServer(client, msg, serverType, routeParam, cb);
    } else {
        // app.js中自定义的route,根据route规则返回serverId
        getRouteTarget(client, serverType, msg, routeParam, function (err, serverId) {
            // 调用this._station.dispatch方法 ——> mailbox.send ——> socket.publish({topic: 'rpc',payload: msg});
            client.rpcInvoke(serverId, msg, cb);
        });
    }
};

对于proxyCB方法,其中包含了getRouteTarget,此方法是用来获取目标路由的,其规则为app.js中自定义的route规则,或者为默认规则,并将分配的服务器serverId返回来。
最后根据serverId调用client.rpcInvoke发送数据。

以上操作为genProxies(this.client, this.app, servers);的作用,创建proxy,并保存到this.proxies中。

4.MailStation

pomelo-rpc/client.js:

pro.addServer = function (server) {
    this._station.addServer(server);
};

mailstation.js:

pro.addServer = function (serverInfo) {
    var id = serverInfo.id;
    var type = serverInfo.serverType;
    this.servers[id] = serverInfo;
    this.onlines[id] = 1;

    if (!this.serversMap[type]) {
        this.serversMap[type] = [];
    }

    if (this.serversMap[type].indexOf(id) < 0) {
        this.serversMap[type].push(id);
    }
    this.emit('addServer', id);
};

this.client.addServers(servers);在mailstation服务中根据服务器信息保存服务器数据到mailstation.serversMap中。

5.proxy的afterStart方法

pro.afterStart = function (cb) {
    var self = this;
    this.app.__defineGetter__('rpc', function () { // 绑定rpc属性
        return self.client.proxies.user;
    });
    this.app.__defineGetter__('sysrpc', function () { // 绑定sysrpc属性
        return self.client.proxies.sys;
    });
    this.app.set('rpcInvoke', this.client.rpcInvoke.bind(this.client), true); // 设置rpcInvoke属性
    this.client.start(cb);
};

在app上绑定rpc、sysrpc、rpcInvoke属性,以及对应的调用。
根据此可以看出来,rpc分为三种模式,分别为rpc对应user,sysrpc对应sys,以及rpcInvoke。
根据上面第3点的内容,我们可以看出,rpc的最终指向都为this.client.rpcInvoke方法。

6.rpcInvoke方法

pomelo-rpc/client.js:

pro.rpcInvoke = function (serverId, msg, cb) {
    var tracer = null;
    this._station.dispatch(tracer, serverId, msg, this.opts, cb);
};

调用mailstation中的dispatch方法
mailstation.js

pro.dispatch = function (tracer, serverId, msg, opts, cb) {
    var self = this;
    var mailbox = this.mailboxes[serverId];
    if (!mailbox) {
        if (!lazyConnect(tracer, this, serverId, this.mailboxFactory, cb)) {
            self.emit('error', constants.RPC_ERROR.NO_TRAGET_SERVER, tracer, serverId, msg, opts);
        }
        addToPending(tracer, this, serverId, arguments);
        return;
    }

    var send = function (tracer, err, serverId, msg, opts) {
        var mailbox = self.mailboxes[serverId];
        mailbox.send(tracer, msg, opts, function (tracer_send, send_err, args) {
            doFilter(tracer_send, null, serverId, msg, opts, self.afters, 0, 'after', function (tracer, err, serverId, msg, opts) {
                utils.applyCallback(cb, args);
            });
        });
    };

    doFilter(tracer, null, serverId, msg, opts, this.befores, 0, 'before', send);
};

如果在this.mailboxes中没有此服务器信息,则通过懒加载的进行创建连接并保存到this.mailboxes中,然后通过过滤器过滤,调用mailbox中的send方法

var lazyConnect = function (tracer, station, serverId, factory, cb) {
    var server = station.servers[serverId];
    var online = station.onlines[serverId];
    var mailbox = factory.create(server, station.opts);
    station.connecting[serverId] = true;
    station.mailboxes[serverId] = mailbox;
    station.connect(tracer, serverId, cb);
    return true;
};

为rpc服务创建connect连接,用以连接remote服务。
mqtt-mailbox.js:

MailBox.prototype.connect = function (tracer, cb) {
    var self = this;

    var stream = net.createConnection(this.port, this.host);
    this.socket = MqttCon(stream);

    this.socket.connect({
        clientId: 'MQTT_RPC_' + Date.now()
    }, function () {
        self.connected = true;
        if (self.bufferMsg) {
            self._interval = setInterval(function () {
                flush(self);
            }, self.interval);
        }
        self.setupKeepAlive();
    });

    this.socket.on('publish', function (pkg) {
        if (pkg.topic == Constants['TOPIC_HANDSHAKE']) {
            upgradeHandshake(self, pkg.payload);
            return cb();
        }
        pkg = Coder.decodeClient(pkg.payload);
        processMsg(self, pkg);
    });
};

通过net.createConnection(this.port, this.host);与remote服务创建连接,监听publish,为publish提供processMsg方法。

MailBox.prototype.send = function (tracer, msg, opts, cb) {
    var id = this.curId++;
    this.requests[id] = cb;

    var pkg = Coder.encodeClient(id, msg, this.servicesMap);
    if (this.bufferMsg) {
        enqueue(this, pkg);
    } else {
        doSend(this.socket, pkg);
    }
};
var doSend = function (socket, msg) {
    socket.publish({
        topic: 'rpc',
        payload: msg
    });
}

通过doSend中的socket.publish发布信息,在pomelo-rpc/server端进行接收,即mqtt-acceptor.js。

三、总结remote与proxy

对于rpc服务来说,proxy为rpc-client端,remote为rpc-server端,并且提供sys系统级和user用户级的服务。

1.user级rpc服务

pomelo为用户提供了两种调用rpc的方式:
一种是:

self.app.rpc.chat.chatRemote.add(session, uid, self.app.get('serverId'), rid, true, function(users){
		next(null, { users: users });
	});

另一种是:

self.app.rpcInvoke('chat-server-1',{
        namespace : "user",
        service : "chatRemote",
        method : "add",
        args : [uid, self.app.get('serverId'), rid, true]
	}, function(users){
		next(null, { users:users })
	});

相对于第一种,第二种属于直接调用client.rpcInvoke,由前文得知,第一种的底层逻辑也是调用client.rpcInvoke

2.拆解一个完整的user级rpc服务

a: 调用Handler中的rpc方法

self.app.rpcInvoke(serverId,msg,cb);

b: 调用proxy服务中的绑定的rpcInvoke方法

在调用proxy的afterStart方法时绑定rpcInvoke服务:this.app.set('rpcInvoke', this.client.rpcInvoke.bind(this.client), true);

c: 调用rpc-clinet服务中的rpcInvoke方法

实则调用mailstation服务中的dispatch方法:this._station.dispatch(tracer, serverId, msg, this.opts, cb);

d: 调用mailstation服务中的dispatch方法

mailbox.send(tracer, msg, opts, function (tracer_send, send_err, args) {
    utils.applyCallback(cb, args);
});

调用最底层mailbox服务,实则调用mqtt-mailbox的send方法。

e: 调用mqtt-mailbox服务中的send方法

MailBox.prototype.send = function (tracer, msg, opts, cb) {
    var id = this.curId++;
    this.requests[id] = cb;
    var pkg = {id: id,msg: msg};
    doSend(this.socket, pkg);
};
var doSend = function (socket, msg) {
    socket.publish({
        topic: 'rpc',
        payload: JSON.stringify(msg)
    });
}

通过socket发布消息到指定服务器。

f: 触发mqtt-acceptor服务中的publish监听

socket.on('publish', function (pkg) {
    pkg = pkg.payload.toString();
    pkg = JSON.parse(pkg);
    processMsg(socket, self, pkg);
});
var processMsg = function (socket, acceptor, pkg) {
    var tracer = null;
    acceptor.cb(tracer, pkg.msg, function () {
        var len = arguments.length;
        var args = new Array(len);
        for (var i = 0; i < len; i++) {
            args[i] = arguments[i];
        }
        var resp = {id: pkg.id, resp: args};

        doSend(socket, resp);
    });
};

通过触发publish监听,解析rpc数据,调用processMsg方法,从而调用 acceptor.cb方法。

g: 调用acceptor.cb方法

var Acceptor = function (opts, cb) {
    this.cb = cb;
};

我们再看一下Acceptor.cb方法,我们发现此方法是在Acceptor初始化的时候赋值的。

var Gateway = function (opts) {ceptor = this.acceptorFactory.create(opts, function (tracer, msg, cb) {
        dispatcher.route(tracer, msg, cb);
    });
};

在往上查看,cb为dispatcher.route(tracer, msg, cb);方法。

h: 调用dispatcher.route方法

pro.route = function (tracer, msg, cb) {
    var namespace = this.services[msg.namespace];
    var service = namespace[msg.service];
    var method = service[msg.method];
    var args = msg.args;
    args.push(cb);
    method.apply(service, args);
};

dispatcher.route中把相应service服务中的方法绑定到了这里,即chatRomte.js中的方法在这里进行调用。

I: 最终目标,完成rpc目标方法的调用

ChatRemote.prototype.add = function(uid, sid, name, flag, cb) {
	cb();
};

上面的dispatcher.route中根据method进行方法匹配调用,进而调用相应的模块下的方法。执行完毕后cb放回结果。

j: 返回消息

以上操作,就完成了rpc调用的目的,在不同服务器间进行调用,现在又回到了mqtt-acceptor服务

var doSend = function (socket, msg) {
    socket.publish({
        topic: 'rpc',
        payload: JSON.stringify(msg)
    });
}

调用完成后,再次通过rpc目标服务器向原服务器进行socket.publish发布消息。

k: 触发mqtt-mailbox的publish监听

this.socket.on('publish', function (pkg) {
    pkg = pkg.payload.toString();
    pkg = JSON.parse(pkg);
    processMsg(self, pkg);
});
var processMsg = function (mailbox, pkg) {
    var pkgId = pkg.id;
    var cb = mailbox.requests[pkgId];
    delete mailbox.requests[pkgId];
    var tracer = null;
    var sendErr = null;
    var pkgResp = pkg.resp;
    cb(tracer, sendErr, pkgResp);
};

通过监听publish,接收rpc返回的数据,解析,并通过cb进行返回。

l: rpc结束

self.app.rpcInvoke(serverId,msg,function(){
	console.log('end');
});

以上所有为rpc完成的操作流程,最后通过回调,执行rpc后续操作。

3. 总结

经过一个完整的rpc调用流程,我们发现:
a: mqtt-mailbox与mqtt-acceptor服务分别为proxy与remote服务的最底层,进行rpc服务的最底层消息传递操作。
b: mailbox和acceptor分别为mqtt-mailbox与mqtt-acceptor服务的上一层,作为工厂方法,创建其底层实现。
c: 作为mailbox和acceptor的上一层mailstation与gateway,对mailbox和acceptor进行管理,并为上层提供统一接口。
d: 最上层为client与server层,对于client层来说,为代理层,隐藏具体rpc细节,为proxy服务提供接口,对于server层来说,为服务层,加载remote服务,提供远程服务业务逻辑实现。

 类似资料: