接着前篇关于 egg-core 源码分析的文章 Egg 源码分析之egg-core ,今天来看一下 egg-cluster 的源码实现逻辑。
NodeJs 中 javascript 的执行是单线程的,所以一个进程只能使用一个 CPU,为了最大可能的使用服务器资源,一般我们可以使用下面三种方式实现:
egg-cluster 是 Egg 的多进程模型的启动模式,在使用 cluster 模式启动 Egg 应用时,我们只需要配置相关启动参数, egg-cluster 会自动创建相关进程,并管理各个进程之间的通信以及异常处理等问题。主进程 Master 通过 child_process 模块的 fork 函数创建 Agent 子进程,并通过 cluster 模块创建 Worker 子进程。Master/Agent/Worker 三者各司其职,共同保证 Egg 应用的正常运行:
Master 进程只有一个且第一个启动,主要负责进程管理的工作,包括 Worker、Agent 进程的初始化和重启以及进程之间的通信工作。Master 不运行任何业务代码,它的稳定性特别重要,一旦挂掉整个 Node 服务就挂掉了;
Agent 进程也只有一个,一般在业务开发时我们不太会用到 Agent 进程,它的用处主要有两方面:(1)如果想让你的代码只在一个进程上运行(2)Agent 进程可以将某个消息同时广播到所有 Worker 进程进行处理;
Worker 进程根据用户自己的设定可以有多个,主要负责处理业务逻辑和用户的请求。当 Worker 进程异常退出时,Master 进程会重启一个新的 Worker 进程;
Agent 子进程是 Egg.Agent 类的实例,Worker 子进程是 Egg.Application 的实例,而 Egg.Agent 和 Egg.Application 都是 EggApplication 的子类,而 EggApplication 类又是 EggCore 的子类,关于 EggCore 的源码实现可以看一下我前面的文章 Egg 源码分析之egg-core,所以类与实例之间的关系图如下:
+--------------+ 实例 +--------------+
| Agent 子进程 | --------> | Agent 类 |
+--------------+ +---------------+
/ \
child_process.fork / \ 继承
/ \
+---------------+ +-------------------+ 继承 +------------+
| Master 主进程 | | EggApplication 类 | ------> | EggCore 类 |
+-------------- + +------------------ + +------------+
\ /
\ / 继承
cluster.fork \ /
+---------------+ 实例 +----------------+
| Worker 子进程 | -------> | Application 类 |
+---------------+ +-----------------+
egg-cluster 整个模块的入口是 master.js,它的初始化流程如下:
// egg-cluster 源码 -> 启动流程(为了更容易看清楚初始化流程,constructor函数中有些代码先后顺序做了调整)
// Master 继承了 EventEmitter模块,通过事件的监听和订阅方式,非常方便的进行事务处理
class Master extends EventEmitter {
constructor(options) {
super();
//步骤1
this.workerManager = new Manager(); // workManager 是一个进程管理的工具,记录当前各个进程的状态信息
this.messenger = new Messenger(this); // messenger 主要负责进程之间的通信工作
//步骤2:自动探测及获取可用的 clusterPort
detectPort((err, port) => {
this.options.clusterPort = port;
//步骤3. 启动 Agent 子进程
this.forkAgentWorker();
});
//步骤4. 启动 Worker 子进程
this.once('agent-start', this.forkAppWorkers.bind(this));
//步骤5:通知启动完毕,实时监测子进程服务状态
this.ready(() => {
this.isStarted = true;
const action = 'egg-ready';
//通过 messenger 的 send 函数通知服务已经启动就绪
this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({ action, to: 'app', data: this.options });
this.messenger.send({ action, to: 'agent', data: this.options });
//使用 workerManager.startCheck 函数定时监控进程状态
if (this.isProduction) {
this.workerManager.startCheck();
}
});
}
}
manager 实现比较简单,主要通过两个属性 workers 和 agent 来维护进程的状态信息,提供了多个函数用于获取,删除,设置相关进程。这里主要看一下监听进程存活状态的 startCheck 函数的实现:
// egg-cluster 源码 -> startCheck实现
class Manager extends EventEmitter {
startCheck() {
this.exception = 0;
// 每隔 10s 钟监听 Worker 和 Agent 的数量
this.timer = setInterval(() => {
const count = this.count();
// Agent 只有一个且必须处于存活状态,Worker 至少要有一个处于存活状态服务才可用
if (count.agent && count.worker) {
this.exception = 0;
return;
}
this.exception++;
// 如果连续三次检查发现服务都不可用就触发 exception 事件,master.workerManager 监听到该事件后会退出服务
if (this.exception >= 3) {
this.emit('exception', count);
clearInterval(this.timer);
}
}, 10000);
}
}
Worker 子进程与 Agent 子进程都可以通过 IPC 与 Master 进程进行通信,而 Worker 子进程与 Agent 子进程之间是无法直接通信的,必须通过 Master 进程作为中间桥梁进行通信。每个 Master/Agent/Worker 进程都有一个 messenger 实例,该 messenger 实例用于管理与其它进程的通信工作。
这里需要注意的是 Master.messenger 实例对应的 Messenger 类的定义是在 egg-cluster 源码 中,而 Agent.messenger 和 Worker.messenger 实例对应的 Messenger 类的定义是在 egg 源码 中。前者定义了主进程如何给子进程发送消息,而后者定义了子进程如何给父进程发送消息,而两者都基于了另外一个模块 sendmessage 的基础上抽象实现的。这里有点奇怪的既然 sendmessage 函数已经兼容了 Master/Agent/Worker 三者之间的通信方式的区别,为什么没有把两个 Messenger 类放在一个模块里实现,有些许重复的代码。下面看一下 sendmessage 函数的实现部分:
// egg-cluster 源码 -> sendmessage 函数实现
module.exports = function send(child, message) {
// 如果没有send函数,说明 child 是一个 Master 进程,直接 emit 一个 message 事件给 child 进程本身
if (typeof child.send !== 'function') {
return setImmediate(child.emit.bind(child, 'message', message));
}
// Agent 子进程通过 child_process.fork() 函数创建出来的
// Worker 子进程通过 cluster.fork() 函数创建出来的,而 cluster.fork() 函数又会调用 child_process.fork(),将其返回对象绑定在 worker.process 上
// 所以 Worker 子进程对应的是 child.process.connected,而 Agent 子进程对应的是 child.connected
var connected = child.process ? child.process.connected : child.connected;
// 通过子进程的 send 函数向父进程发送消息
if (connected) {
return child.send(message);
}
};
当 Agent 收到一个消息后,虽然一定是 Master 进程发过来的,但是我们无法确定这个消息的传输过程是 Master -> Agent 还是 Worker -> Master -> Agent,所以为了标志消息从哪里来到哪里去,一个消息体里主要包含了以下几个字段:
Messenger 中的函数实现都比较简单,这里主要列一下它们分别提供了哪些函数。
从前面的分析我们知道,如果进程间只采用 IPC 进行通信,那么 Agent 和 Worker 之间通信必须通过 Master 作中转,尤其是在客户端与服务端有长连接的情况下,如果我们通过 Agent 进程与客户端建立长连接,然后 Agent 再与 Worker 建立长连接,那么比起客户端直接与 Worker 建立长连接,连接个数可以减少 N(Worker 个数)倍。为此 Egg 提供了 Agent 与 Worker 之间直接进行长连接的渠道,采用 Leader/Follower 模式,Agent(Leader)负责与远程客户端维持长连接,而 Worker 与 Agent 之间的长连接通信通过”订阅/发布“的模式使开发非常简单,具体的实现步骤如下:
+-------+
| start |
+---+---+
|
+--------+---------+
__| port competition |__
win / +------------------+ \ lose
/ \
+---------------+ tcp conn +-------------------+
| Leader(Agent) |<---------------->| Follower(Worker1) |
+---------------+ +-------------------+
| \ tcp conn
| \
+--------+ +-------------------+
| Client | | Follower(Worker2) |
+--------+ +-------------------+
// egg-cluster 源码 -> detectPort函数
module.exports = (port, callback) => {
let maxPort = port + 10;
if (typeof callback === 'function') {
// tryListen 用于探测 port 到 maxPort 之间的可用的端口号
return tryListen(port, maxPort, callback);
}
};
// listen 函数 使用 net 模块建立连接来测试端口可用性,
// net.Server().listen(port) 函数在 port=0 可以随便分配一个可用的端口
function listen(port, hostname, callback) {
const server = new net.Server();
server.on('error', err => {
server.close();
// 忽略 "ENOTFOUND" 报错,表示端口仍然可用
if (err.code === 'ENOTFOUND') {
return callback(null, port);
}
return callback(err);
});
server.listen(port, hostname, () => {
port = server.address().port;
server.close();
return callback(null, port);
});
}
// tryListen 函数会调用 listen 函数,
//拿到可用端口后检查该端口对应 hostname 为 '0.0.0.0','localhost',ip 时是否都可以使用,否则调用 handleError 继续进行探测
function tryListen(port, maxPort, callback) {
function handleError() {
tryListen(port, maxPort, callback);
}
listen(port, null, (err, realPort) => {
listen(port, '0.0.0.0', err => {
if (err) {
return handleError(err);
}
listen(port, 'localhost', err => {
if (err && err.code !== 'EADDRNOTAVAIL') {
return handleError(err);
}
listen(port, address.ip(), (err, realPort) => {
if (err) {
return handleError(err);
}
callback(null, realPort);
});
});
});
});
}
上述两步的代码不是写在 egg-cluster 模块里的,而是在 EggApplication 类的定义中实现的,这里和上面讲到的 Messenger 类的实现一样有点绕,因为同样的逻辑写在了两个地方,源码如下:
// egg-core源码 -> Agent/Worker 的 cluster 属性初始化
const cluster = require('cluster-client');
class EggApplication extends EggCore {
constructor(options) {
this.cluster = (clientClass, options) => {
options = Object.assign({}, this.config.clusterClient, options, {
port: this.options.clusterPort, //将在上一步 detectPort 中获取到的 clusterPort 传入
isLeader: this.type === 'agent' //指定 Agent 进程为 leader, Worker 进程为 follower
});
const client = cluster(clientClass, options);
this._patchClusterClient(client);
return client;
};
}
}
在拿到 clusterPort 后,egg-cluster 立即调用 forkAgentWorker 函数启动 Agent 子进程:
// egg-cluster 源码 -> forkAgentWorker
forkAgentWorker() {
// Agent 进程参数初始化
const args = [ JSON.stringify(this.options) ];
//通过 childprocess 模块的 fork 函数创建 Agent 子进程
const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt);
this.workerManager.setAgent(agentWorker); //注册到 workerManager 中
//监听 Agent 子进程消息,并通过 this.messenger.send 进行转发
agentWorker.on('message', msg => {
if (typeof msg === 'string') msg = { action: msg, data: msg };
msg.from = 'agent';
this.messenger.send(msg);
});
//前面我们讲过服务中的 Agent 进程必须处于存活状态,所以当Agent 监听到异常消息进行日志记录,但不退出
agentWorker.on('error', err => {
err.name = 'AgentWorkerError';
err.id = agentWorker.id;
err.pid = agentWorker.pid;
this.logger.error(err);
});
//异常情况下退出时,通知 Master 进程
agentWorker.once('exit', (code, signal) => {
this.messenger.send({
action: 'agent-exit',
data: { code, signal },
to: 'master',
from: 'agent',
});
});
}
forkAgentWorker 函数只是在 Master 进程中 fork 出子进程,而子进程真正的运行及初始化工作是在 agent_worker.js 中实现的,这里使用了 graceful-process 这个模块优雅的退出进程,需要等待所有已有连接的关闭后再退出进程,但不会接收新的连接,同样 app_worker.js 中也会使用该函数。新建子进程代码如下:
// egg-cluster -> agent_worker.js 新建 agent 子进程
const gracefulExit = require('graceful-process');
const options = JSON.parse(process.argv[2]);
//根据 Egg 框架 export 出的 Agent 类,新建 Agent 实例
const Agent = require(options.framework).Agent;
const agent = new Agent(options);
agent.ready(err => {
if (err) return;
agent.removeListener('error', startErrorHandler);
//告诉主进程 agent 进程启动完毕,主进程收到消息后,紧接着步骤4:启动 Worker 子进程
process.send({ action: 'agent-start', to: 'master' });
});
//设置优雅的退出进程方式
gracefulExit({
logger: consoleLogger,
label: 'agent_worker',
beforeExit: () => agent.close(),
});
Worker 的启动基本与 Agent 的启动流程一致,主要有以下两点区别:
需要注意的是在创建子进程的时候使用 cfork 这个模块,它是对 cluster.fork 的一个封装,用于管理创建多个子进程以及退出时重新创建子进程的工作,forkAppWorkers源码如下:
// egg-cluser 源码 -> forkAppWorkers 函数
const cfork = require('cfork');
forkAppWorkers() {
const args = [ JSON.stringify(this.options) ];
//通过 cfork 函数创建多个 cluster
cfork({
exec: this.getAppWorkerFile(), // app_worker.js
args,
count: this.options.workers, //子进程个数
refork: this.isProduction, //退出时是否 refork
});
//监听 fork 事件
cluster.on('fork', worker => {
this.workerManager.setWorker(worker); //将子进程注册到 workerManager 中
worker.on('message', msg => { //通知 Master 主进程某个子进程已经创建好了
if (typeof msg === 'string') msg = { action: msg, data: msg };
msg.from = 'app';
this.messenger.send(msg);
});
});
}
接下来我们看一下 app_worker.js 中是如何初始化 worker 实例以及启动 http 服务的,这里需要区别 sticky 模式和非 sticky 模式:
// egg-cluster 源码 -> app_worker.js 新建 worker 子进程
//新建 Egg.Application 实例
const options = JSON.parse(process.argv[2]);
const Application = require(options.framework).Application;
const app = new Application(options);
//获取服务对外端口,并告知 Master
process.send({ to: 'master', action: 'realport', data: port });
//等待一切 ready 好以后启动服务
app.ready(startServer);
// 启动 http 服务
function startServer(err) {
let server;
// 针对 https 和 http 的不同进行处理
if (options.https) {
server = require('https').createServer(httpsOptions, app.callback());
} else {
server = require('http').createServer(app.callback());
}
if (options.sticky) {
// sticky 模式下,每个 Worker 都会随机选择一个端口启动服务,这个服务用于接收 Master 服务转发过来的 connection 并进行处理
server.listen(0, '127.0.0.1');
// Master 服务只会发送sticky-session:connection消息
process.on('message', (message, connection) => {
if (message !== 'sticky-session:connection') {
return;
}
server.emit('connection', connection);
connection.resume();
});
} else {
//非 sticky 情况下监听对外端口,启动服务
if (listenConfig.path) {
server.listen(listenConfig.path);
} else {
const args = [ port ];
if (listenConfig.hostname) args.push(listenConfig.hostname);
server.listen(...args);
}
}
}
//设置优雅的退出进程方式
gracefulExit({
logger: consoleLogger,
label: 'app_worker',
beforeExit: () => app.close(),
});
在 Worker 启动后会发送 app-start 事件给 Master ,此时 Master 会执行 onAppStart 函数,这个函数除了对启动后的服务做一些初始化工作以外,最主要的一件事情就是在 sticky 模式下启动 MasterSocketServer 用于转发请求到各个 Worker,源码如下:
// egg-cluster 源码 -> onAppStart 函数和 startMasterSocketServer 函数
onAppStart(data) {
//告诉 Agent 目前启动的所有 WorkerIds
this.messenger.send({
action: 'egg-pids',
to: 'agent',
data: this.workerManager.getListeningWorkerIds(),
});
//这里如果是 sticky 模式那么会先创建一个 MasterSocketServer,然后才正式完成 Master 进程的启动,
if (this.options.sticky) {
this.startMasterSocketServer(err => {
if (err) return this.ready(err);
this.ready(true);
});
} else {
this.ready(true);
}
}
startMasterSocketServer(cb) {
// Master 进程创建一个 TCP 服务并监听真正的对外端口
//这里的 pauseOnConnect=true 表示不会消费和使用传入的 connection,因为这个 connection 只有真正的传递给某个 Worker 时才会被解析使用
require('net').createServer({ pauseOnConnect: true }, connection => {
//开发者必须在 Nginx 配置了 remoteAddress 相关信息才能根据用户 ip 实现 sticky 模式.
if (!connection.remoteAddress) {
connection.close();
} else {
//根据用户 ip 解析到指定的 Worker 实例,并将这个 connection 转发给这个 Worker 子进程,Worker 负责真正处理请求
const worker = this.stickyWorker(connection.remoteAddress);
worker.send('sticky-session:connection', connection);
}
}).listen(this[REALPORT], cb);
}
在启动完毕后,为了保证服务正常稳定运行,Master 主进程需要监听各种异常事件:
Agent 子进程的退出事件,对应的处理函数是onAgentExit,Agent 子进程必须存活,Agent 子进程一旦退出,意味着整个服务就需要退出或者需要重启 Agent 子进程,退出时需要 removeAllListeners 防止内存泄漏。
Worker 子进程的退出事件,对应的处理函数是onAppExit,清理 Master.manager 中对于该进程的注册消息,并 removeAllListeners,并根据不同的启动环境以及参数判断是否退出服务,生产环境会继续 fork 出新的 Worker 子进程而不退出服务
当 Master 进程收到 Ctrl-C 或者 process.exit() 退出信号时会触发这些事件,对应的处理函数是 onSignal 函数, onSignal 函数会调用 close 函数,close 函数又会调用 _doClose 函数,真正的退出操作都是在 _doClose 中实现的:
// egg-cluster 源码 -> _doClose 实现
_doClose() {
//退出所有的 Worker 子进程
try {
yield this.killAppWorkers(appTimeout);
} catch (e) {
this.logger.error('[master] app workers exit error: ', e);
}
// 退出 Agent 子进程
try {
yield this.killAgentWorker(agentTimeout);
} catch (e) {
this.logger.error('[master] agent worker exit error: ', e);
}
}
而 killAppWorkers 函数和 killAgentWorker 函数在退出各个进程时,都会调用 terminate 函数。我们知道当 kill 掉一个进程后,依赖该进程创建的子进程将成为孤儿进程,而 terminate 函数就是负责包括进程本身以及子进程的退出工作:
// egg-cluster 源码 -> terminate 函数实现
module.exports = function* (subProcess, timeout) {
const pid = subProcess.process ? subProcess.process.pid : subProcess.pid;
const childPids = yield getChildPids(pid); //获取所有子进程的 pid
yield [
killProcess(subProcess, timeout), // kill 当前进程
killChildren(childPids, timeout), // kill 所有子进程
];
};
//如果 SIGTERM 信号不能工作,则使用 SIGKILL 信号进行 kill
function* killProcess(subProcess, timeout) {
subProcess.kill('SIGTERM');
yield Promise.race([
awaitEvent(subProcess, 'exit'),
sleep(timeout),
]);
if (subProcess.killed) return;
(subProcess.process || subProcess).kill('SIGKILL');
}
//删除所有对应的子进程
function* killChildren(children, timeout) {
if (!children.length) return;
kill(children, 'SIGTERM');
const start = Date.now();
const checkInterval = 400;
let unterminated = [];
while (Date.now() - start < timeout - checkInterval) {
yield sleep(checkInterval);
unterminated = getUnterminatedProcesses(children);
if (!unterminated.length) return;
}
kill(unterminated, 'SIGKILL');
}