gstore源码解析(三):安全机制之日志追踪
上一章我们介绍了安全机制的黑白名单配置,接下来将对安全机制中日志追踪的源码进行解析。
在gstore安全机制中的日志追踪分为三个模块:第一个是操作日志,记录调用接口进行数据库相关操作的信息;第二个是查询日志,记录调用查询接口的相关信息;第三个是事务日志,记录调用事务接口的相关信息。
在启动ghttp服务后,所有接口请求操作都将记录到日志文件中。日志文件的默认路径为logs/apaccess/,每天生成一个操作日志文件;日志内容为JSON字符串,格式如下所示:
{
"ip":"183.67.4.126", //访问ip
"operation":"show", //操作类型
"code":0, //结果状态码
"msg":"show successfully!", //结果描述
"createtime":"2022-04-21 11:47:16" //创建时间
}
操作日志将会在调用统一的返回函数或者部分接口函数时记录,部分代码如下所示:
/**
* @description: response message to client
* @Author: liwenjie
* @param {int} code: StatusCode value
* @param {string} msg:StatusMsg value
* @param {const} shared_ptr
*/
void sendResponseMsg(int code, string msg, const shared_ptr<HttpServer::Response>& response,string ip,string operation) {
string resJson = CreateJson(code, msg, 0);
cout<<"response result:"<<endl;
cout<<resJson<<endl;
*response << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << resJson.length() << "\r\n\r\n" << resJson;
// 写入操作日志
writeIpAccessLog(ip,operation,msg,code);
}
// 登录接口处理线程
void login_thread_new(const shared_ptr<HttpServer::Response>& response,string ip) {
string success="login successfully.";
string operation="login";
Document resDoc;
resDoc.SetObject();
Document::AllocatorType &allocator = resDoc.GetAllocator();
resDoc.AddMember("StatusCode", 0, allocator);
resDoc.AddMember("StatusMsg", "login successfully", allocator);
string version=Util::getConfigureValue("version");
resDoc.AddMember("CoreVersion", StringRef(version.c_str()), allocator);
string licensetype=Util::getConfigureValue("licensetype");
resDoc.AddMember("licensetype",StringRef(licensetype.c_str()), allocator);
StringBuffer resBuffer;
PrettyWriter<StringBuffer> resWriter(resBuffer);
resDoc.Accept(resWriter);
string resJson = resBuffer.GetString();
// 写入操作日志
writeIpAccessLog(ip,operation,success,0);
*response << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << resJson.length() << "\r\n\r\n" << resJson;
}
ghttp::writeIpAccessLog函数的实现如下所示:
/**
 * Append one record to the operation (access) log.
 *
 * Each record is a single-line JSON object with the keys "ip", "operation",
 * "code", "msg" and "createtime", followed by ",\n". Records are appended to
 * IP_ACCESS_PATH + Util::get_date_day() + ".log"; the directory and the file
 * are created when missing.
 *
 * @param ip        client IP address
 * @param operation API operation name
 * @param msg       StatusMsg value; "\r\n", "\n" and " " are removed before logging
 * @param code      StatusCode value
 */
