Protobuf被用于生产环境的即时通讯、埋点数据采集、消息推送、redis和mysql数据代理等。
syntax = "proto3";
// import "google/protobuf/any.proto";
/**
* @brief 消息头
* @note MsgHead为固定15字节的头部,当MsgHead不等于15字节时,消息发送将出错。
* 在proto2版本,MsgHead为15字节总是成立,cmd、seq、len都是required;
* 但proto3版本,MsgHead为15字节则必须要求cmd、seq、len均不等于0,否则无法正确进行收发编解码。
*/
message MsgHead
{
fixed32 cmd = 1; ///< 命令字(压缩加密算法占高位1字节)
fixed32 seq = 2; ///< 序列号
sfixed32 len = 3; ///< 消息体长度
}
/**
* @brief 消息体
* @note 消息体主体是data,所有业务逻辑内容均放在data里。req_target是请求目标,用于
* 服务端接入路由,请求包必须填充。rsp_result是响应结果,响应包必须填充。
*/
message MsgBody
{
oneof msg_type
{
Request req_target = 1; ///< 请求目标(请求包必须填充)
Response rsp_result = 2; ///< 响应结果(响应包必须填充)
}
bytes data = 3; ///< 消息体主体
bytes add_on = 4; ///< 服务端接入层附加在请求包的数据(客户端无须理会)
string trace_id = 5; ///< for log trace
// google.protobuf.Any data = 3;
// google.protobuf.Any add_on = 4;
message Request
{
uint32 route_id = 1; ///< 路由ID
string route = 2; ///< 路由ID(当route_id用整型无法表达时使用)
}
message Response
{
int32 code = 1; ///< 错误码
bytes msg = 2; ///< 错误信息
}
}
MsgBody的data字段存储消息主体,任何自定义数据均可以二进制数据流方式写入到data。
msg_type
用于标识该消息是请求还是响应(所有网络数据流都可归为请求或响应),如果是请求,则可以选择性填充Request
里的route_id
或route
,如果填充了,则框架层无须解析应用层协议(也无法解析)就能自动根据路由ID
转发,而无须应用层解开data
里的内容再根据自定义逻辑转发。如果是响应,则定义了统一的错误标准,也为业务无关的错误处理提供方便。
add_on
是附在长连接上的业务数据,框架并不会解析但会在每次转发消息时带上,可以为应用提供极其方便且强大的功能。比如,IM用户登录时客户端只发送用户ID和密码到服务端,服务端在登录校验通过后,将该用户的昵称、头像等信息通过框架提供的方法SetClientData()
将数据附在服务端接入层该用户对应的长连接Channel
上,之后所有从该连接过来的请求都会由框架层自动填充add_on
字段,服务端的其他逻辑服务器只从data
中得到自定义业务逻辑(比如聊天消息)数据,却可以从add_on
中得到这个发送用户的信息。add_on
的设计简化了应用开发逻辑,并降低了客户端与服务端传输的数据量。
trace_id
用于分布式日志跟踪。分布式服务的错误定位是相当麻烦的,Nebula分布式服务解决方案提供了日志跟踪功能,协议里的trace_id
字段的设计使得Nebula框架可以在完全不增加应用开发者额外工作的情况下(正常调用LOG4_INFO
写日志而无须额外工作)实现所有标记着同一trace_id
的日志发送到指定一台日志服务器,定义错误时跟单体服务那样登录一台服务器查看日志即可。比如,IM用户发送一条消息失败,在用户发送的消息到达服务端接入层时就被打上了trace_id
标记,这个id会一直传递到逻辑层、存储层等,哪个环节发生了错误都可以从消息的发送、转发、处理路径上查到。
namespace neb
{
CodecProto::CodecProto(std::shared_ptr<NetLogger> pLogger, E_CODEC_TYPE eCodecType)
: Codec(pLogger, eCodecType)
{}//构造函数
CodecProto::~CodecProto()
{}//析构函数
//编码环节
E_CODEC_STATUS CodecProto::Encode(const MsgHead& oMsgHead, const MsgBody& oMsgBody, CBuffer* pBuff)//编码, pBuff 数据缓冲区
{
LOG4_TRACE("pBuff->ReadableBytes()=%u, oMsgHead.ByteSize() = %d", pBuff->ReadableBytes(), oMsgHead.ByteSize());//日志记录
int iHadWriteLen = 0;//已经编码字节数:应该是记录头部和包体总的字节数,oMsgHead+oMsgBody
int iWriteLen = 0;//正在编码字节数
int iNeedWriteLen = gc_uiMsgHeadSize;//需要编码字节总数,gc_uiMsgHeadSize这个是不是头部之前的什么字节数
std::string strTmpData;//创建一个输出流
oMsgHead.SerializeToString(&strTmpData);//oMsgHead、头部数据序列化
iWriteLen = pBuff->Write(strTmpData.c_str(), gc_uiMsgHeadSize);//Write返回写入到strTmpData流内的字节数
if (iWriteLen != iNeedWriteLen)//判断写入字节是不是和需要编码字节总数相同,不相同
{
LOG4_ERROR("buff write head iWriteLen != iNeedWriteLen!");
pBuff->SetWriteIndex(pBuff->GetWriteIndex() - iHadWriteLen);//重置写入
return(CODEC_STATUS_ERR);//编解码失败
}
iHadWriteLen += iWriteLen;//相同,已经编码字节数加上头部oMsgHead
if (oMsgHead.len() <= 0) // 无包体(心跳包等),nebula在proto3的使用上以-1表示包体长度为0
{
return(CODEC_STATUS_OK);///< 编解码成功
}
iNeedWriteLen = oMsgBody.ByteSize();//记录包体需要编码字节数
oMsgBody.SerializeToString(&strTmpData);//序列化
iWriteLen = pBuff->Write(strTmpData.c_str(), oMsgBody.ByteSize());//Write返回写入到strTmpData流内的字节数
if (iWriteLen == iNeedWriteLen)
{
return(CODEC_STATUS_OK);///< 编解码成功
}
else
{
LOG4_ERROR("buff write body iWriteLen != iNeedWriteLen!");
pBuff->SetWriteIndex(pBuff->GetWriteIndex() - iHadWriteLen);
return(CODEC_STATUS_ERR);
}
}
//解码环节
E_CODEC_STATUS CodecProto::Decode(CBuffer* pBuff, MsgHead& oMsgHead, MsgBody& oMsgBody)
{
LOG4_TRACE("pBuff->ReadableBytes()=%d, pBuff->GetReadIndex()=%d",
pBuff->ReadableBytes(), pBuff->GetReadIndex());//日志记录
if (pBuff->ReadableBytes() >= gc_uiMsgHeadSize)//应该是可读字节>=gc_uiMsgHeadSize
{
bool bResult = oMsgHead.ParseFromArray(pBuff->GetRawReadBuffer(), gc_uiMsgHeadSize);//oMsgHead反序列化
if (bResult)
{
LOG4_TRACE("pBuff->ReadableBytes()=%d, oMsgHead.len()=%d",
pBuff->ReadableBytes(), oMsgHead.len());
if (oMsgHead.len() <= 0) // 无包体(心跳包等),nebula在proto3的使用上以-1表示包体长度为0
{
pBuff->SkipBytes(gc_uiMsgHeadSize);//跳过gc_uiMsgHeadSize字节数
return(CODEC_STATUS_OK);///< 编解码成功
}
if (pBuff->ReadableBytes() >= gc_uiMsgHeadSize + oMsgHead.len())
{
bResult = oMsgBody.ParseFromArray(//oMsgBody反序列化
pBuff->GetRawReadBuffer() + gc_uiMsgHeadSize, oMsgHead.len());
LOG4_TRACE("pBuff->ReadableBytes()=%d, oMsgBody.ByteSize()=%d", pBuff->ReadableBytes(), oMsgBody.ByteSize());
if (bResult)
{
pBuff->SkipBytes(gc_uiMsgHeadSize + oMsgHead.len());
return(CODEC_STATUS_OK);///< 编解码成功
}
else
{
LOG4_WARNING("cmd[%u], seq[%u] oMsgBody.ParseFromArray() error!", oMsgHead.cmd(), oMsgHead.seq());
return(CODEC_STATUS_ERR);///< 编解码失败
}
}
else
{
return(CODEC_STATUS_PAUSE);///< 编解码暂停(数据不完整,等待数据完整之后再解码)
}
}
else
{
LOG4_WARNING("oMsgHead.ParseFromArray() error!"); // maybe port scan from operation and maintenance system.
return(CODEC_STATUS_ERR);
}
}
else
{
return(CODEC_STATUS_PAUSE);///< 编解码暂停(数据不完整,等待数据完整之后再解码)
}
}
} /* namespace neb */
1、高性能C++ IoC网络框架Nebula
2、https://my.oschina.net/cqcbw/blog/3048689
3、https://github.com/Bwar/Nebula/blob/master/src/codec/CodecProto.cpp