pomelo--connector之网络监听

程和煦
2023-12-01

作者:shihuaping0918@163.com,转载请注明作者

pomelo的connector负责接收外部连接,同时做协议的编码解码,接收的时候做解码,发送的时候做编码。如果有对消息进行加密的话,也是在这里进行处理。有unicode的话,还要转码成utf8。

connector的网络处理是基于事件的,这也符合node.js的设计。connector是一个component,根据pomelo的约定,component有start/afterStart/stop等调用,进行生命周期管理。

connector依赖connection/server/pushScheduler/session组件,它是很重量级的,内部还有各种协议的实现,典型的有protobuf,mqtt。其中protobuf大家都很熟悉了,mqtt是物联网协议,它的特点是体积小,效率高,还省电。根据大黄易提供的数据,pomelo+mqtt能够实现单机30w在线的推送。这个数字就非常惊人了,因为一般的服务器设计能够承载10k级别的在线就算是很好了。

connector的体量有这么大,一篇就分析完也是不现实的。准备分成三篇来讲,第一篇讲connector的网络相关的内容。第二篇讲协议和加解密。第三篇讲connector和其它组件的交互。

前面讲到pomelo是按约定来编程的,又是微内核+插件实现方式,所以光看代码有些东西是看不出来的,还需要结合配置来看才行。如果仅仅是看代码,可能调用关系很难理得清楚,看着看着就卡住了。

按照普通网络服务器的流程,首先是要有监听,绑定,要有host和port。然后才有连接进来,连接建立了以后,才有数据收发,编码解码。先找到监听在哪里。

还是看https://github.com/NetEase/chatofpomelo/blob/master/game-server/app.js这个项目,chatofpomelo,因为它足够简单。按第三篇的分析,它配了一个connector: pomelo.connectors.sioconnector,到conpoment/connector.js里看这个配置是怎么生效的。

connector.js是对各种不同的connector的封装,它里面有一个函数getConnector,这个函数会根据配置加载真正实现业务的connector。

var getConnector = function(app, opts) {
  var connector = opts.connector;  //配了
  if (!connector) { //有值,不进下面的函数
    return getDefaultConnector(app, opts);
  }
  //如果不是函数,也不进下面的行
  if (typeof connector !== 'function') {
    return connector;
  }
 //调用函数
  var curServer = app.getCurServer();
  return connector(curServer.clientPort, curServer.host, opts);
};
1
2
3
4
5
6
7
8
9
10
11
12
13
pomelo.connections.sioconnector实际上是一个函数,看export的内容

/**
 * Connector that manager low level connection and protocol bewteen server and client.
 * Develper can provide their own connector to switch the low level prototol, such as tcp or probuf.
 */
var Connector = function(port, host, opts) {
  if (!(this instanceof Connector)) {
    return new Connector(port, host, opts);
  }

  EventEmitter.call(this);
  this.port = port;
  this.host = host;
  this.opts = opts;
  this.heartbeats = opts.heartbeats || true;
  this.closeTimeout = opts.closeTimeout || 60;
  this.heartbeatTimeout = opts.heartbeatTimeout || 60;
  this.heartbeatInterval = opts.heartbeatInterval || 25;
};

util.inherits(Connector, EventEmitter);

module.exports = Connector; //这就是函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
所以可以分析得知connector.js中的connector是根据配置去connections下面加载指定的connector。没有配置不配就加载一个默认的,这个默认的就是sioconnector。

一、网络监听: 
加载分析完以后,看一下启动和停止。

pro.afterStart = function(cb) {
  this.connector.start(cb);  //sioconnector.start启动
  this.connector.on('connection', hostFilter.bind(this, bindEvents));
};

pro.stop = function(force, cb) {
  if (this.connector) {
    this.connector.stop(force, cb); //sioconnector.stop停止
    this.connector = null;
    return;
  } else {
    process.nextTick(cb);
  }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
先到sioconnector.js文件里去看一下

/**
 * Start connector to listen the specified port
 */
Connector.prototype.start = function(cb) {
  var self = this; //注意这里,this在js中是怎么变化的
  // issue https://github.com/NetEase/pomelo-cn/issues/174
  var opts = {}
  if(!!this.opts) {
    opts = this.opts;
  }
  else {
    opts = {
      transports: [
      'websocket', 'polling-xhr', 'polling-jsonp', 'polling'
      ]
    };
  }
//使用socket.io作为网络底层库
  var sio = require('socket.io')(httpServer, opts);

  var port = this.port;
  httpServer.listen(port, function () { //看到listen了
    console.log('sio Server listening at port %d', port);
  });
  sio.set('resource', '/socket.io');
  sio.set('transports', this.opts.transports);
  sio.set('heartbeat timeout', this.heartbeatTimeout);
  sio.set('heartbeat interval', this.heartbeatInterval);
//有连接进来就触发回调
  sio.on('connection', function (socket) {
    var siosocket = new SioSocket(curId++, socket);
    self.emit('connection', siosocket);  //触发事件
    siosocket.on('closing', function(reason) {
      siosocket.send({route: 'onKick', reason: reason});
    });
  });

  process.nextTick(cb);
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
从sioconnector.js中可以看到,配置不仅是零散地配,还零散地读。这是pomelo非常不好的一个地方,没有一个集中的配置管理。listen就是网络端口监听,监听成功了,外部才能和服务器建立网络连接。

二、连接建立 
代码中已经看到了listen和connection事件,从代码看,sioconnection只关注连接的建立和关闭。数据的读取它不关心。连接建立的时候它手动触发了一个事件,叫connection,在这个事件里,把socket和连接id给传进去了。

下面去看一下,这个事件发出去以后被谁接收了,又是怎么处理的。先离开sioconnector.js回到connector.js。

pro.afterStart = function(cb) {
  this.connector.start(cb);
//就是它接收了connection事件
  this.connector.on('connection', hostFilter.bind(this, bindEvents));
};
1
2
3
4
5
代码中显示sioconnector.js发出的connection事件被connector.js所接收,而且还和一个bindEvents函数有关。

var bindEvents = function(self, socket) {
  var curServer = self.app.getCurServer();
  var maxConnections = curServer['max-connections'];
  if (self.connection && maxConnections) {
    self.connection.increaseConnectionCount();
    var statisticInfo = self.connection.getStatisticsInfo();
    if (statisticInfo.totalConnCount > maxConnections) {
      logger.warn('the server %s has reached the max connections %s', curServer.id, maxConnections);
      socket.disconnect();
      return;
    }
  }

  //create session for connection
  var session = getSession(self, socket);
  var closed = false;
  //网络断开
  socket.on('disconnect', function() {
    if (closed) {
      return;
    }
    closed = true;
    if (self.connection) {
      self.connection.decreaseConnectionCount(session.uid);
    }
  });
  //网络错误
  socket.on('error', function() {
    if (closed) {
      return;
    }
    closed = true;
    if (self.connection) {
      self.connection.decreaseConnectionCount(session.uid);
    }
  });

  //消息读取
  // new message
  socket.on('message', function(msg) {
    var dmsg = msg;
    if (self.useAsyncCoder) {
      return handleMessageAsync(self, msg, session, socket);
    }

    if (self.decode) {
      dmsg = self.decode(msg, session);
    } else if (self.connector.decode) {
      dmsg = self.connector.decode(msg, socket);
    }
    if (!dmsg) {
      // discard invalid message
      return;
    }

    // use rsa crypto
    if (self.useCrypto) {
      var verified = verifyMessage(self, session, dmsg);
      if (!verified) {
        logger.error('fail to verify the data received from client.');
        return;
      }
    }

    handleMessage(self, session, dmsg);
  }); //on message end
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
三、消息读取 
从上面的代码可以看到,connector.js中才对socket的error/message/disconnect做了处理。其中消息读取就是在socket.on('message',cb)中的回调里实现的。

消息读到以后,先进行解码——如果配了解码器的话。然后进行进行加解密操作。都正常的话,就进入后续的流程handleMessage。

到此为止,coonector的网络监听,读取,断开,错误都分析完了。至于发送就没有必要去分析了。

还留有一个小尾巴,那就是端口,端口的来源是在config/servers.json里。这是pomelo配置的一种设置,它可以在servers.json里配多个server。每个server端口不一样。

{
    "development":{
        "connector":[
             {"id":"connector-server-1", "host":"127.0.0.1", "port":4050, "clientPort": 3050, "frontend": true},
             {"id":"connector-server-2", "host":"127.0.0.1", "port":4051, "clientPort": 3051, "frontend": true},
             {"id":"connector-server-3", "host":"127.0.0.1", "port":4052, "clientPort": 3052, "frontend": true}
         ],
        "chat":[
             {"id":"chat-server-1", "host":"127.0.0.1", "port":6050},
             {"id":"chat-server-2", "host":"127.0.0.1", "port":6051},
             {"id":"chat-server-3", "host":"127.0.0.1", "port":6052}
        ],
        "gate":[
           {"id": "gate-server-1", "host": "127.0.0.1", "clientPort": 3014, "frontend": true}
        ]
    },
    "production":{
           "connector":[
             {"id":"connector-server-1", "host":"127.0.0.1", "port":4050, "clientPort": 3050, "frontend": true},
             {"id":"connector-server-2", "host":"127.0.0.1", "port":4051, "clientPort": 3051, "frontend": true},
             {"id":"connector-server-3", "host":"127.0.0.1", "port":4052, "clientPort": 3052, "frontend": true}
         ],
        "chat":[
             {"id":"chat-server-1", "host":"127.0.0.1", "port":6050},
             {"id":"chat-server-2", "host":"127.0.0.1", "port":6051},
             {"id":"chat-server-3", "host":"127.0.0.1", "port":6052}
        ],
        "gate":[
           {"id": "gate-server-1", "host": "127.0.0.1", "clientPort": 3014, "frontend": true}
        ]
  }
}
--------------------- 
作者:心中那自由的世界 
来源:CSDN 
原文:https://blog.csdn.net/119365374/article/details/77602687 
版权声明:本文为博主原创文章,转载请附上博文链接!

 类似资料: