当前位置: 首页 > 工具软件 > nebula-syntax > 使用案例 >

高性能网络框架Nebula——protobuf协议设计1.0——TCP通讯协议设计

窦伟
2023-12-01

TCP通讯协议设计

1、背景应用

Protobuf被用于生产环境的即时通讯、埋点数据采集、消息推送、redis和mysql数据代理等。

2、TCP通讯协议设计

2.1、protobuf3版Msg(messege文件)

syntax = "proto3";
// import "google/protobuf/any.proto";

/**
 * @brief 消息头
 * @note MsgHead为固定15字节的头部,当MsgHead不等于15字节时,消息发送将出错。
 *       在proto2版本,MsgHead为15字节总是成立,cmd、seq、len都是required;
 *       但proto3版本,MsgHead为15字节则必须要求cmd、seq、len均不等于0,否则无法正确进行收发编解码。
 */
message MsgHead
{
    fixed32 cmd                = 1;           ///< 命令字(压缩加密算法占高位1字节)
    fixed32 seq                = 2;           ///< 序列号
    sfixed32 len               = 3;           ///< 消息体长度
}

/**
 * @brief 消息体
 * @note 消息体主体是data,所有业务逻辑内容均放在data里。req_target是请求目标,用于
 * 服务端接入路由,请求包必须填充。rsp_result是响应结果,响应包必须填充。
 */
message MsgBody
{
    oneof msg_type
    {
	    Request req_target               = 1;			///< 请求目标(请求包必须填充)
	    Response rsp_result              = 2;			///< 响应结果(响应包必须填充)
    }
    bytes data                  = 3;			///< 消息体主体
    bytes add_on                = 4;			///< 服务端接入层附加在请求包的数据(客户端无须理会)
    string trace_id             = 5;            ///< for log trace
    // google.protobuf.Any data             = 3;
    // google.protobuf.Any add_on           = 4;

    message Request

    {
    	uint32 route_id         = 1;		    ///< 路由ID
        string route            = 2;			///< 路由ID(当route_id用整型无法表达时使用)
    }

    message Response
    {
        int32 code            = 1;           ///< 错误码
        bytes msg             = 2;           ///< 错误信息
    }
}

MsgBody的data字段存储消息主体,任何自定义数据均可以二进制数据流方式写入到data。

  • msg_type用于标识该消息是请求还是响应(所有网络数据流都可归为请求或响应),如果是请求,则可以选择性填充Request里的route_idroute,如果填充了,则框架层无须解析应用层协议(也无法解析)就能自动根据路由ID转发,而无须应用层解开data里的内容再根据自定义逻辑转发。如果是响应,则定义了统一的错误标准,也为业务无关的错误处理提供方便。

  • add_on是附在长连接上的业务数据,框架并不会解析但会在每次转发消息时带上,可以为应用提供极其方便且强大的功能。比如,IM用户登录时客户端只发送用户ID和密码到服务端,服务端在登录校验通过后,将该用户的昵称、头像等信息通过框架提供的方法SetClientData()将数据附在服务端接入层该用户对应的长连接Channel上,之后所有从该连接过来的请求都会由框架层自动填充add_on字段,服务端的其他逻辑服务器只从data中得到自定义业务逻辑(比如聊天消息)数据,却可以从add_on中得到这个发送用户的信息。add_on的设计简化了应用开发逻辑,并降低了客户端与服务端传输的数据量。

  • trace_id用于分布式日志跟踪。分布式服务的错误定位是相当麻烦的,Nebula分布式服务解决方案提供了日志跟踪功能,协议里的trace_id字段的设计使得Nebula框架可以在完全不增加应用开发者额外工作的情况下(正常调用LOG4_INFO写日志而无须额外工作)实现所有标记着同一trace_id的日志发送到指定一台日志服务器,定义错误时跟单体服务那样登录一台服务器查看日志即可。比如,IM用户发送一条消息失败,在用户发送的消息到达服务端接入层时就被打上了trace_id标记,这个id会一直传递到逻辑层、存储层等,哪个环节发生了错误都可以从消息的发送、转发、处理路径上查到。

2.1、MsgHead和MsgBody的编解码

namespace neb
{
CodecProto::CodecProto(std::shared_ptr<NetLogger> pLogger, E_CODEC_TYPE eCodecType)
    : Codec(pLogger, eCodecType)
{}//构造函数
CodecProto::~CodecProto()
{}//析构函数

//编码环节
E_CODEC_STATUS CodecProto::Encode(const MsgHead& oMsgHead, const MsgBody& oMsgBody, CBuffer* pBuff)//编码, pBuff  数据缓冲区
{
    LOG4_TRACE("pBuff->ReadableBytes()=%u, oMsgHead.ByteSize() = %d", pBuff->ReadableBytes(), oMsgHead.ByteSize());//日志记录
    int iHadWriteLen = 0;//已经编码字节数:应该是记录头部和包体总的字节数,oMsgHead+oMsgBody
    int iWriteLen = 0;//正在编码字节数
    int iNeedWriteLen = gc_uiMsgHeadSize;//需要编码字节总数,gc_uiMsgHeadSize这个是不是头部之前的什么字节数
    std::string strTmpData;//创建一个输出流
    oMsgHead.SerializeToString(&strTmpData);//oMsgHead、头部数据序列化
    iWriteLen = pBuff->Write(strTmpData.c_str(), gc_uiMsgHeadSize);//Write返回写入到strTmpData流内的字节数
    if (iWriteLen != iNeedWriteLen)//判断写入字节是不是和需要编码字节总数相同,不相同
    {
        LOG4_ERROR("buff write head iWriteLen != iNeedWriteLen!");
        pBuff->SetWriteIndex(pBuff->GetWriteIndex() - iHadWriteLen);//重置写入
        return(CODEC_STATUS_ERR);//编解码失败
    }
    iHadWriteLen += iWriteLen;//相同,已经编码字节数加上头部oMsgHead
    if (oMsgHead.len() <= 0)    // 无包体(心跳包等),nebula在proto3的使用上以-1表示包体长度为0
    {
        return(CODEC_STATUS_OK);///< 编解码成功
    }
    iNeedWriteLen = oMsgBody.ByteSize();//记录包体需要编码字节数
    oMsgBody.SerializeToString(&strTmpData);//序列化
    iWriteLen = pBuff->Write(strTmpData.c_str(), oMsgBody.ByteSize());//Write返回写入到strTmpData流内的字节数
    if (iWriteLen == iNeedWriteLen)
    {
        return(CODEC_STATUS_OK);///< 编解码成功
    }
    else
    {
        LOG4_ERROR("buff write body iWriteLen != iNeedWriteLen!");
        pBuff->SetWriteIndex(pBuff->GetWriteIndex() - iHadWriteLen);
        return(CODEC_STATUS_ERR);
    }
}

//解码环节
E_CODEC_STATUS CodecProto::Decode(CBuffer* pBuff, MsgHead& oMsgHead, MsgBody& oMsgBody)
{
    LOG4_TRACE("pBuff->ReadableBytes()=%d, pBuff->GetReadIndex()=%d",
                    pBuff->ReadableBytes(), pBuff->GetReadIndex());//日志记录
    if (pBuff->ReadableBytes() >= gc_uiMsgHeadSize)//应该是可读字节>=gc_uiMsgHeadSize
    {
        bool bResult = oMsgHead.ParseFromArray(pBuff->GetRawReadBuffer(), gc_uiMsgHeadSize);//oMsgHead反序列化
        if (bResult)
        {
           LOG4_TRACE("pBuff->ReadableBytes()=%d, oMsgHead.len()=%d",
                            pBuff->ReadableBytes(), oMsgHead.len());
            if (oMsgHead.len() <= 0)      // 无包体(心跳包等),nebula在proto3的使用上以-1表示包体长度为0
            {
                pBuff->SkipBytes(gc_uiMsgHeadSize);//跳过gc_uiMsgHeadSize字节数
                return(CODEC_STATUS_OK);///< 编解码成功
            }
            if (pBuff->ReadableBytes() >= gc_uiMsgHeadSize + oMsgHead.len())
            {
                bResult = oMsgBody.ParseFromArray(//oMsgBody反序列化
                                pBuff->GetRawReadBuffer() + gc_uiMsgHeadSize, oMsgHead.len());
                LOG4_TRACE("pBuff->ReadableBytes()=%d, oMsgBody.ByteSize()=%d", pBuff->ReadableBytes(), oMsgBody.ByteSize());
                if (bResult)
                {
                   pBuff->SkipBytes(gc_uiMsgHeadSize + oMsgHead.len());
                   return(CODEC_STATUS_OK);///< 编解码成功
                }
                else
                {
                    LOG4_WARNING("cmd[%u], seq[%u] oMsgBody.ParseFromArray() error!", oMsgHead.cmd(), oMsgHead.seq());
                    return(CODEC_STATUS_ERR);///< 编解码失败
                }
            }
            else
            {
               return(CODEC_STATUS_PAUSE);///< 编解码暂停(数据不完整,等待数据完整之后再解码)
            }
        }
        else
        {
            LOG4_WARNING("oMsgHead.ParseFromArray() error!");   // maybe port scan from operation and maintenance system.
            return(CODEC_STATUS_ERR);
        }
    }
    else
    {
        return(CODEC_STATUS_PAUSE);///< 编解码暂停(数据不完整,等待数据完整之后再解码)
    }
}

} /* namespace neb */

参考

1、高性能C++ IoC网络框架Nebula
2、https://my.oschina.net/cqcbw/blog/3048689
3、https://github.com/Bwar/Nebula/blob/master/src/codec/CodecProto.cpp

 类似资料: