pomelo源码分析--connector协议处理message

公子昂
2023-12-01

作者:shihuaping0918@163.com,转载请注明作者

pomelo框架核心提供了sioconnector,udpconnector,hybirdconnector,mqttconnector。sioconnector基于socket.io,使用json通信,pc端通信。hybirdconnector基于tcp和websocket,使用二进制通信,主要用于手机端通信。mqttconnector使用mqtt协议通信,mqtt是二进制协议,是物联网协议,这个就是用于嵌入式设备通信。而udpconnector,这个看名字也知道是基于udp的,它也是使用二进制协议进行通信。这个主要用于网络环境不好,数据包小的场景。

connector按照约定是要提供encode/decode的。sioconnector的encode/decode最简单。因为它是处理json的。在connector提供encode/decode之外,还可以单独设自定义的encode/decode。先看sioconnector,因为它比较简单。

从decode看起,decode就是json解析。

/**
 * Decode client message package.
 *
 * Package format:
 *   message id: 4bytes big-endian integer
 *   route length: 1byte
 *   route: route length bytes
 *   body: the rest bytes
 *
 * @param  {String} data socket.io package from client
 * @return {Object}      message object
 */
Connector.decode = Connector.prototype.decode = function(msg) {
  var index = 0;

//package ID
  var id = parseIntField(msg, index, PKG_ID_BYTES);
  index += PKG_ID_BYTES;
//route体长
  var routeLen = parseIntField(msg, index, PKG_ROUTE_LENGTH_BYTES);
//route字符串
  var route = msg.substr(PKG_HEAD_BYTES, routeLen);
  var body = msg.substr(PKG_HEAD_BYTES + routeLen);

  return {
    id: id,  
    route: route,
    body: JSON.parse(body) //json包体
  };
};
//取长度
var parseIntField = function(str, offset, len) {
  var res = 0;
  for(var i=0; i<len; i++) {  //big-endian,网络字节序,高位在前
    if(i > 0) {
      res <<= 8;
    }
    res |= str.charCodeAt(offset + i) & 0xff;
  }

  return res;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
从decode可以看出来,消息格式是有一个package id,一个route,然后就是消息体。消息体是json。而encode稍微复杂一点。

Connector.encode = Connector.prototype.encode = function(reqId, route, msg) {
  if(reqId) { //有reqId,这个序号是客户端编的
    return composeResponse(reqId, route, msg);
  } else { //没有就是广播
    return composePush(route, msg);
  }
};

//注意这个地方,route被忽略了
var composeResponse = function(msgId, route, msgBody) {
  return {
    id: msgId, //reqId,请求包序号
    body: msgBody // 回复消息体
  };
};

var composePush = function(route, msgBody) {
  return JSON.stringify({route: route, body: msgBody});
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
sioconnector.js的协议处理是非常简单的。字段也很少,但是body里面可能就千变万化了,这个是业务相关的。相信写过稍大一点项目的都很清楚,有的模块甚至有几百个命令,几百个命令就会产生几百种body。

下面再分析一下hybirdconnector.js。到了这里就要正式讲一下pomelo的消息格式了,pomelo的消息分为两层,package和message。 以下引用原文:“pomelo的二进制协议包含两层编码:package和message。message层主要实现route压缩和protobuf压缩,message层的编码结果将传递给package层。package层主要实现pomelo应用基于二进制协议的握手过程,心跳和数据传输编码,package层的编码结果可以通过tcp,websocket等协议以二进制数据的形式进行传输。message层编码可选,也可替换成其他二进制编码格式,都不影响package层编码和发送。”

package格式

package分为header和body两部分。header描述package包的类型和包的长度,body则是需要传输的数据内容。具体格式如下:

type - package类型,1个byte,取值如下。
0x01: 客户端到服务器的握手请求以及服务器到客户端的握手响应
0x02: 客户端到服务器的握手ack
0x03: 心跳包
0x04: 数据包
0x05: 服务器主动断开连接通知
length - body内容长度,3个byte的大端整数,因此最大的包长度为2^24个byte。
body - 二进制的传输内容。

message协议的主要作用是封装消息头,包括route和消息类型两部分,不同的消息类型有着不同的消息头,在消息头里面可能要打入message id(即requestId)和route信息。由于可能会有route压缩,而且对于服务端push的消息,message id为空,对于客户端请求的响应,route为空,因此message的头格式比较复杂。
消息头分为三部分,flag,message id,route。
pomelo消息头是可变的,会根据具体的消息类型和内容而改变。其中:
flag位是必须的,占用一个byte,它决定了后面的消息类型和内容的格式;
message id和route则是可选的。其中message id采用[varints 128变长编码](https://developers.google.com/protocol-buffers/docs/encoding#varints)方式,根据值的大小,长度在0~5byte之间。route则根据消息类型以及内容的大小,长度在0~255byte之间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
从这段文字的描述可以看出来,我们刚才对sioconnector.js中encode和decode的分析都是基于message的,package部分的没有涉及到。

本篇暂时不讲package部分,聚集点在于message部分。因为一发散的话,就没有重点了。

hybirdconnector.js对于encode和decode的处理是,写了一个coder.js作为抽象。

var coder = require('./common/coder');

Connector.decode = Connector.prototype.decode = coder.decode;

Connector.encode = Connector.prototype.encode = coder.encode;
1
2
3
4
5
可以看到encode和decode独立出去了,做了一个单独的抽象,这样提高了复用性和扩展性。

coder.js

//这是pomelo的另一个开源组件
var Message = require('pomelo-protocol').Message;
var Constants = require('../../util/constants');
//pomelo-logger也是另一个组件,不在核心模块里
var logger = require('pomelo-logger').getLogger('pomelo', __filename);

//encode函数
var encode = function(reqId, route, msg) {
  if(!!reqId) {
    return composeResponse(this, reqId, route, msg);
  } else {
    return composePush(this, route, msg);
  }
};

//decode函数
var decode = function(msg) {
  msg = Message.decode(msg.body);
  var route = msg.route;

  // decode use dictionary
  if(!!msg.compressRoute) {
    if(!!this.connector.useDict) {
      var abbrs = this.dictionary.getAbbrs();
      if(!abbrs[route]) {
        logger.error('dictionary error! no abbrs for route : %s', route);
        return null;
      }
      route = msg.route = abbrs[route];
    } else {
      logger.error('fail to uncompress route code for msg: %j, server not enable dictionary.', msg);
      return null;
    }
  }

  // decode use protobuf,protobuf协议解码
  if(!!this.protobuf && !!this.protobuf.getProtos().client[route]) {
    msg.body = this.protobuf.decode(route, msg.body);
  } else if(!!this.decodeIO_protobuf && !!this.decodeIO_protobuf.check(Constants.RESERVED.CLIENT, route)) {
    msg.body = this.decodeIO_protobuf.decode(route, msg.body);
  } else {
    try {
      msg.body = JSON.parse(msg.body.toString('utf8'));
    } catch (ex) {
      msg.body = {};
    }
  }

  return msg;
};

var composeResponse = function(server, msgId, route, msgBody) {
  if(!msgId || !route || !msgBody) {
    return null;
  }
  msgBody = encodeBody(server, route, msgBody);
  return Message.encode(msgId, Message.TYPE_RESPONSE, 0, null, msgBody);
};

var composePush = function(server, route, msgBody) {
  if(!route || !msgBody){
    return null;
  }
  msgBody = encodeBody(server, route, msgBody);
  // encode use dictionary
  var compressRoute = 0;
  if(!!server.dictionary) {
    var dict = server.dictionary.getDict();
    if(!!server.connector.useDict && !!dict[route]) {
      route = dict[route];
      compressRoute = 1;
    }
  }
  return Message.encode(0, Message.TYPE_PUSH, compressRoute, route, msgBody);
};

var encodeBody = function(server, route, msgBody) {
    // encode use protobuf
  if(!!server.protobuf && !!server.protobuf.getProtos().server[route]) {
    msgBody = server.protobuf.encode(route, msgBody);
  } else if(!!server.decodeIO_protobuf && !!server.decodeIO_protobuf.check(Constants.RESERVED.SERVER, route)) {
     msgBody = server.decodeIO_protobuf.encode(route, msgBody);
  } else { //兼容json
    msgBody = new Buffer(JSON.stringify(msgBody), 'utf8');
  }
  return msgBody;
};

module.exports = {
  encode: encode,
  decode: decode
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
对于coder.js中的encode和decode,里面调用的函数名和sioconnector.js中都是一致的。所不同的是对于body的处理,json的话直接用JSON相关的函数就可以了。从coder.js文件来看,所谓的二进制实际上是用的protobuf,不支持其它的二进制协议。代码是比较清晰的,就不再对代码做太多解释了。

最后补充说明,协议这种东西,最好不要自定义二进制协议,更不要自定义类似query string那种文本协议。自定义二进制协议一是调试非常的麻烦,二是要做协议转换的时候,开发速度慢,出错率高,工作量大,自定义二进制协议少有能直接DSL生成转换代码的。最好的方案目前看到的也是用lua去映射,然后写一段通用代码去转换。而类query string的文本协议就更痛苦了,长的就是像这样子a=b&c=d。这种协议第一,要做编码转换,特殊字符转换。二,这种表示是一维的,key-value形式,也就是一个map转成了数组。它的扩展性非常地差,嵌套表达能力基本为0,因为嵌套表达就需要新增分隔符,多层嵌套以后,调协议会成为开发之间的导火索。同时作为文本协议,它的体积很大,无法压缩,也不能直观地格式化。非要用文本协议,直接用json就好了。
--------------------- 
作者:心中那自由的世界 
来源:CSDN 
原文:https://blog.csdn.net/119365374/article/details/77609102 
版权声明:本文为博主原创文章,转载请附上博文链接!

 类似资料: