当前位置: 首页 > 工具软件 > egg-core > 使用案例 >

Egg 源码分析之 egg-cluster

太叔经赋
2023-12-01

接着前篇关于 egg-core 源码分析的文章 Egg 源码分析之egg-core ,今天来看一下 egg-cluster 的源码实现逻辑。

NodeJs 中 javascript 的执行是单线程的,所以一个进程只能使用一个 CPU,为了最大可能的使用服务器资源,一般我们可以使用下面三种方式实现:

  • 同一台机器上部署多个 Node 服务,使用不同的端口,然后用 Nginx 做负载均衡将请求转发到不同的 Node 实例;
  • 使用 PM2 进程管理工具,多个进程共用一个端口,PM2 负责进程重启工作;
  • 利用 Node 自带 child_processcluster 模块可以方便实现多进程之间的通信;

egg-cluster 是什么

egg-cluster 是 Egg 的多进程模型的启动模式,在使用 cluster 模式启动 Egg 应用时,我们只需要配置相关启动参数, egg-cluster 会自动创建相关进程,并管理各个进程之间的通信以及异常处理等问题。主进程 Master 通过 child_process 模块的 fork 函数创建 Agent 子进程,并通过 cluster 模块创建 Worker 子进程。Master/Agent/Worker 三者各司其职,共同保证 Egg 应用的正常运行:

  • Master:

Master 进程只有一个且第一个启动,主要负责进程管理的工作,包括 Worker、Agent 进程的初始化和重启以及进程之间的通信工作。Master 不运行任何业务代码,它的稳定性特别重要,一旦挂掉整个 Node 服务就挂掉了;

  • Agent

Agent 进程也只有一个,一般在业务开发时我们不太会用到 Agent 进程,它的用处主要有两方面:(1)如果想让你的代码只在一个进程上运行(2)Agent 进程可以将某个消息同时广播到所有 Worker 进程进行处理;

  • Worker

Worker 进程根据用户自己的设定可以有多个,主要负责处理业务逻辑和用户的请求。当 Worker 进程异常退出时,Master 进程会重启一个新的 Worker 进程;

Agent 子进程是 Egg.Agent 类的实例,Worker 子进程是 Egg.Application 的实例,而 Egg.Agent 和 Egg.Application 都是 EggApplication 的子类,而 EggApplication 类又是 EggCore 的子类,关于 EggCore 的源码实现可以看一下我前面的文章 Egg 源码分析之egg-core,所以类与实例之间的关系图如下:

                            +--------------+    实例    +--------------+
                            | Agent 子进程  | --------> | Agent 类      |
                            +--------------+           +---------------+ 
                          /                                               \
     child_process.fork /                                                   \ 继承
                      /                                                       \
         +---------------+                                                      +-------------------+    继承  +------------+
         | Master 主进程  |                                                      | EggApplication 类 | ------>  | EggCore 类 |
         +-------------- +                                                      +------------------ +          +------------+ 
                      \                                                        /  
                        \                                                     / 继承
             cluster.fork \                                                  /
                            +---------------+   实例     +----------------+
                            | Worker 子进程  | ------->  | Application 类  | 
                            +---------------+           +-----------------+ 


egg-cluster 源码分析

egg-cluster 整个模块的入口是 master.js,它的初始化流程如下:

  1. workerManager 实例和 messenger 实例的初始化
  2. 自动探测及获取可用的 clusterPort,使用 cluster-client 让 Agent 和 Worker 直接通信变为可能
  3. 启动 Agent 子进程
  4. 启动 Worker 子进程
  5. 启动完毕,实时监测各个进程服务状态
