gStore是由北京大学王选计算机所数据管理实验室(PKUMOD)邹磊教授团队研发的面向知识图谱的高效图数据库系统,用来管理庞大的关联数据(linked data)。图谱学苑weekly作为技术分享专栏在此之前发布了多期关于gStore功能特性、操作指南的系列文章。weekly将从本期开始陆续推出gStore内核源码解析系列文章,带你深入了解gStore各功能模块的源代码逻辑和实现原理。这一系列的文章主要面向gStore内核开发者及图数据库研究人员。
接下来将按照从外到内、由浅入深的顺序逐步解析gStore的内核源码。本篇从HTTP服务的启动、启动参数解析、线程池和HTTP请求解析四个方面,对gStore HTTP服务(ghttp)的源码进行介绍。
ghttp模块实现了一个简单、快速、多线程的HTTP服务器。它使用了Ole Christian Eidheim开源的第三方库Simple-Web-Server(GitHub: https://github.com/eidheim/Simple-Web-Server),这是一个基于Boost.Asio的HTTP网络服务框架。
HTTP服务采用fork的方式创建子进程,在子进程中启动API响应服务,主进程则作为守护进程存在。当子进程正常退出时,主进程也随之退出;当子进程异常终止(例如被信号中断)时,主进程会删除端口记录文件system.db/port.txt,并自动创建新的子进程。具体见如下main函数代码:
int main(int argc, char *argv[])
{
Util util;
srand(time(NULL));
// Notice that current_database is assigned in the child process, not in the father process
// when used as endpoint, or when the database is indicated in command line, we can assign
// current_database in father process(but this is resource consuming)
while (true) {
// NOTICE: here we use 2 processes, father process is used for monitor and control(like, restart)
// Child process is used to deal with web requests, can also has many threads
pid_t fpid = fork();
if (fpid == 0) {
int ret = initialize(argc, argv);
exit(ret);
}
else if (fpid > 0)
{
int status;
waitpid(fpid, &status, 0);
if (WIFEXITED(status))
{
return 0;
}
else
{ // 采用文件保存启动端口并据此来判断端口是否被使用,在主进程退出时要删除该文件,否则下次无法正常启动
if (Util::file_exist("system.db/port.txt"))
{
string cmd = "rm system.db/port.txt";
system(cmd.c_str());
}
Util::formatPrint("Stopped abnormally, restarting server...", "WARN");
}
}
else
{
Util::formatPrint("Failed to start server: deamon fork failure.", "ERROR");
return -1;
}
}
return 0;
}
我们可以通过命令参数指定HTTP服务监听的端口(-p,默认9000)、预加载的数据库(-db)以及是否加载CSR(-c,默认0)。启动命令如下:
$ bin/ghttp -p [port] -db [db_name] -c [load_csr]
实现的片段代码如下:
// Command-line options of ghttp (fragment of initialize()).
// -db / --database : database to preload; it must not end with ".db" and must not be "system".
db_name = Util::getArgValue(argc, argv, "db", "database");
// ">= 3" (not "> 3") so that a name consisting of exactly ".db" is rejected as well.
if (db_name.length() >= 3 && db_name.compare(db_name.length() - 3, 3, ".db") == 0)
{
    Util::formatPrint("Your db name to be built should not end with \".db\".","ERROR");
    return -1;
}
else if (db_name == "system")
{
    Util::formatPrint("You can not load system files.","ERROR");
    return -1;
}
// -p / --port : listening port, default "9000".
port_str = Util::getArgValue(argc, argv, "p", "port", "9000");
port = Util::string2int(port_str);
// Reject ports outside the TCP range up front. NOTE(review): an out-of-range value would
// presumably be narrowed silently when stored as the server's port (Simple-Web-Server keeps
// it as unsigned short) — confirm against the bundled server version.
if (port <= 0 || port > 65535)
{
    Util::formatPrint(string("Invalid port: ") + port_str + ", the port should be in [1, 65535].", "ERROR");
    return -1;
}
// -c / --csr : CSR loading flag, default "0".
loadCSR = Util::string2int(Util::getArgValue(argc, argv, "c", "csr", "0"));
ghttp支持多线程服务,通过线程池的方式来管理一个线程队列,每次取一个任务分配给一个线程去做,初始化的线程池大小为30。ghttp的多线程包括两个方面,首先是HttpServer的多线程,其次是针对query接口的多线程;HttpServer的线程池将接收所有的请求,而query的线程池只会处理query请求;也可以解释为query线程池为HttpServer中某个线程的子线程池。
部分线程池相关的代码和注解如下:
// Size of the task thread pool (number of worker Threads created for `pool`);
// initialize() also uses it as the HttpServer's thread_pool_size.
#define THREAD_NUM 30
// A unit of work for the thread pool: one query request (the HTTP request/response
// objects plus the query parameters), executed by Task::run().
class Task
{
public:
bool update; // NOTE(review): not read by Task::run(); presumably set from the first constructor's `flag` — confirm
string username; // user name of the requester
string db_name; // database name
string format; // response data format (json, file, json+file)
string db_query; // SPARQL statement
string remote_ip; // client IP address of the request
string querytype; // query type (0: query only, 1: query and update)
string log_prefix; // prefix passed to query_thread_new(), presumably for log lines — confirm
const shared_ptr<HttpServer::Response> response; // HTTP response object
const shared_ptr<HttpServer::Request> request; // HTTP request object
// NOTE(review): the const members make Task non-assignable; it is passed around as Task*.
// Constructor definitions are not shown here; parameter roles are inferred from names only.
Task(bool flag, string name, string ft, string query, const shared_ptr<HttpServer::Response>& res, const shared_ptr<HttpServer::Request>& req);
Task(string username, string name, string ft, string _remote_ip,string _log_prefix,string _querytype,
string query, const shared_ptr<HttpServer::Response>& res, const shared_ptr<HttpServer::Request>& req);
~Task();
void run();
};
void Task::run()
{
// 调用查询
query_thread_new(request, response, db_name, db_query, format, querytype, log_prefix, username);
}
// Worker wrapper: each Thread executes one assigned Task at a time on a detached std::thread.
class Thread
{
public:
thread TD; // underlying std::thread; start() launches run() on it and detaches it
int ID; // worker id, printed by run()
static int threadnum; // NOTE(review): presumably a counter used to hand out IDs — constructor not shown, confirm
Task* task; // task set by assign(); run() deletes it after executing it
Thread();
~Thread();
int GetThreadID();
void assign(Task* t);
void run();
void start();
// NOTE(review): these take Thread by value, but Thread holds a move-only std::thread, so its
// copy constructor is implicitly deleted and calling them with existing Thread objects would
// not compile; `const Thread&` parameters would be usable. Definitions are not visible here.
friend bool operator==(Thread t1, Thread t2);
friend bool operator!=(Thread t1, Thread t2);
};
// Give this worker the task it should execute next. The worker takes ownership:
// Thread::run() deletes the task once it has run.
void Thread::assign(Task* t)
{
    this->task = t;
}
void Thread::run()
{
cout << "Thread:" << ID << " run\n";
task->run();
delete task;
BackToFree(this);
}
void Thread::start()
{
TD = thread(&Thread::run, this);
TD.detach();
}
// Thread pool: owns the pool settings and a queue of pending tasks. The worker lists
// (busythreads / freethreads) used by the constructor are not members of this class.
class ThreadPool
{
public:
int ThreadNum; // number of worker Threads created by the constructor
bool isclose; // closed flag; the constructor initialises it to false
thread ThreadsManage; // NOTE(review): presumably the manager thread started by start() — not shown, confirm
queue<Task*> tasklines; // pending tasks as raw pointers; presumably filled by AddTask() — confirm
ThreadPool();
ThreadPool(int t);
~ThreadPool();
void create();
void SetThreadNum(int t);
int GetThreadNum();
void AddTask(Task* t);
void start();
void close();
};
// Build a pool with t idle workers; a t smaller than 1 falls back to 10 workers.
// Both shared worker lists (busythreads / freethreads, which live outside this class)
// are emptied first, then every newly allocated Thread is placed on the free list.
ThreadPool::ThreadPool(int t)
    : ThreadNum(t < 1 ? 10 : t), isclose(false)
{
    busythreads.clear();
    freethreads.clear();
    for (int created = 0; created < ThreadNum; ++created)
    {
        freethreads.push_back(new Thread());
    }
}
// Create the task thread pool with THREAD_NUM workers.
ThreadPool pool(THREAD_NUM);
// Inside initialize(): size the HttpServer's own thread pool with the same constant.
server.config.thread_pool_size= THREAD_NUM;
ghttp支持GET和POST请求,GET请求的参数若含有特殊字符需要进行URLEncode处理,在接收到参数后会进行对应的URLDecode,POST请求的参数以JSON格式放在body中。
// /shutdown gets dedicated handlers for both GET and POST.
server.resource["/shutdown"]["GET"] = [&server](shared_ptr<HttpServer::Response> res, shared_ptr<HttpServer::Request> req)
{
    shutdown_handler(server, res, req, "GET");
};
server.resource["/shutdown"]["POST"] = [&server](shared_ptr<HttpServer::Response> res, shared_ptr<HttpServer::Request> req)
{
    shutdown_handler(server, res, req, "POST");
};
// Every other path falls through to the default resources, which hand the request to
// request_handler together with the HTTP method name.
server.default_resource["GET"] = [&server](shared_ptr<HttpServer::Response> res, shared_ptr<HttpServer::Request> req)
{
    request_handler(server, res, req, "GET");
};
server.default_resource["POST"] = [&server](shared_ptr<HttpServer::Response> res, shared_ptr<HttpServer::Request> req)
{
    request_handler(server, res, req, "POST");
};
// Run the HTTP server on its own thread and block until it stops.
// server.start() blocks for the server's lifetime. Any exception it throws (for example
// when the port is already in use) is caught and reported here: an exception escaping a
// std::thread calls std::terminate(), which aborts the child process. main() treats that
// as an abnormal exit and forks a new child, which would fail the same way again.
thread server_thread([&server]() {
    // handle the Ctrl+C signal
    signal(SIGINT, signalHandler);
    // Start server
    Util::formatPrint("ghttp server port " + to_string(server.config.port));
    try
    {
        server.start();
    }
    catch (const std::exception& e)
    {
        Util::formatPrint(string("ghttp server failed: ") + e.what(), "ERROR");
    }
});
// join() already waits for the server thread, so no extra start-up sleep is needed here.
server_thread.join();
Util::formatPrint("ghttp server stopped.");
每一个请求任务进来后,都会起一个新的子线程,并通过**thread::detach()**分离主线程。
void request_handler(const HttpServer& server, const shared_ptr<HttpServer::Response>& response, const shared_ptr<HttpServer::Request>& request, string request_type)
{
thread t(&request_thread, response, request, request_type);
t.detach();
return;
}
在子线程执行的方法request_thread中将对接口参数进行提取和校验,关于安全机制方面的源码解析会在后面详细展开。
部分片段代码和注解如下:
// Worker for one HTTP request, run on the detached thread created by request_handler().
// 1. rejects the client IP if check_access_ip() returns a message;
// 2. extracts operation / username / password / db_name / encryption from the GET URL or
//    the POST JSON body;
// 3. logs the request;
// 4. lets operation "check" through without authentication;
// 5. otherwise verifies the credentials and the operation privilege (the dispatch that
//    follows is elided in this excerpt).
// Each rejection is answered via sendResponseMsg() with an error code, then the function returns.
void request_thread(const shared_ptr<HttpServer::Response>& response,
const shared_ptr<HttpServer::Request>& request, string request_type)
{
string remote_ip = getRemoteIp(request);// client IP address of the request
string ipCheckResult = apiUtil->check_access_ip(remote_ip); // check the IP against the black/white-list rules; a non-empty result is the rejection message
if(!ipCheckResult.empty())
{
sendResponseMsg(1101, ipCheckResult, "ipcheck", request, response);
return;
}
string thread_id = Util::getThreadID();
string log_prefix = "thread " + thread_id + " -- ";
string username;
string password;
string encryption;
string operation;
string db_name;
Document document; // JSON parser from the third-party rapidjson library
string url;
// GET request: extract the parameters from the URL and URL-decode them
if (request_type == "GET")
{
url = request->path;
url = UrlDecode(url);
// NOTE(review): prints the full URL, including the password parameter, in plaintext to stdout.
cout << "get url: \n" << url << endl;
operation = WebUrl::CutParam(url, "operation"); // operation (API) name
username = WebUrl::CutParam(url, "username"); // user name
password = WebUrl::CutParam(url, "password"); // password
db_name = WebUrl::CutParam(url, "db_name"); // database name
// NOTE(review): unlike the POST branch, no "0" default is applied when encryption is absent.
encryption = WebUrl::CutParam(url,"encryption"); // whether the password is an MD5 digest
// URL-decode the parameters.
// NOTE(review): the whole url was already decoded above, so these values are decoded twice
// (e.g. "%2541" ends up as "A"); confirm whether clients rely on double encoding.
username = UrlDecode(username);
password = UrlDecode(password);
db_name = UrlDecode(db_name);
}
// POST request: the body must be valid JSON
else if (request_type == "POST")
{
auto strJson = request->content.string();
// NOTE(review): prints the full body, including the password, in plaintext to stdout.
cout << "post content: \n" << strJson << endl;
document.Parse(strJson.c_str());
if(document.HasParseError())
{
// NOTE(review): the error echoes the whole body (possibly credentials) back to the client,
// and the "paramcheck " tag below has a trailing space unlike the other tags.
string error = "the post content is not fit the json format, content=" + strJson;
sendResponseMsg(1004, error, "paramcheck ", request, response);
return;
}
operation = "";
db_name = "";
username = "";
password = "";
// take each field only if it is present and is a string
if(document.HasMember("operation")&&document["operation"].IsString())
{
operation=document["operation"].GetString(); // operation (API) name
}
if(document.HasMember("db_name")&&document["db_name"].IsString())
{
db_name=document["db_name"].GetString(); // database name
}
if(document.HasMember("username")&&document["username"].IsString())
{
username=document["username"].GetString(); // user name
}
if(document.HasMember("password")&&document["password"].IsString())
{
password=document["password"].GetString(); // password
}
if(document.HasMember("encryption")&&document["encryption"].IsString())
{
encryption = document["encryption"].GetString(); // whether the password is an MD5 digest
}
else
{
encryption = "0";
}
}
// any method other than GET and POST is rejected
else
{
string msg = "The method type " + request_type + " is not support";
sendResponseMsg(1004, msg, "methodcheck", request, response);
return;
}
std::cout << "------------------------ ghttp-api ------------------------" << std::endl;
std::cout << "thread_id: " << thread_id << std::endl;
std::cout << "remote_ip: " << remote_ip << std::endl;
std::cout << "operation: " << operation << std::endl;
std::cout << "method: " << request_type << std::endl;
std::cout << "request_path: " << request->path << std::endl;
std::cout << "http_version: " << request->http_version << std::endl;
std::cout << "request_time: " << Util::get_date_time() << std::endl;
std::cout << "----------------------------------------------------------" << std::endl;
// the "check" operation is a heartbeat and skips authentication
if(operation=="check")
{
check_thread_new(request,response);
return;
}
// verify the user name and password; a non-empty result is the failure message
string checkidentityresult = apiUtil->check_indentity(username, password, encryption);
if (checkidentityresult.empty() == false)
{
sendResponseMsg(1001, checkidentityresult, "authcheck", request, response);
return;
}
// verify that the user holds the privilege for this operation on db_name (0 = denied)
if (apiUtil->check_privilege(username, operation, db_name) == 0)
{
string msg = "You have no " + operation + " privilege, operation failed";
sendResponseMsg(1004, msg, "privilegecheck", request, response);
return;
}
......
}
本章节以源码片段的形式,解析了gStore HTTP server的启动参数解析、服务监听和请求处理逻辑。建议阅读时结合源码Main/ghttp.cpp一起理解。下一章节将详细解析几个高频使用接口(如build、load、query等)的实现逻辑。