在上篇教程中,我们用200行实现了一个纯C版的jsonrpc框架,使用event模块+cJSON实现;本篇将介绍如何用200行实现一个C++版的protorpc框架,使用evpp模块+protobuf实现。
evpp模块是event模块的c++封装,具体介绍见evpp/README.md
protobuf是google出品的一种结构化数据序列化/反序列化格式,具体介绍可参考我的另一篇博客protobuf,也可参考protobuf官方文档。源码编译安装步骤如下:
# Fetch the protobuf sources.
git clone https://github.com/protocolbuffers/protobuf
cd protobuf
# Autotools build and system-wide install.
# NOTE(review): autogen.sh/configure may be absent on recent protobuf checkouts
# (newer releases are built with CMake/Bazel) -- confirm against the protobuf
# install docs, or check out an older release tag before running these steps.
./autogen.sh
./configure
make
sudo make install
# Refresh the dynamic linker cache (presumably so the freshly installed
# libprotobuf shared library is found at run time).
sudo ldconfig
# Sanity check: protoc is on PATH and runs.
which protoc
protoc -h
/*
* proto rpc server
*
* @build make protorpc
* @server bin/protorpc_server 1234
* @client bin/protorpc_client 127.0.0.1 1234 add 1 2
*
*/
#include "TcpServer.h"
using namespace hv;
#include "protorpc.h"
#include "router.h"
#include "handler/handler.h"
#include "handler/calc.h"
#include "handler/login.h"
// Dispatch table: maps the method name carried in a request to the handler
// that fills in the response. Append a {name, handler} pair to expose a new
// RPC method.
protorpc_router router[] = {
    { "add",   calc_add },
    { "sub",   calc_sub },
    { "mul",   calc_mul },
    { "div",   calc_div },
    { "login", login    },
};

// Number of entries in router[].
#define PROTORPC_ROUTER_NUM (sizeof(router) / sizeof(*router))
class ProtoRpcServer : public TcpServer {
public:
ProtoRpcServer() : TcpServer()
{
onConnection = [](const SocketChannelPtr& channel) {
std::string peeraddr = channel->peeraddr();
if (channel->isConnected()) {
printf("%s connected! connfd=%d\n", peeraddr.c_str(), channel->fd());
} else {
printf("%s disconnected! connfd=%d\n", peeraddr.c_str(), channel->fd());
}
};
onMessage = handleMessage;
// init protorpc_unpack_setting
unpack_setting_t protorpc_unpack_setting;
memset(&protorpc_unpack_setting, 0, sizeof(unpack_setting_t));
protorpc_unpack_setting.mode = UNPACK_BY_LENGTH_FIELD;
protorpc_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
protorpc_unpack_setting.body_offset = PROTORPC_HEAD_LENGTH;
protorpc_unpack_setting.length_field_offset = PROTORPC_HEAD_LENGTH_FIELD_OFFSET;
protorpc_unpack_setting.length_field_bytes = PROTORPC_HEAD_LENGTH_FIELD_BYTES;
protorpc_unpack_setting.length_field_coding = ENCODE_BY_BIG_ENDIAN;
setUnpack(&protorpc_unpack_setting);
}
int listen(int port) { return createsocket(port); }
private:
static void handleMessage(const SocketChannelPtr& channel, Buffer* buf) {
// unpack -> Request::ParseFromArray -> router -> Response::SerializeToArray -> pack -> Channel::write
// protorpc_unpack
protorpc_message msg;
memset(&msg, 0, sizeof(msg));
int packlen = protorpc_unpack(&msg, buf->data(), buf->size());
if (packlen < 0) {
printf("protorpc_unpack failed!\n");
return;
}
assert(packlen == buf->size());
if (protorpc_head_check(&msg.head) != 0) {
printf("protorpc_head_check failed!\n");
return;
}
// Request::ParseFromArray
protorpc::Request req;
protorpc::Response res;
if (req.ParseFromArray(msg.body, msg.head.length)) {
printf("> %s\n", req.DebugString().c_str());
res.set_id(req.id());
// router
const char* method = req.method().c_str();
bool found = false;
for (int i = 0; i < PROTORPC_ROUTER_NUM; ++i) {
if (strcmp(method, router[i].method) == 0) {
found = true;
router[i].handler(req, &res);
break;
}
}
if (!found) {
not_found(req, &res);
}
} else {
bad_request(req, &res);
}
// Response::SerializeToArray + protorpc_pack
protorpc_message_init(&msg);
msg.head.length = res.ByteSize();
packlen = protorpc_package_length(&msg.head);
unsigned char* writebuf = NULL;
HV_ALLOC(writebuf, packlen);
packlen = protorpc_pack(&msg, writebuf, packlen);
if (packlen > 0) {
printf("< %s\n", res.DebugString().c_str());
res.SerializeToArray(writebuf + PROTORPC_HEAD_LENGTH, msg.head.length);
channel->write(writebuf, packlen);
}
HV_FREE(writebuf);
}
};
/**
 * Entry point: protorpc_server <port>
 *
 * Starts a ProtoRpcServer on the given port and then idles forever.
 *
 * @return -10 when the port argument is missing or not in 1..65535,
 *         -20 when the listening socket cannot be created;
 *         otherwise the function does not return.
 */