// egg-cluster 源码 -> 启动流程(为了更容易看清楚初始化流程,constructor函数中有些代码先后顺序做了调整)
// Master 继承了 EventEmitter模块,通过事件的监听和订阅方式,非常方便的进行事务处理
class Master extends EventEmitter {
  constructor(options) {
    super();
    //步骤1
    this.workerManager = new Manager();   // workManager 是一个进程管理的工具,记录当前各个进程的状态信息
    this.messenger = new Messenger(this); // messenger 主要负责进程之间的通信工作
 
    //步骤2:自动探测及获取可用的 clusterPort
    detectPort((err, port) => {
      this.options.clusterPort = port;
      //步骤3. 启动 Agent 子进程
      this.forkAgentWorker();
    });
    
    //步骤4. 启动 Worker 子进程
    this.once('agent-start', this.forkAppWorkers.bind(this));
    
    //步骤5:通知启动完毕,实时监测子进程服务状态
    this.ready(() => {
      this.isStarted = true;
      const action = 'egg-ready';
      //通过 messenger 的 send 函数通知服务已经启动就绪
      this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
      this.messenger.send({ action, to: 'app', data: this.options });
      this.messenger.send({ action, to: 'agent', data: this.options });
      //使用 workerManager.startCheck 函数定时监控进程状态
      if (this.isProduction) {
        this.workerManager.startCheck();
      }
    });
  }
}
步骤1:子进程的管理和进程间的通信(manager 和 messenger)
  • manager

manager 实现比较简单,主要通过两个属性 workers 和 agent 来维护进程的状态信息,提供了多个函数用于获取,删除,设置相关进程。这里主要看一下监听进程存活状态的 startCheck 函数的实现:

// egg-cluster 源码 -> startCheck实现
class Manager extends EventEmitter {
  startCheck() {
    this.exception = 0;
    // 每隔 10s 钟监听 Worker 和 Agent 的数量
    this.timer = setInterval(() => {
      const count = this.count();
      // Agent 只有一个且必须处于存活状态,Worker 至少要有一个处于存活状态服务才可用
      if (count.agent && count.worker) {
        this.exception = 0;
        return;
      }
      this.exception++;
      // 如果连续三次检查发现服务都不可用就触发 exception 事件,master.workerManager 监听到该事件后会退出服务
      if (this.exception >= 3) {
        this.emit('exception', count);
        clearInterval(this.timer);
      }
    }, 10000);
  }
}
  • messenger

Worker 子进程与 Agent 子进程都可以通过 IPC 与 Master 进程进行通信,而 Worker 子进程与 Agent 子进程之间是无法直接通信的,必须通过 Master 进程作为中间桥梁进行通信。每个 Master/Agent/Worker 进程都有一个 messenger 实例,该 messenger 实例用于管理与其它进程的通信工作。

这里需要注意的是 Master.messenger 实例对应的 Messenger 类的定义是在 egg-cluster 源码 中,而 Agent.messenger 和 Worker.messenger 实例对应的 Messenger 类的定义是在 egg 源码 中。前者定义了主进程如何给子进程发送消息,而后者定义了子进程如何给父进程发送消息,而两者都基于了另外一个模块 sendmessage 的基础上抽象实现的。这里有点奇怪的既然 sendmessage 函数已经兼容了 Master/Agent/Worker 三者之间的通信方式的区别,为什么没有把两个 Messenger 类放在一个模块里实现,有些许重复的代码。下面看一下 sendmessage 函数的实现部分:

// egg-cluster 源码 -> sendmessage 函数实现
module.exports = function send(child, message) {
  // 如果没有send函数,说明 child 是一个 Master 进程,直接 emit 一个 message 事件给 child 进程本身
  if (typeof child.send !== 'function') {
    return setImmediate(child.emit.bind(child, 'message', message));
  }

  // Agent 子进程通过 child_process.fork() 函数创建出来的
  // Worker 子进程通过 cluster.fork() 函数创建出来的,而 cluster.fork() 函数又会调用 child_process.fork(),将其返回对象绑定在 worker.process 上
  // 所以 Worker 子进程对应的是 child.process.connected,而 Agent 子进程对应的是 child.connected
  var connected = child.process ? child.process.connected : child.connected;
  // 通过子进程的 send 函数向父进程发送消息
  if (connected) {
    return child.send(message);
  }
};

