uwebsockets主要由libuv和uskcoets的库加上c++模板代码uwebsockets组成。 这个项目应该属于nodejs的生态,因为底层是libuv,项目中可以完全当成addon插件加入到nodejs项目中,libuv直接链接的node的动态库。
使用方式:可以以服务端形式独立于node的loop(libuv循环事件主体)收发消息,也可以自己加入客户端代码将fd加入node的loop中使用。第一种方式两个loop线程独立,需要注意自己实现通知机制;第二种方式在同一个loop线程中,没有线程安全问题。这里分三个部分分别分析libuv,usockets,uwebsockets的原理和代码。
libuv原理分析:
libuv的基本使用流程是
1分配loop内存空间:uv_loop_t *loop = malloc(sizeof(uv_loop_t));
2初始化loop:uv_loop_init(loop);
3开始循环:uv_run(loop, UV_RUN_DEFAULT);
4结束退出:uv_loop_close(loop);
5释放空间: free(loop);
可以加入到loop中的事件可以是socketfd,管道fd。其中管道fd的用途比较特殊,是用来通知事件而不是直接传递消息,需要和队列配合使用。进程或线程间管道通信的关键函数:uv_async_init和uv_async_send。
uv_loop_init初始化资源,可以关注下定时器初始化heap_init函数使用了最小堆。uv__platform_loop_init->uv__epoll_init初始化创建了epoll,这里可以关注下epoll_create1是epoll_create升级版。使用epoll_create1的优点是它允许你指定标志,O_CLOEXEC,子进程文件描述符会自动关闭。uv_signal_init会创建管道,只初始化一次,这个通常是给子进程使用的。uv_async_init也会创建管道,但是可以每两个loop间创建一对。
uv__work_done就是管道消息到来的回调函数,参数uv_async_t包含了发送端和接收端的loop地址,可以通过这个执行对应loop线程回调。主流程还包含了两种锁的初始化:uv_rwlock_init->pthread_rwlock_init读写锁,uv_mutex_init->pthread_mutex_ini互斥锁。
uv_run开始执行循环,每次循环都会uv__update_time,更新时间,这个定时器会用到。uv__run_timers开启定时器。uv__run_pending执行已经就绪的队列回调。
uv__run_idle和uv__run_prepare执行的是以下宏代码
void uv__run_##name(uv_loop_t* loop) { \
uv_##name##_t* h; \
QUEUE queue; \
QUEUE* q; \
QUEUE_MOVE(&loop->name##_handles, &queue); \
while (!QUEUE_EMPTY(&queue)) { \
q = QUEUE_HEAD(&queue); \
h = QUEUE_DATA(q, uv_##name##_t, queue); \
QUEUE_REMOVE(q); \
QUEUE_INSERT_TAIL(&loop->name##_handles, q); \
h->name##_cb(h); \
} \
}
也就是可以自定义idle_cb和prepare_cb回调函数。 uv__io_poll执行监听有事件返回,执行epoll_wait超时时间是-1也就是阻塞或者定时器即将超时的最小时间的值保证定时器回调能比较准确得到执行。循环直到有异常发生。如果要使用工作线程池,uv_queue_work将任务发送给线程池就行,需要带上执行回调函数和执行完成回调函数,线程池就是使用条件变量加锁通知线程。
如果有新的连接fd需要加入loop,使用uv_poll_init_socket执行epoll_ctl加入新的fd监听。需要注意的是fd收到事件回调中需要uv_poll_stop->uv__platform_invalidate_fd执行epoll_ctl的EPOLL_CTL_DEL先关掉事件监听,然后将这个fd加入到
loop->watchers中,等执行完回调uv__io_poll会重新打开。
如果要加入定时回调,uv_run之前需要加入uv_timer_init和uv_timer_start,在run之后循环中可以uv_timer_start继续加入或者uv_timer_stop停止。uv_timer_start核心函数heap_insert最小堆插入。
uv_async_init->uv__async_start->uv__make_pipe->uv_pipes生成一对pipefd,注意管道是单向通行的,如果需要双向需要两对管道,这里不需要关心,因为只会有一个uv_async_t可以使用。
//pipefd[0]是读端加入到async_io_watcher中,pipefd[1]写端赋值给当前loop的async_wfd ,uv__async_io是可读事件的回调
uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
uv__io_start(loop, &loop->async_io_watcher, POLLIN);
loop->async_wfd = pipefd[1];
uv_async_send就是向async_wfd写1个字节,目的是通知而不是传递消息。这里使用原子操作判断有没有线程在写,是线程安全的。
int uv_async_send(uv_async_t* handle) {
/* Do a cheap read first. */
if (ACCESS_ONCE(int, handle->pending) != 0)
return 0;
/* Tell the other thread we're busy with the handle. */
if (cmpxchgi(&handle->pending, 0, 1) != 0)
return 0;
/* Wake up the other thread's event loop. */
uv__async_send(handle->loop);
/* Tell the other thread we're done. */
if (cmpxchgi(&handle->pending, 1, 2) != 1)
abort();
return 0;
}
usockets源码分析
usockets封装了libuv接口。具体先来看这几个宏:
/* 512kb shared receive buffer */定义了socketfd的最长的接受缓冲+2*LIBUS_RECV_BUFFER_PADDING
#define LIBUS_RECV_BUFFER_LENGTH 524288
/* A timeout granularity of 4 seconds means give or take 4 seconds from set timeout */4秒超时
#define LIBUS_TIMEOUT_GRANULARITY 4
/* 32 byte padding of receive buffer ends */接收附加缓冲
#define LIBUS_RECV_BUFFER_PADDING 32
/* Guaranteed alignment of extension memory */字节对齐长度,uwebsockets用到
#define LIBUS_EXT_ALIGNMENT 16