此项目是根据sylar框架实现,是从零开始重写sylar,也是对sylar丰富与完善
项目地址:https://gitee.com/lzhiqiang1999/server-framework
项目介绍:实现了一个基于协程的服务器框架,支持多线程、多协程协同调度;支持以异步处理的方式提高服务器性能;封装了网络相关的模块,包括socket、http、servlet等,支持快速搭建HTTP服务器或WebSokcet服务器。
详细内容:日志模块,使用宏实现流式输出,支持同步日志与异步日志、自定义日志格式、日志级别、多日志分离等功能。线程模块,封装pthread相关方法,封装常用的锁包括(信号量,读写锁,自旋锁等)。IO协程调度模块,基于ucontext_t实现非对称协程模型,以线程池的方式实现多线程,多协程协同调度,同时依赖epoll实现了事件监听机制。定时器模块,使用最小堆管理定时器,配合IO协程调度模块可以完成基于协程的定时任务调度。hook模块,将同步的系统调用封装成异步操作(accept, recv, send等),配合IO协程调度能够极大的提升服务器性能。Http模块,封装了sokcet常用方法,支持http协议解析,客户端实现连接池发送请求,服务器端实现servlet模式处理客户端请求,支持单Reator多线程,多Reator多线程模式的服务器。
sleep(2)
睡眠2s后返回send
100k数据recv
直到数据接收成功send
100k数据上,这个操作一般问题不大,因为send数据无论如何都要占用时间,但如果fd迟迟不可写,那send会阻塞直到套接字可写,同样,在阻塞期间,其他协程也无法在当前线程上调度。recv
上,这个操作要直到recv超时或是有数据时才返回,期间调度器也无法调度其他协程。back
,等定时器超时。back
了,所以协徎2并不需要等2秒后才可以执行,而是立刻可以执行。同样,调度器检测到协程send
,由于不知道fd是不是马上可写,所以先在IOManager
上给fd注册一个写事件(回调函数是让当前协程call
并执行实际的send
操作),然后当前协程back,等可写事件发生。call
并继续recv
),然后本协程back
,等事件发生。call
以便继续执行。call
以便继续执行send
。call
以便继续执行recv
。johnsonli::IOManager iom(1);
// 任务协程1
iom.schedule([](){
sleep(2);
LOG_INFO(g_logger) << "sleep 2";
});
// 任务协程2
iom.schedule([](){
LOG_INFO(g_logger) << "fiber 2";
});
sleep
usleep
nanosleep
socket
connect
accept
read、readv、recv、recvfrom、recvmsg
write、writev、send、sendto、sendmsg
close
fcntl
ioctl
getsockopt
setsockopt
FdManager
类来记录所有分配过的fd的上下文,这是一个单例类,每个socket fd上下文记录了当前fd的读写超时,是否设置非阻塞等信息。FdCtx
类在用户态记录了fd的读写超时和非阻塞信息,其中非阻塞包括用户显式设置的非阻塞和hook内部设置的非阻塞,区分这两种非阻塞可以有效应对用户对fd设置/获取NONBLOCK模式的情形。class FdCtx : public std::enable_shared_from_this<FdCtx>
{
public:
// 成员方法
private:
bool m_isInit: 1; //是否初始化
bool m_isSocket: 1; //是否socket
bool m_sysNonblock: 1; //是否hook非阻塞
bool m_userNonblock: 1; //是否用户主动设置非阻塞
bool m_isClosed: 1; //是否关闭
int m_fd; //文件句柄
uint64_t m_recvTimeout; //读超时时间毫秒
uint64_t m_sendTimeout; //写时间时间毫秒
};
/**
* @brief 文件句柄管理类
*/
class FdManager {
public:
typedef RWMutex RWMutexType;
FdManager();
/**
* @brief 获取/创建文件句柄类FdCtx
* @param[in] fd 文件句柄
* @param[in] auto_create 是否自动创建
* @return 返回对应文件句柄类FdCtx::ptr
*/
FdCtx::ptr get(int fd, bool auto_create = false);
/**
* @brief 删除文件句柄类
* @param[in] fd 文件句柄
*/
void del(int fd);
private:
RWMutexType m_mutex; /// 读写锁
std::vector<FdCtx::ptr> m_datas; /// 文件句柄集合
};
typedef Singleton<FdManager> FdMgr; /// 文件句柄单例
t_hook_enable
。用于表示当前线程是否启用hook。各个线程可单独启用或关闭hook。static thread_local bool t_hook_enable = false;
// 当前线程是否hook
bool is_hook_enable() {
return t_hook_enable;
}
// 设置当前线程的hook状态
void set_hook_enable(bool flag) {
t_hook_enable = flag;
}
宏
获取被hook的接口的原始地址。#define HOOK_FUN(XX) \
XX(sleep) \
XX(usleep) \
XX(nanosleep) \
XX(socket) \
XX(connect) \
XX(accept) \
XX(read) \
XX(readv) \
XX(recv) \
XX(recvfrom) \
XX(recvmsg) \
XX(write) \
XX(writev) \
XX(send) \
XX(sendto) \
XX(sendmsg) \
XX(close) \
XX(fcntl) \
XX(ioctl) \
XX(getsockopt) \
XX(setsockopt)
extern "C" {
#define XX(name) name ## _fun name ## _f = nullptr;
HOOK_FUN(XX);
#undef XX
}
void hook_init() {
static bool is_inited = false;
if(is_inited) {
return;
}
#define XX(name) name ## _f = (name ## _fun)dlsym(RTLD_NEXT, #name);
HOOK_FUN(XX);
#undef XX
}
宏
展开后
extern "C" {
sleep_fun sleep_f = nullptr; \
usleep_fun usleep_f = nullptr; \
....
setsocketopt_fun setsocket_f = nullptr;
};
hook_init() {
...
sleep_f = (sleep_fun)dlsym(RTLD_NEXT, "sleep"); \
usleep_f = (usleep_fun)dlsym(RTLD_NEXT, "usleep"); \
...
setsocketopt_f = (setsocketopt_fun)dlsym(RTLD_NEXT, "setsocketopt");
}
hook_init()
放在一个静态对象的构造函数中调用,这表示在main函数运行之前就会获取各个符号的地址并保存在全局变量中
unsigned int sleep(unsigned int seconds)
{
//如果没使用hook,就调用原系统函数
if(!johnsonli::t_hook_enable)
{
return sleep_f(seconds);
}
//使用了hook,用自己实现的
johnsonli::Fiber::ptr fiber = johnsonli::Fiber::GetThis(); //获取当前协程
johnsonli::IOManager* iom = johnsonli::IOManager::GetThis(); //获取当前IOManager
iom->addTimer(seconds * 1000, [iom, fiber](){
iom->schedule(fiber);
});
johnsonli::Fiber::YieldToHoldBySwap();
return 0;
}
int socket(int domain, int type, int protocol)
{
//不使用hook
if(!johnsonli::t_hook_enable) {
return socket_f(domain, type, protocol);
}
//出错,直接返回
int fd = socket_f(domain, type, protocol);
if(fd == -1) {
return fd;
}
//成功创建,需要存储相关信息
johnsonli::FdMgr::GetInstance()->get(fd, true);
return fd;
}
connect
和connect_with_timeout
的hook实现。先尝试连接,超时,注册定时器,注册事件,退出当前协程。等事件发生,再回到该协程,此时可以连接成功;事件未发生并超时,回到该协程,将返回-1,说明超时。int connect_with_timeout(int fd, const struct sockaddr* addr, socklen_t addrlen, uint64_t timeout_ms) {
if(!johnsonli::t_hook_enable) {
//LOG_INFO(g_logger) << "connect";
return connect_f(fd, addr, addrlen);
}
johnsonli::FdCtx::ptr ctx = johnsonli::FdMgr::GetInstance()->get(fd);
//不存在
if(!ctx || ctx->isClose()) {
errno = EBADF;
return -1;
}
//不是socket
if(!ctx->isSocket()) {
return connect_f(fd, addr, addrlen);
}
//用户已经设置了非阻塞
if(ctx->getUserNonblock()) {
return connect_f(fd, addr, addrlen);
}
int n = connect_f(fd, addr, addrlen);
if(n == 0) {
return 0; //连接成功
}
else if(n != -1 || errno != EINPROGRESS) {
return n;
}
johnsonli::IOManager* iom = johnsonli::IOManager::GetThis();
johnsonli::Timer::ptr timer;
std::shared_ptr<timer_info> tinfo(new timer_info);
std::weak_ptr<timer_info> winfo(tinfo);
if(timeout_ms != (uint64_t)-1) {
timer = iom->addConditionTimer(timeout_ms, [winfo, fd, iom](){
auto t = winfo.lock();
if(!t || t->cancelled) {
return;
}
//超时,取消事件,回到HOLD(任务协程)
t->cancelled = ETIMEDOUT;
iom->cancelEvent(fd, johnsonli::IOManager::WRITE);
}, winfo);
}
int rt = iom->addEvent(fd, johnsonli::IOManager::WRITE);
if(rt == 0) {
johnsonli::Fiber::YieldToHoldBySwap();
//定时器还有,说明事件触发了,需要取消定时器
if(timer) {
// 此时取消定时器会强制执行定时任务,但是由于只是把定时任务加入到任务协程队列,因此不会马上执行。
// 必须等退出connect_with_timeout,调度线程再去调度。但是此时weak_ptr已经被释放,条件不成立,定时任务最终不会被执行
timer->cancel();
}
if(tinfo->cancelled) {
errno = tinfo->cancelled;
return -1;
}
}
else {
//超时
if(timer) {
timer->cancel();
}
LOG_ERROR(g_logger) << "connect addEvent(" << fd << ", WRITE) error";
}
int error = 0;
socklen_t len = sizeof(int);
//检查有无错误
if(-1 == getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len)) {
return -1; // 调用失败
}
if(!error) {
return 0; //没有错误
}else { //有错误
errno = error;
return -1;
}
}
do_io
模板函数,具体实现和connect_with_timeout
类似。