当 Agent 收到一个消息后,虽然一定是 Master 进程发过来的,但是我们无法确定这个消息的传输过程是 Master -> Agent 还是 Worker -> Master -> Agent,所以为了标志消息从哪里来到哪里去,一个消息体里主要包含了以下几个字段:

  • from: 消息从哪里来
  • to: 消息去哪里
  • action: 消息是干什么的
  • data: 消息的具体内容

Messenger 中的函数实现都比较简单,这里主要列一下它们分别提供了哪些函数。

  1. Master.messenger 提供的函数:
  • sendToAgentWorker:发送给 Agent 进程
  • sendToAppWorker: 发送给 Worker 进程
  • sendToMaster:发送给 Master 进程自己
  • send:可以动态指定 from/to/action 进行发送
  1. Agent.messenger 和 Worker.messenger 提供的函数:
  • broadcast: 发送消息给所有的 Agent 和 Worker 包括自己
  • sendToApp: 发送给所有的 Worker
  • sendToAgent:发送给 Agent,Agent 发送给 Agent 自己本身
  • sendRandom:Agent 随机发送给一个 Worker
  • sendTo:可以指定发送给谁
步骤2:cluster-client 增强 Agent 和 Worker 进程间通信

从前面的分析我们知道,如果进程间只采用 IPC 进行通信,那么 Agent 和 Worker 之间通信必须通过 Master 作中转,尤其是在客户端与服务端有长连接的情况下,如果我们通过 Agent 进程与客户端建立长连接,然后 Agent 再与 Worker 建立长连接,那么比起客户端直接与 Worker 建立长连接,连接个数可以减少 N(Worker 个数)倍。为此 Egg 提供了 Agent 与 Worker 之间直接进行长连接的渠道,采用 Leader/Follower 模式,Agent(Leader)负责与远程客户端维持长连接,而 Worker 与 Agent 之间的长连接通信通过”订阅/发布“的模式使开发非常简单,具体的实现步骤如下:

                                 +-------+
                                 | start |
                                 +---+---+
                                     |
                            +--------+---------+
                          __| port competition |__
                    win /   +------------------+  \ lose
                       /                           \
                    +---------------+     tcp conn     +-------------------+
                    | Leader(Agent) |<---------------->| Follower(Worker1) |
                    +---------------+                  +-------------------+
                        |            \ tcp conn
                        |             \
                    +--------+         +-------------------+
                    | Client |         | Follower(Worker2) |
                    +--------+         +-------------------+
  • 服务启动时,系统自动探测一个可用 clusterPort 作为建立 Agent 和 Worker 长连接的端口号,源码如下:
// egg-cluster 源码 -> detectPort函数
module.exports = (port, callback) => {
  let maxPort = port + 10;
  if (typeof callback === 'function') {
    // tryListen 用于探测 port 到 maxPort 之间的可用的端口号
    return tryListen(port, maxPort, callback);
  }
};
// listen 函数 使用 net 模块建立连接来测试端口可用性,
// net.Server().listen(port) 函数在 port=0 可以随便分配一个可用的端口
function listen(port, hostname, callback) {
  const server = new net.Server();
  server.on('error', err => {
    server.close();
    // 忽略 "ENOTFOUND" 报错,表示端口仍然可用
    if (err.code === 'ENOTFOUND') {
      return callback(null, port);
    }
    return callback(err);
  });
  server.listen(port, hostname, () => {
    port = server.address().port;
    server.close();
    return callback(null, port);
  });
}

// tryListen 函数会调用 listen 函数,
//拿到可用端口后检查该端口对应 hostname 为 '0.0.0.0','localhost',ip 时是否都可以使用,否则调用 handleError 继续进行探测
function tryListen(port, maxPort, callback) {
  function handleError() {
    tryListen(port, maxPort, callback);
  }

  listen(port, null, (err, realPort) => {
    listen(port, '0.0.0.0', err => {
      if (err) {
        return handleError(err);
      }
      listen(port, 'localhost', err => {
        if (err && err.code !== 'EADDRNOTAVAIL') {
          return handleError(err);
        }
        listen(port, address.ip(), (err, realPort) => {
          if (err) {
            return handleError(err);
          }
          callback(null, realPort);
        });
      });
    });
  });
}
  • 将 clusterPort 分配给 Agent 进程(Leader),Agent 进程通过 cluster-client 模块初始化 cluster 属性
  • Worker 进程也分别根据 clusterPort 通过 cluster-client 模块初始化 cluster 属性(Follower)

上述两步的代码不是写在 egg-cluster 模块里的,而是在 EggApplication 类的定义中实现的,这里和上面讲到的 Messenger 类的实现一样有点绕,因为同样的逻辑写在了两个地方,源码如下:

// egg-core源码 -> Agent/Worker 的 cluster 属性初始化
const cluster = require('cluster-client');
class EggApplication extends EggCore {
  constructor(options) {
    this.cluster = (clientClass, options) => {
      options = Object.assign({}, this.config.clusterClient, options, {
        port: this.options.clusterPort,  //将在上一步 detectPort 中获取到的 clusterPort 传入
        isLeader: this.type === 'agent'  //指定 Agent 进程为 leader, Worker 进程为 follower
      });
      const client = cluster(clientClass, options);
      this._patchClusterClient(client);
      return client;
    };
  }
}

  • 现在 Agent 和 Worker 实例上都有了 cluster 属性,然后业务开发者就可以按照一定的约定规范实现 Agent 和 Worker 之间的长连接通信了,关于 cluster-client 如何实现 Leader/Follower 长连接模式以及如何在业务开发中使用这里就不详细介绍了,可以看相关文档:cluster-client 源码Egg 官方文档-多进程研发模式增强
步骤3:启动 Agent 子进程

在拿到 clusterPort 后,egg-cluster 立即调用 forkAgentWorker 函数启动 Agent 子进程:

// egg-cluster 源码 -> forkAgentWorker
forkAgentWorker() {
    // Agent 进程参数初始化
    const args = [ JSON.stringify(this.options) ];
    //通过 childprocess 模块的 fork 函数创建 Agent 子进程
    const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt);
    this.workerManager.setAgent(agentWorker); //注册到 workerManager 中
    //监听 Agent 子进程消息,并通过 this.messenger.send 进行转发
    agentWorker.on('message', msg => {
      if (typeof msg === 'string') msg = { action: msg, data: msg };
      msg.from = 'agent';
      this.messenger.send(msg);
    });
    //前面我们讲过服务中的 Agent 进程必须处于存活状态,所以当Agent 监听到异常消息进行日志记录,但不退出
    agentWorker.on('error', err => {
      err.name = 'AgentWorkerError';
      err.id = agentWorker.id;
      err.pid = agentWorker.pid;
      this.logger.error(err);
    });
    //异常情况下退出时,通知 Master 进程
    agentWorker.once('exit', (code, signal) => {
      this.messenger.send({
        action: 'agent-exit',
        data: { code, signal },
        to: 'master',
        from: 'agent',
      });
    });
  }

forkAgentWorker 函数只是在 Master 进程中 fork 出子进程,而子进程真正的运行及初始化工作是在 agent_worker.js 中实现的,这里使用了 graceful-process 这个模块优雅的退出进程,需要等待所有已有连接的关闭后再退出进程,但不会接收新的连接,同样 app_worker.js 中也会使用该函数。新建子进程代码如下:

// egg-cluster -> agent_worker.js 新建 agent 子进程
const gracefulExit = require('graceful-process');
const options = JSON.parse(process.argv[2]);
//根据 Egg 框架 export 出的 Agent 类,新建 Agent 实例
const Agent = require(options.framework).Agent;
const agent = new Agent(options);

agent.ready(err => {
  if (err) return;
  agent.removeListener('error', startErrorHandler); 
  //告诉主进程 agent 进程启动完毕,主进程收到消息后,紧接着步骤4:启动 Worker 子进程
  process.send({ action: 'agent-start', to: 'master' });
});
//设置优雅的退出进程方式
gracefulExit({
  logger: consoleLogger,
  label: 'agent_worker',
  beforeExit: () => agent.close(),
});
步骤4:启动 Worker 子进程

Worker 的启动基本与 Agent 的启动流程一致,主要有以下两点区别:

  • 启动方式不一样:Agent 使用 child_process.fork 启动,而 Worker 使用 cluster.fork 启动
  • Worker 需要对外提供 http 服务,而 Agent 不需要

需要注意的是在创建子进程的时候使用 cfork 这个模块,它是对 cluster.fork 的一个封装,用于管理创建多个子进程以及退出时重新创建子进程的工作,forkAppWorkers源码如下:

// egg-cluser 源码 -> forkAppWorkers 函数
const cfork = require('cfork');
forkAppWorkers() {
    const args = [ JSON.stringify(this.options) ];
    //通过 cfork 函数创建多个 cluster
    cfork({
      exec: this.getAppWorkerFile(), // app_worker.js
      args,
      count: this.options.workers,  //子进程个数
      refork: this.isProduction,   //退出时是否 refork
    });
    //监听 fork 事件
    cluster.on('fork', worker => {
      this.workerManager.setWorker(worker); //将子进程注册到 workerManager 中
      worker.on('message', msg => {       //通知 Master 主进程某个子进程已经创建好了
        if (typeof msg === 'string') msg = { action: msg, data: msg };
        msg.from = 'app';
        this.messenger.send(msg);
      });
    });
  }

接下来我们看一下 app_worker.js 中是如何初始化 worker 实例以及启动 http 服务的,这里需要区别 sticky 模式和非 sticky 模式:

  • sticky 模式:Master 负责统一监听对外端口,然后根据用户 ip 转发到固定的 Worker 子进程上,每个 Worker 自己启动了一个新的本地服务
  • 非 sticky 模式:每个Worker 都统一启动服务监听外部端口
// egg-cluster 源码 -> app_worker.js 新建 worker 子进程
//新建 Egg.Application 实例
const options = JSON.parse(process.argv[2]);
const Application = require(options.framework).Application;
const app = new Application(options);

//获取服务对外端口,并告知 Master
process.send({ to: 'master', action: 'realport', data: port });
//等待一切 ready 好以后启动服务
app.ready(startServer);

// 启动 http 服务
function startServer(err) {
  let server;
  // 针对 https 和 http 的不同进行处理
  if (options.https) {
    server = require('https').createServer(httpsOptions, app.callback());
  } else {
    server = require('http').createServer(app.callback());
  }

  if (options.sticky) {
    // sticky 模式下,每个 Worker 都会随机选择一个端口启动服务,这个服务用于接收 Master 服务转发过来的 connection 并进行处理
    server.listen(0, '127.0.0.1');
    // Master 服务只会发送sticky-session:connection消息
    process.on('message', (message, connection) => {
      if (message !== 'sticky-session:connection') {
        return;
      }
      server.emit('connection', connection);
      connection.resume();
    });
  } else {
    //非 sticky 情况下监听对外端口,启动服务
    if (listenConfig.path) {
      server.listen(listenConfig.path);
    } else {
      const args = [ port ];
      if (listenConfig.hostname) args.push(listenConfig.hostname);
      server.listen(...args);
    }
  }
}
//设置优雅的退出进程方式
gracefulExit({
  logger: consoleLogger,
  label: 'app_worker',
  beforeExit: () => app.close(),
});

在 Worker 启动后会发送 app-start 事件给 Master ,此时 Master 会执行 onAppStart 函数,这个函数除了对启动后的服务做一些初始化工作以外,最主要的一件事情就是在 sticky 模式下启动 MasterSocketServer 用于转发请求到各个 Worker,源码如下:

// egg-cluster 源码 -> onAppStart 函数和 startMasterSocketServer 函数
onAppStart(data) {
    //告诉 Agent 目前启动的所有 WorkerIds
    this.messenger.send({
      action: 'egg-pids',
      to: 'agent',
      data: this.workerManager.getListeningWorkerIds(),
    });
    //这里如果是 sticky 模式那么会先创建一个 MasterSocketServer,然后才正式完成 Master 进程的启动,
    if (this.options.sticky) {
      this.startMasterSocketServer(err => {
        if (err) return this.ready(err);
        this.ready(true);
      });
    } else {
      this.ready(true);
    }
}