int main(int argc, char** argv) {
    if (argc < 2) {
        printf("Usage: %s port\n", argv[0]);
        return -10;
    }
    int port = atoi(argv[1]);
    // atoi yields 0 for non-numeric input; reject that and anything outside
    // the valid TCP port range instead of passing it on to listen().
    if (port <= 0 || port > 65535) {
        printf("Invalid port: %s\n", argv[1]);
        printf("Usage: %s port\n", argv[0]);
        return -10;
    }

    ProtoRpcServer srv;
    int listenfd = srv.listen(port);
    if (listenfd < 0) {
        printf("protorpc_server listen on port %d failed!\n", port);
        return -20;
    }
    printf("protorpc_server listen on port %d, listenfd=%d ...\n", port, listenfd);
    srv.setThreadNum(4);
    srv.start();

    // Keep the main thread alive after start(); presumably the server runs on
    // its own threads -- confirm against TcpServer.h.
    while (1) hv_sleep(1);
    return 0;
}
流程很清晰:启动一个`TcpServer`,监听指定端口,通过`setUnpack`接口设置拆包规则,`onMessage`回调上来就是完整的一包数据;回调里调用`protorpc_unpack`拆包、`Request::ParseFromArray`反序列化得到结构化的请求,通过请求里的`method`字段查找注册好的`router`路由表,调用对应的`handler`处理请求、填充响应,然后`Response::SerializeToArray`序列化响应,再经`protorpc_pack`加上头部封包,最后调用`Channel::write`发送出去。
base.proto定义如下:
// Base messages exchanged between protorpc client and server.
syntax = "proto3";
package protorpc;
// Error details attached to a Response.
message Error {
// Numeric error code (the sample output shows 400 and 404).
int32 code = 1;
// Human-readable description (e.g. "Bad Request", "Not Found" in the sample output).
string message = 2;
}
// One RPC call.
message Request {
// Request identifier; the server copies it into the Response.
uint64 id = 1;
// Method name; the server looks it up in its router table.
string method = 2;
// Call arguments as raw bytes.
// NOTE(review): presumably each entry is a serialized protobuf message
// interpreted by the handler -- confirm against the handler sources.
repeated bytes params = 3;
}
// Reply to a Request.
message Response {
// Identifier of the Request this reply belongs to.
uint64 id = 1;
// Result payload as raw bytes; optional.
optional bytes result = 2;
// Error details; optional.
optional Error error = 3;
}
执行该目录下的`protoc.sh`会调用`protoc`,根据`.proto`定义文件自动生成对应代码。
# Fetch libhv and build the protorpc example.
git clone https://github.com/ithewei/libhv
cd libhv
make protorpc
# Start the server on port 1234.
bin/protorpc_server 1234
# Run the client against it: client <host> <port> <method> <arg1> <arg2>.
# A regular call:
bin/protorpc_client 127.0.0.1 1234 add 1 2
# Division by zero (the sample output shows RPC error 400 "Bad Request"):
bin/protorpc_client 127.0.0.1 1234 div 1 0
# Unregistered method (the sample output shows RPC error 404 "Not Found"):
bin/protorpc_client 127.0.0.1 1234 xyz 1 2
结果如下:
服务端:
$ bin/protorpc_server 1234
protorpc_server listen on port 1234, listenfd=3 ...
客户端:
$ bin/protorpc_client 127.0.0.1 1234 add 1 2
connected to 127.0.0.1:1234! connfd=4
id: 1
method: "login"
params: "\n\005admin\022\006123456"
login success!
user_id: 123456
token: "admin:123456"
id: 2
method: "add"
params: "\010\001"
params: "\010\002"
calc success!
1 add 2 = 3
disconnected to 127.0.0.1:1234! connfd=4
$ bin/protorpc_client 127.0.0.1 1234 div 1 0
connected to 127.0.0.1:1234! connfd=4
id: 1
method: "login"
params: "\n\005admin\022\006123456"
login success!
user_id: 123456
token: "admin:123456"
id: 2
method: "div"
params: "\010\001"
params: ""
RPC error:
code: 400
message: "Bad Request"
calc failed!
disconnected to 127.0.0.1:1234! connfd=4
$ bin/protorpc_client 127.0.0.1 1234 xyz 1 2
connected to 127.0.0.1:1234! connfd=4
id: 1
method: "login"
params: "\n\005admin\022\006123456"
login success!
user_id: 123456
token: "admin:123456"
id: 2
method: "xyz"
params: "\010\001"
params: "\010\002"
RPC error:
code: 404
message: "Not Found"
calc failed!
disconnected to 127.0.0.1:1234! connfd=4