void writeIpAccessLog(string ip,string operation,string msg,int code) {
    string iplog_name = Util::get_date_day();

    // Build the complete record before opening the file or taking the lock.
    // These steps allocate and may throw; ip_log_lock is released manually
    // below, so no throwing work may run while it is held.
    string createtime = Util::getTimeString3();
    msg = Util::string_replace(msg, "\r\n", "");
    msg = Util::string_replace(msg, "\n", "");
    msg = Util::string_replace(msg, " ", "");
    rapidjson::Document doc;
    doc.SetObject();
    // StringRef does not copy: ip/operation/msg/createtime must outlive doc.Accept().
    doc.AddMember("ip", rapidjson::StringRef(ip.c_str()), doc.GetAllocator());
    doc.AddMember("operation", rapidjson::StringRef(operation.c_str()), doc.GetAllocator());
    doc.AddMember("code", code, doc.GetAllocator());
    doc.AddMember("msg", rapidjson::StringRef(msg.c_str()), doc.GetAllocator());
    doc.AddMember("createtime", rapidjson::StringRef(createtime.c_str()), doc.GetAllocator());
    rapidjson::StringBuffer strBuf;
    rapidjson::Writer<rapidjson::StringBuffer> writer(strBuf);
    doc.Accept(writer);
    string _info = strBuf.GetString();
    // Records are separated by ",\n".
    _info.push_back(',');
    _info.push_back('\n');

    // Create the log directory if it does not exist yet.
    if (!Util::dir_exist(IP_ACCESS_PATH)) {
        Util::create_dir(IP_ACCESS_PATH);
    }
    // Create today's log file if it does not exist yet.
    string iplogfile = IP_ACCESS_PATH + iplog_name + ".log";
    if (!Util::file_exist(iplogfile)) {
        cout<<"ip access log file is not exists, now create it."<<endl;
        Util::create_file(iplogfile);
    }
    cout << "iplog: " << iplogfile << endl;
    FILE* ip_logfp = fopen(iplogfile.c_str(), "a");
    if (ip_logfp == NULL) {
        cerr << "open ip log error" << endl;
        return;
    }

    // Serialize writers so records from concurrent threads do not interleave.
    // Only stdio calls and Util::Csync run while the lock is held.
    ip_log_lock.lock();
    fprintf(ip_logfp, "%s", _info.c_str());
    Util::Csync(ip_logfp);
    long logSize = ftell(ip_logfp);
    fclose(ip_logfp);
    ip_log_lock.unlock();

    cout << "logSize: " << logSize << endl;
}
此外,除了可以直接查看本地日志文件外,gstore还提供了操作日志的查询接口(接口操作参数为operation=accesslog),接口处理函数的部分代码如下:
/**
* access log thread
*
* Handler for operation=accesslog: returns one page of the operation log for
* one log file as a JSON reply.
*
* @param response connection the HTTP reply is written to
* @param file_name log file name without the ".log" suffix; presumably a day string
*        as produced by Util::get_date_day() in writeIpAccessLog — TODO confirm against the caller
* @param page_no requested page number
* @param page_size requested page size
* @param ip client IP, recorded in the operation log on error
*/
void accessLog_thread_new(const shared_ptr<HttpServer::Response>& response, string file_name, int page_no, int page_size,string ip) {
string operation="accessLog";
// Hold ip_log_lock for the whole read so writeIpAccessLog cannot append concurrently.
// NOTE(review): the lock is released manually at the end of the function; an exception
// escaping the try/catch below would leave it locked.
ip_log_lock.lock();
int totalSize = 0;
int totalPage = 0;
// Build the log file path from the request parameter.
// NOTE(review): file_name is concatenated without validation; a value containing "../"
// could read other ".log" files — consider rejecting path separators.
string queryLog = IP_ACCESS_PATH + file_name + ".log";
Document all;
Document list;
all.SetObject();
list.SetArray();
// Check whether the requested log file exists.
if(Util::file_exist(queryLog)) {
try {
ifstream in;
string line;
in.open(queryLog, ios::in);
// Read the requested page of log records (elided in this excerpt).
......
list.Parse(line.c_str());
// NOTE(review): totalPage is not computed in the visible code (quereyLog_thread_new derives
// it from totalSize/page_size) — confirm the elided section sets it.
all.AddMember("StatusCode", 0, all.GetAllocator());
all.AddMember("StatusMsg", "Get access log success", all.GetAllocator());
all.AddMember("totalSize", totalSize, all.GetAllocator());
all.AddMember("totalPage", totalPage, all.GetAllocator());
all.AddMember("pageNo", page_no, all.GetAllocator());
all.AddMember("pageSize", page_size, all.GetAllocator());
all.AddMember("list", list, all.GetAllocator());
} catch (std::exception &e) {
// NOTE(review): if the exception is thrown after some AddMember calls above, "StatusCode"
// ends up added twice; the message key here is "message", not "StatusMsg" as elsewhere.
all.AddMember("StatusCode", 1005, all.GetAllocator());
all.AddMember("message", "Error! Access log corrupted", all.GetAllocator());
// NOTE(review): writeIpAccessLog locks ip_log_lock, which this function still holds; if
// ip_log_lock is a non-recursive mutex (e.g. std::mutex) this call deadlocks — TODO confirm.
writeIpAccessLog(ip,operation,"get data error!",1005);
}
} else {// Log file does not exist: return an empty result set.
all.AddMember("StatusCode", 0, all.GetAllocator());
all.AddMember("StatusMsg", "Get access log success", all.GetAllocator());
all.AddMember("totalSize", totalSize, all.GetAllocator());
all.AddMember("totalPage", totalPage, all.GetAllocator());
all.AddMember("pageNo", page_no, all.GetAllocator());
all.AddMember("pageSize", page_size, all.GetAllocator());
all.AddMember("list", list, all.GetAllocator());
}
// Serialize the reply and send it while the lock is still held.
StringBuffer buffer;
Writer<StringBuffer> writer(buffer);
all.Accept(writer);
string resJson = buffer.GetString();
*response << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << resJson.length() << "\r\n\r\n" << resJson;
ip_log_lock.unlock();
return;
}
查询接口是使用频率很高的接口。针对查询接口生成的查询日志,既可以让我们很方便地追溯历史查询信息,也可以用来找出耗时较长的查询,以便及时进行优化调整。
日志文件的默认路径为logs/endpoints/,每天生成一个日志文件;每一条查询日志为JSON字符串,其格式如下:
{
"QueryDateTime":"2021-12-15 19:51:41:927ms:335microseconds", //查询时间
"RemoteIP":"183.67.4.126", //IP地址
"Sparql":"SELECT ?x WHERE { ?x <导演> <张艺谋>. }", //SPARQL语句
"AnsNum":22, //结果数
"Format":"json", //数据格式
"FileName":"", //结果文件路径
"StatusCode":0, //状态码
"QueryTime":"155" //耗时ms
}
在执行完查询请求后,将调用ghttp::writeLog(string _info)记录日志内容,部分关键代码如下:
// Handler thread for SPARQL query requests (excerpt). After the query runs, a query-log
// record is serialized to JSON and passed to writeLog.
void query_thread_new(const shared_ptr<HttpServer::Response>& response,
string db_name,string sparql,string format,
string update_flag,string remote_ip,string log_prefix)
{
......
// Build the query-log JSON record.
// StringRef does not copy: query_start_time, remote_ip, sparql, format and filename must
// stay alive until doc.Accept() below.
Document doc;
doc.SetObject();
Document::AllocatorType &doc_allocator = doc.GetAllocator();
doc.AddMember("QueryDateTime", StringRef(query_start_time.c_str()), doc_allocator);
doc.AddMember("RemoteIP", StringRef(remote_ip.c_str()), doc_allocator);
doc.AddMember("Sparql", StringRef(sparql.c_str()), doc_allocator);
doc.AddMember("AnsNum", rs.ansNum, doc_allocator);
doc.AddMember("Format", StringRef(format.c_str()), doc_allocator);
doc.AddMember("FileName", StringRef(filename.c_str()), doc_allocator);
doc.AddMember("QueryTime", query_time, doc_allocator);
// NOTE(review): the sample record in the article also contains "StatusCode", which is not
// added in the visible code — presumably set in the elided part, TODO confirm.
StringBuffer buffer;
Writer<StringBuffer> writer(buffer);
doc.Accept(writer);
// Hand the serialized record to writeLog, which appends it to the query log file.
writeLog(buffer.GetString());
......
}
ghttp::writeLog函数的实现如下所示:
/**
 * Append one record to the query log.
 *
 * The record (a serialized JSON object, as built in query_thread_new) is
 * written followed by ",\n" to QUERYLOG_PATH + Util::get_date_day() + ".log";
 * the directory and the file are created when missing.
 *
 * @param _info serialized log record
 */
void writeLog(string _info) {
    // Append the record separator before taking the lock: push_back may throw
    // (std::bad_alloc), and query_log_lock is released manually below.
    _info.push_back(',');
    _info.push_back('\n');

    // Create the query-log directory if it does not exist yet.
    string querylog_name = Util::get_date_day();
    if (!Util::dir_exist(QUERYLOG_PATH)) {
        Util::create_dir(QUERYLOG_PATH);
    }
    // Create today's log file if it does not exist yet.
    string querylog_file = QUERYLOG_PATH + querylog_name + ".log";
    if (!Util::file_exist(querylog_file)) {
        cout<<"qeury log file is not exist, now create it."<<endl;
        Util::create_file(querylog_file);
    }
    cout << "querylog: " << querylog_file << endl;
    FILE* querylog_fp = fopen(querylog_file.c_str(), "a");
    if (querylog_fp == NULL) {
        cout<<"open query log error." <<endl;
        return;
    }

    // Serialize writers so records from concurrent threads do not interleave.
    // Only stdio calls and Util::Csync run while the lock is held.
    query_log_lock.lock();
    fprintf(querylog_fp, "%s", _info.c_str());
    Util::Csync(querylog_fp);
    long logSize = ftell(querylog_fp);
    fclose(querylog_fp);
    query_log_lock.unlock();

    cout << "logSize: " << logSize << endl;
}
同样地,除了可以直接查看本地日志文件外,gstore还提供了查询日志的查询接口(接口操作参数为operation=querylog),接口处理函数的部分代码如下:
/**
* query log thread
*
* Handler for operation=querylog: returns one page of the query log for one
* log file as a JSON reply, then records the call in the operation log.
*
* @param response connection the HTTP reply is written to
* @param file_name log file name without the ".log" suffix; presumably a day string as
*        produced by Util::get_date_day() in writeLog — TODO confirm against the caller
* @param page_no requested page number
* @param page_size requested page size; used as a divisor when computing totalPage
* @param ip client IP recorded in the operation log
*/
void quereyLog_thread_new(const shared_ptr<HttpServer::Response>& response, string file_name, int page_no, int page_size,string ip) {
string operation="querylog";
// Hold query_log_lock for the whole read so writeLog cannot append concurrently.
query_log_lock.lock();
int totalSize = 0;
int totalPage = 0;
// Build the log file path from the request parameter.
// NOTE(review): file_name is concatenated without validation; a value containing "../"
// could read other ".log" files — consider rejecting path separators.
string queryLog = QUERYLOG_PATH + file_name + ".log";
Document all;
Document list;
all.SetObject();
list.SetArray();
// Check whether the requested log file exists.
if(Util::file_exist(queryLog)){
try {
ifstream in;
string line;
in.open(queryLog, ios::in);
// Read the requested page of log records (elided in this excerpt).
......
list.Parse(line.c_str());
// Round up: a partial last page still counts as a page.
// NOTE(review): divides by page_size; page_size == 0 is undefined behavior unless the
// caller validates it — TODO confirm.
totalPage = (totalSize/page_size) + (totalSize%page_size == 0 ? 0 : 1);
all.AddMember("StatusCode", 0, all.GetAllocator());
all.AddMember("StatusMsg", "Get query log success", all.GetAllocator());
all.AddMember("totalSize", totalSize, all.GetAllocator());
all.AddMember("totalPage", totalPage, all.GetAllocator());
all.AddMember("pageNo", page_no, all.GetAllocator());
all.AddMember("pageSize", page_size, all.GetAllocator());
all.AddMember("list", list, all.GetAllocator());
} catch (std::exception &e) {
// NOTE(review): if the exception is thrown after some AddMember calls above, "StatusCode"
// ends up added twice; the message key here is "message", not "StatusMsg" as elsewhere.
all.AddMember("StatusCode", 1005, all.GetAllocator());
all.AddMember("message", "Error! Query log corrupted", all.GetAllocator());
writeIpAccessLog(ip,operation,"get data error!",1005);
}
} else {
// Log file does not exist: return an empty result set.
all.AddMember("StatusCode", 0, all.GetAllocator());
all.AddMember("StatusMsg", "Get query log success", all.GetAllocator());
all.AddMember("totalSize", totalSize, all.GetAllocator());
all.AddMember("totalPage", totalPage, all.GetAllocator());
all.AddMember("pageNo", page_no, all.GetAllocator());
all.AddMember("pageSize", page_size, all.GetAllocator());
all.AddMember("list", list, all.GetAllocator());
}
// Send the result.
StringBuffer buffer;
Writer<StringBuffer> writer(buffer);
all.Accept(writer);
string resJson = buffer.GetString();
*response << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << resJson.length() << "\r\n\r\n" << resJson;
query_log_lock.unlock();
// Record the call in the operation log.
// NOTE(review): this also runs after the catch branch above, so a failed read is logged
// both as error 1005 and as success.
writeIpAccessLog(ip,operation,"get data successfully!",0);
return;
}
只有在使用gstore的事务操作时,才会产生对应的事务日志。日志主要记录了事务所操作的数据库、起止时间以及事务的状态。事务日志的默认存储路径为logs/transaction.json,每条日志均为JSON格式的字符串,如下所示:
{
"db_name":"friend", //数据库名称
"TID":"1647921085829_1", //事务ID
"user":"root", //操作人用户名
"begin_time":"1647921085829", //开始时间
"state":"COMMITED", //状态
"end_time":"1647921113172" //结束时间
}
在开启事务时,将会新增一条事务日志,后续将通过TID来更新事务日志的状态值,部分核心代码如下:
// Handler for beginning a transaction (excerpt): starts the transaction and appends a
// RUNNING record to the transaction log.
void begin_thread_new(const shared_ptr<HttpServer::Response>& response,string db_name,string isolevel,string username,string ip) {
string error="";
string operation="begin";
......
txn_id_t TID = txn_m->Begin(static_cast<IsolationLevelType>(level));
cout <<"Transcation Id:"<< to_string(TID) << endl;
// NOTE(review): Get_Transaction(TID) is dereferenced without a null check in the visible
// code — confirm Begin() cannot fail here.
cout << to_string(txn_m->Get_Transaction(TID)->GetStartTime()) << endl;
string begin_time = to_string(txn_m->Get_Transaction(TID)->GetStartTime());
// Log key is "<start time>_<TID>"; commit/rollback rebuild the same key to find the record.
string Time_TID = begin_time + "_" + to_string(TID);
// Append a transaction-log record with state RUNNING; end_time is the placeholder "INF".
Util::add_transactionlog(db_name, username, Time_TID, begin_time, "RUNNING", "INF");
......
}
// Handler for committing a transaction (excerpt): commits it and marks its
// transaction-log record COMMITED.
void commit_thread_new(const shared_ptr<HttpServer::Response>& response,string db_name,string TID_s,string ip) {
string error="";
string operation="commit";
......
int ret = txn_m->Commit(TID);
......
// Rebuild the "<start time>_<TID>" key under which begin_thread_new added the record.
// NOTE(review): assumes Get_Transaction(TID) still returns the transaction after Commit —
// TODO confirm it cannot be null here.
string begin_time = to_string(txn_m->Get_Transaction(TID)->GetStartTime());
string Time_TID = begin_time + "_" + to_string(TID);
string end_time = to_string(txn_m->Get_Transaction(TID)->GetEndTime());
// Update the matching record: state becomes COMMITED, end_time the transaction's end time.
Util::update_transactionlog(Time_TID, "COMMITED", end_time);
......
}
// Handler for rolling back a transaction (excerpt): rolls it back and marks its
// transaction-log record ROLLBACK.
void rollback_thread_new(const shared_ptr<HttpServer::Response>& response,string db_name,string TID_s,string ip) {
string error="";
string operation="rollback";
......
int ret = txn_m->Rollback(TID);
......
// Rebuild the "<start time>_<TID>" key under which begin_thread_new added the record.
// NOTE(review): assumes Get_Transaction(TID) still returns the transaction after Rollback —
// TODO confirm it cannot be null here.
string begin_time = to_string(txn_m->Get_Transaction(TID)->GetStartTime());
string Time_TID = begin_time + "_" + to_string(TID);
string end_time = to_string(txn_m->Get_Transaction(TID)->GetEndTime());
// Update the matching record: state becomes ROLLBACK, end_time the transaction's end time.
Util::update_transactionlog(Time_TID, "ROLLBACK", end_time);
......
}
// Util::add_transactionlog函数代码如下
/**
 * Append one record to the transaction log (TRANSACTION_LOG_PATH).
 *
 * The record is a single-line JSON object with the keys "db_name", "TID",
 * "user", "begin_time", "state" and "end_time", terminated by '\n'.
 *
 * @return 0 on success, -1 if the log file could not be opened.
 */
int
Util::add_transactionlog(std::string db_name, std::string user, std::string TID, std::string begin_time, std::string state , std::string end_time) {
    cout << "this is Util::add_transactionlog" << endl;

    // Serialize the record before taking the lock: this allocates (and may throw),
    // and transactionlog_lock is a pthread rwlock that is released manually.
    Document document;
    document.SetObject();
    Document::AllocatorType& allocator = document.GetAllocator();
    // StringRef does not copy; the parameters outlive document.Accept() below.
    document.AddMember("db_name", StringRef(db_name.c_str()), allocator);
    document.AddMember("TID", StringRef(TID.c_str()), allocator);
    document.AddMember("user", StringRef(user.c_str()), allocator);
    document.AddMember("begin_time", StringRef(begin_time.c_str()), allocator);
    document.AddMember("state", StringRef(state.c_str()), allocator);
    document.AddMember("end_time", StringRef(end_time.c_str()), allocator);
    StringBuffer buffer;
    PrettyWriter<StringBuffer> writer(buffer);
    document.Accept(writer);
    // Collapse the pretty-printed JSON onto one line: update_transactionlog reads
    // the file one line (one record) at a time.
    // NOTE(review): removing every ' ' also strips spaces inside values (e.g. a
    // db_name containing a space); a compact rapidjson Writer would avoid that.
    string rec = buffer.GetString();
    rec = Util::string_replace(rec, "\n", "");
    rec = Util::string_replace(rec, " ", "");
    rec.push_back('\n');

    pthread_rwlock_wrlock(&transactionlog_lock);
    FILE* fp = fopen(TRANSACTION_LOG_PATH, "a");
    if (fp == NULL) {
        // Release the lock before bailing out so later writers are not blocked.
        pthread_rwlock_unlock(&transactionlog_lock);
        cout << "open transaction log error." << endl;
        return -1;
    }
    fputs(rec.c_str(), fp);
    fclose(fp);
    pthread_rwlock_unlock(&transactionlog_lock);
    return 0;
}
// Util::update_transactionlog函数代码如下
int
Util::update_transactionlog(std::string TID, std::string state, std::string end_time) {
pthread_rwlock_wrlock(&transactionlog_lock);
FILE* fp = fopen(TRANSACTION_LOG_PATH, "r");
FILE* fp1 = fopen(TRANSACTION_LOG_TEMP_PATH, "w");
char readBuffer[0xffff];
int ret = 0;
// 遍历日志文件
while (fgets(readBuffer, 1024, fp)) {
string rec = readBuffer;
StringStream is(readBuffer);
Document d;
d.ParseStream(is);
// 不是当前要更新的数据,直接写入临时文件
if (d["TID"].GetString() != TID) {
fputs(readBuffer, fp1);
continue;
}
// 匹配与参数TID一致的数据,更新该数据的状态值和结束时间
if (d.HasMember("state") && d.HasMember("end_time")) {
Value& S = d["state"];
S.SetString(state.c_str(), state.length());
Value& SS = d["end_time"];
SS.SetString(end_time.c_str(), end_time.length());
StringBuffer buffer;
Writer<StringBuffer> writer(buffer);
d.Accept(writer);
string line = buffer.GetString();
line.push_back('\n');
fputs(line.c_str(), fp1);
}
else {
fputs(readBuffer, fp1);
cout << "Transaction log corrupted, please initilize it!" << endl;
ret = 1;
}
}
fclose(fp);
fclose(fp1);
// 删除原文件,再把临时文件重命名为原文件
string cmd = "rm ";
cmd += TRANSACTION_LOG_PATH;
system(cmd.c_str());
cmd = "mv ";
cmd += TRANSACTION_LOG_TEMP_PATH;
cmd += ' ';
cmd += TRANSACTION_LOG_PATH;
system(cmd.c_str());
pthread_rwlock_unlock(&transactionlog_lock);
return ret;
}
我们可以直接查看本地文件logs/transaction.json中的日志,也可以通过接口(操作参数为operation=txnlog)获取,接口处理函数的部分代码如下:
/**
* get transcation log
* @param response
* @param username
* @param page_no
* @param page_size
* @param ip
* @return {*}
*/
void txnlog_thread_new(const shared_ptr<HttpServer::Response>& response,string username, int page_no, int page_size,string ip) {
string operation="txnlog";
// 判断是否为root用户,限制了只有root用户可查看事务日志
if(username==ROOT_USERNAME) {
string resJson = Util::get_transactionlog(page_no, page_size);
writeIpAccessLog(ip,operation,"get txnlog successfully!",0);
*response << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << resJson.length() << "\r\n\r\n" << resJson;
} else {
string error = "Root User Only!";
sendResponseMsg(1003,error,response,ip,operation);
}
}
// Return one page of the transaction log as a serialized JSON string (excerpt).
string
Util::get_transactionlog(int page_no, int page_size) {
int totalSize = 0;
int totalPage = 0;
// Shared (read) lock: readers may run concurrently; add_transactionlog and
// update_transactionlog take the write lock.
pthread_rwlock_rdlock(&transactionlog_lock);
ifstream in;
// Read TRANSACTION_LOG_PATH page by page and build the JSON result (elided in this excerpt).
// NOTE(review): no check that the file actually opened is visible here — confirm the elided
// code handles a missing log file.
in.open(TRANSACTION_LOG_PATH, ios::in);
......
Document all;
Document::AllocatorType &allocator = all.GetAllocator();
all.SetObject();
......
StringBuffer buffer;
Writer<StringBuffer> writer(buffer);
all.Accept(writer);
string all_rec = buffer.GetString();
pthread_rwlock_unlock(&transactionlog_lock);
return all_rec;
}
本章节介绍了安全机制中的日志追踪模块,分析了操作日志、查询日志和事务日志的数据格式、实现方式,以及如何通过接口获取日志信息。建议在阅读的同时结合源码Main/ghttp.cpp、Util/Util.cpp一起分析,会更容易理解。至此,gstore安全机制的解析就告一段落了,接下来我们将分析gstore的备份与恢复机制。