startMasterSocketServer(cb) {
    // Master 进程创建一个 TCP 服务并监听真正的对外端口
    //这里的 pauseOnConnect=true 表示不会消费和使用传入的 connection,因为这个 connection 只有真正的传递给某个 Worker 时才会被解析使用
    require('net').createServer({ pauseOnConnect: true }, connection => {
      //开发者必须在 Nginx 配置了 remoteAddress 相关信息才能根据用户 ip 实现 sticky 模式.
      if (!connection.remoteAddress) {
        connection.close();
      } else {
        //根据用户 ip 解析到指定的 Worker 实例,并将这个 connection 转发给这个 Worker 子进程,Worker 负责真正处理请求
        const worker = this.stickyWorker(connection.remoteAddress);
        worker.send('sticky-session:connection', connection);
      }
    }).listen(this[REALPORT], cb);
}
步骤5:启动完毕,实时监测各个进程服务状态

在启动完毕后,为了保证服务正常稳定运行,Master 主进程需要监听各种异常事件:

  • agent-exit 事件:

Agent 子进程的退出事件,对应的处理函数是onAgentExit,Agent 子进程必须存活,Agent 子进程一旦退出,意味着整个服务就需要退出或者需要重启 Agent 子进程,退出时需要 removeAllListeners 防止内存泄漏。

  • app-exit 事件:

Worker 子进程的退出事件,对应的处理函数是onAppExit,清理 Master.manager 中对于该进程的注册消息,并 removeAllListeners,并根据不同的启动环境以及参数判断是否退出服务,生产环境会继续 fork 出新的 Worker 子进程而不退出服务

  • SIGINT/SIGQUIT/SIGTERM 事件

当 Master 进程收到 Ctrl-C 或者 process.exit() 退出信号时会触发这些事件,对应的处理函数是 onSignal 函数, onSignal 函数会调用 close 函数,close 函数又会调用 _doClose 函数,真正的退出操作都是在 _doClose 中实现的:

// egg-cluster 源码 -> _doClose 实现
_doClose() {
    //退出所有的 Worker 子进程
    try {
      yield this.killAppWorkers(appTimeout);
    } catch (e) {
      this.logger.error('[master] app workers exit error: ', e);
    }
    // 退出 Agent 子进程
    try {
      yield this.killAgentWorker(agentTimeout);
    } catch (e) {
      this.logger.error('[master] agent worker exit error: ', e);
    }
}

而 killAppWorkers 函数和 killAgentWorker 函数在退出各个进程时,都会调用 terminate 函数。我们知道当 kill 掉一个进程后,依赖该进程创建的子进程将成为孤儿进程,而 terminate 函数就是负责包括进程本身以及子进程的退出工作:


// egg-cluster 源码 -> terminate 函数实现
module.exports = function* (subProcess, timeout) {
  const pid = subProcess.process ? subProcess.process.pid : subProcess.pid;
  const childPids = yield getChildPids(pid); //获取所有子进程的 pid
  yield [
    killProcess(subProcess, timeout),  // kill 当前进程
    killChildren(childPids, timeout),  // kill 所有子进程
  ];
};

//如果 SIGTERM 信号不能工作,则使用 SIGKILL 信号进行 kill
function* killProcess(subProcess, timeout) {
  subProcess.kill('SIGTERM');
  yield Promise.race([
    awaitEvent(subProcess, 'exit'),
    sleep(timeout),
  ]);
  if (subProcess.killed) return;
  (subProcess.process || subProcess).kill('SIGKILL');
}

//删除所有对应的子进程
function* killChildren(children, timeout) {
  if (!children.length) return;
  kill(children, 'SIGTERM');
  const start = Date.now();
  const checkInterval = 400;
  let unterminated = [];
  while (Date.now() - start < timeout - checkInterval) {
    yield sleep(checkInterval);
    unterminated = getUnterminatedProcesses(children);
    if (!unterminated.length) return;
  }
  kill(unterminated, 'SIGKILL');
}

参考文章

 类似资料: