1. 概述
1.1 下载编译
# 下载
wget http://dist.schmorp.de/libev/libev-4.25.tar.gz
tar -zxf libev-4.25.tar.gz && cd libev-4.25/
# 编译安装
./configure --prefix=`pwd`/__build
make
make install
1.2 官方示例
shell> cat example.c
#include <ev.h>
#include <stdio.h> // for puts
ev_io stdin_watcher;
ev_timer timeout_watcher;
/*
 * Callback of stdin_watcher.
 * Prints a notice, stops the watcher it was called for and then asks the
 * loop to leave every nested ev_run.
 */
static void stdin_cb (struct ev_loop *loop, ev_io *w, int revents)
{
    (void) revents; /* the received event mask is not inspected */

    puts ("stdin ready");

    /* stop this watcher before breaking out of the loop */
    ev_io_stop (loop, w);

    /* EVBREAK_ALL: all nested ev_run's stop iterating */
    ev_break (loop, EVBREAK_ALL);
}
/*
 * Callback of timeout_watcher.
 * Prints a notice and asks the loop to leave the innermost ev_run only.
 */
static void timeout_cb (struct ev_loop *loop, ev_timer *w, int revents)
{
    (void) w;       /* the timer itself is not touched */
    (void) revents; /* the received event mask is not inspected */

    puts ("timeout");

    /* EVBREAK_ONE: only the innermost ev_run stops iterating */
    ev_break (loop, EVBREAK_ONE);
}
/*
 * Example entry point: register an I/O watcher on fd 0 and a timer,
 * then run the default loop until one of the callbacks breaks out of it.
 */
int main (void)
{
    struct ev_loop *main_loop = EV_DEFAULT;

    /* I/O watcher: fd 0 (stdin), interested in EV_READ */
    ev_io_init (&stdin_watcher, stdin_cb, /*STDIN_FILENO*/ 0, EV_READ);
    ev_io_start (main_loop, &stdin_watcher);

    /* timer watcher: initialised with 5.5 and a repeat value of 0. */
    ev_timer_init (&timeout_watcher, timeout_cb, 5.5, 0.);
    ev_timer_start (main_loop, &timeout_watcher);

    /* block here dispatching events until a callback calls ev_break */
    ev_run (main_loop, 0);

    return 0;
}
export LD_LIBRARY_PATH=`pwd`/__build/lib
gcc -g -o example example.c -I __build/include -L __build/lib -lev
./example
1.3 构建源码分析环境
find . -name "*.c" > cscope.files
find . -name "*.h" >> cscope.files
cscope -bq
2. 一切皆对象
2.1 事件
// 预编译去除宏并格式化, 方便查看
gcc -E ev.h > ev_dump.h
indent -kr -i4 -ts4 -l80 ev_dump.h
// 1. Base watcher type: the header fields that the specific watcher structs repeat
typedef struct ev_watcher {
// value stored by ev_start(); ev_io_start() passes 1, so non-zero means "started"
int active;
// non-zero while an event is queued but not yet dispatched: ev_feed_event() stores
// the 1-based slot in loop->pendings[pri] here, ev_invoke_pending() resets it to 0
// right before calling cb
int pending;
int priority; // priority; ev_feed_event() derives the pendings[] bucket from it
void *data; // user data
void (*cb) (struct ev_loop * loop, struct ev_watcher * w, int revents); // callback function
} ev_watcher;
// 2. I/O watcher: same leading fields as ev_watcher, followed by the fd-specific part
typedef struct ev_io {
int active;
int pending;
int priority;
void *data;
void (*cb) (struct ev_loop * loop, struct ev_io * w, int revents);
struct ev_watcher_list *next; // next watcher in a singly linked list (see wlist_add)
int fd; // file descriptor
int events; // event types of interest (e.g. EV_READ)
} ev_io;
// 3. Timer watcher: same leading fields as ev_watcher, followed by the timing part
typedef struct ev_timer {
int active;
int pending;
int priority;
void *data;
void (*cb) (struct ev_loop * loop, struct ev_timer * w, int revents);
ev_tstamp at; // time at which the timer triggers
ev_tstamp repeat; // repeat interval
} ev_timer;
........
2.2 事件循环 - ev_loop
gcc -E ev.c > ev_dump.c
indent -kr -i4 -ts4 -l80 ev_dump.c
/*
 * Event loop state, as produced by preprocessing ev.c (the '#N "file"' lines are
 * preprocessor line markers left in the dump). Only the members used by the code
 * excerpts in these notes are annotated.
 */
struct ev_loop {
ev_tstamp ev_rt_now;
#1 "ev_vars.h" 1
#42 "ev_vars.h"
ev_tstamp now_floor;
ev_tstamp mn_now;
ev_tstamp rtmn_diff;
W *rfeeds;
int rfeedmax;
int rfeedcnt;
ANPENDING *pendings[5]; /* pending events: first index is the priority bucket, second the slot inside that bucket */
int pendingmax[5]; /* allocated slots per priority bucket */
int pendingcnt[5]; /* used slots per priority bucket */
int pendingpri; /* highest priority that currently has pending events */
ev_prepare pending_w;
ev_tstamp io_blocktime;
ev_tstamp timeout_blocktime;
int backend;
int activecnt;
sig_atomic_t volatile loop_done;
int backend_fd;
ev_tstamp backend_mintime;
void (*backend_modify) (struct ev_loop * loop, int fd, int oev, int nev);
void (*backend_poll) (struct ev_loop * loop, ev_tstamp timeout);
ANFD *anfds; // per-fd info array, indexed by fd
int anfdmax; // length of the per-fd info array
int evpipe[2];
ev_io pipe_w;
sig_atomic_t volatile pipe_write_wanted;
sig_atomic_t volatile pipe_write_skipped;
pid_t curpid;
char postfork;
void *vec_ri;
void *vec_ro;
void *vec_wi;
void *vec_wo;
int vec_max;
struct pollfd *polls;
int pollmax;
int pollcnt;
int *pollidxs;
int pollidxmax;
struct epoll_event *epoll_events;
int epoll_eventmax;
int *epoll_eperms;
int epoll_epermcnt;
int epoll_epermmax;
#128 "ev_vars.h"
int *fdchanges; // array of fds whose watcher set changed (added/removed)
int fdchangemax; // allocated length of fdchanges
int fdchangecnt; // number of entries in use
ANHE *timers;
int timermax;
int timercnt;
ANHE *periodics;
int periodicmax;
int periodiccnt;
ev_idle **idles[5];
int idlemax[5];
int idlecnt[5];
int idleall;
struct ev_prepare **prepares;
int preparemax;
int preparecnt;
struct ev_check **checks;
int checkmax;
int checkcnt;
struct ev_fork **forks;
int forkmax;
int forkcnt;
struct ev_cleanup **cleanups;
int cleanupmax;
int cleanupcnt;
sig_atomic_t volatile async_pending;
struct ev_async **asyncs;
int asyncmax;
int asynccnt;
int fs_fd;
ev_io fs_w;
char fs_2625;
ANFS fs_hash[((0x7f) & 2) ? 16 : 1];
sig_atomic_t volatile sig_pending;
int sigfd;
ev_io sigfd_w;
sigset_t sigfd_set;
unsigned int origflags;
unsigned int loop_count;
unsigned int loop_depth;
void *userdata;
void (*release_cb) (struct ev_loop * loop);
void (*acquire_cb) (struct ev_loop * loop);
ev_loop_callback invoke_cb;
#1831 "ev.c" 2
};
3. 核心流程分析
3.1 ev_io_start()
shell> vim ev_dump.c
/*
 * Start an I/O watcher (abridged excerpt: the dotted lines mark code the notes leave out).
 * Marks the watcher started, makes sure loop->anfds can hold index w->fd, links the
 * watcher into that fd's list and records the fd in the changed-fd array.
 */
void ev_io_start(struct ev_loop *loop, ev_io * w)
{
int fd = w->fd;
..........
// 1. mark the watcher as started (ev_start stores active = 1)
ev_start(loop, (ev_watcher *) w, 1);
// 2. grow the per-fd info array if it is too small for index fd
if (expect_false (fd + 1 > loop->anfdmax)) {
int ocur_ = loop->anfdmax;
loop->anfds = (ANFD *) array_realloc(
sizeof(ANFD),
loop->anfds,
&loop->anfdmax,
fd + 1
);
// zero only the entries between the old size and the new size
memset(
(void *) (loop->anfds + ocur_),
0,
sizeof(*(loop->anfds + ocur_)) * (loop->anfdmax - ocur_)
);
}
// 3. add w to loop->anfds[fd] list
wlist_add(&(loop->anfds)[fd].head, (ev_watcher_list *) w);
.......
// 4. record fd in the changed-fd array
fd_change(loop, fd, w->events & EV__IOFDSET | EV_ANFD_REIFY);
// the EV__IOFDSET bit was only needed for the fd_change call above; clear it again
w->events &= ~EV__IOFDSET;
}
/*
 * Start bookkeeping for a watcher; ev_io_start() calls this with active = 1.
 */
inline_speed void ev_start (struct ev_loop *loop, ev_watcher *w, int active)
{
    /* set/adjust the watcher's priority */
    pri_adjust (loop, w);

    /* mark the watcher as active with the value supplied by the caller */
    w->active = active;

    /* take a reference on the loop (per the original note: loop->activecnt + 1) */
    ev_ref (loop);
}
/*
 * Push elem onto the front of the singly linked watcher list whose head
 * pointer is *head; afterwards *head refers to the newest element.
 */
inline_size void wlist_add (ev_watcher_list **head, ev_watcher_list *elem)
{
    ev_watcher_list *previous_first = *head;

    elem->next = previous_first;
    *head      = elem;
}
/*
 * Record that the watcher set of fd changed.
 * The flags are always OR-ed into anfds[fd].reify; the fd is appended to
 * loop->fdchanges only when no reify flag was set before this call, so an
 * fd is queued once.
 */
static __inline__ void fd_change(struct ev_loop *loop, int fd, int flags)
{
    unsigned char already_flagged = loop->anfds[fd].reify;
    int slot;

    loop->anfds[fd].reify |= flags;

    /* the fd is already in the change array: nothing more to do */
    if (expect_false (already_flagged))
        return;

    /* claim the next slot, then grow the array if the new count exceeds its capacity */
    slot = loop->fdchangecnt++;
    if (expect_false (loop->fdchangecnt > loop->fdchangemax))
        loop->fdchanges = (int *)array_realloc(sizeof (int),
                                               loop->fdchanges,
                                               &loop->fdchangemax,
                                               loop->fdchangecnt);

    loop->fdchanges [slot] = fd;
}
3.2 ev_run()
# 查看去除宏后的代码, 但部分代码又手动恢复成源代码方便查看
shell> vim ev_dump.c
/*
 * Run the event loop (macro-expanded, partly hand-restored excerpt of ev_run).
 *
 * Each iteration:
 *   - queues and invokes fork and prepare watchers,
 *   - pushes recorded fd changes to the backend (fd_reify),
 *   - computes how long it may block and calls loop->backend_poll,
 *   - reifies timers, periodics and idle watchers and queues check watchers,
 *   - invokes everything pending through loop->invoke_cb.
 *
 * loop:  the loop to run; every member is accessed explicitly as loop->member.
 * flags: with EVRUN_NOWAIT the wait time stays 0; with EVRUN_NOWAIT or
 *        EVRUN_ONCE the do/while ends after one iteration.
 *
 * Iterating also ends when loop->activecnt is 0 or loop->loop_done is set.
 * Returns loop->activecnt.
 */
int ev_run(struct ev_loop *loop, int flags)
{
    ++loop->loop_depth;

    assert (("libev: ev_loop recursion during release detected",
             loop->loop_done != EVBREAK_RECURSE));

    loop->loop_done = EVBREAK_CANCEL;

    /* in case we recurse, ensure ordering stays nice and clean */
    loop->invoke_cb (loop);

    do {
        /* penalise the forking check even more */
        if (expect_false (loop->curpid)) {
            if (expect_false (getpid () != loop->curpid)) {
                loop->curpid = getpid ();
                loop->postfork = 1;
            }
        }

        /* we might have forked, so queue fork handlers */
        if (expect_false (loop->postfork)) {
            if (loop->forkcnt) {
                queue_events (loop, (W *)loop->forks, loop->forkcnt, EV_FORK);
                loop->invoke_cb (loop);
            }
        }

        /* queue prepare watchers (and execute them) */
        if (expect_false (loop->preparecnt)) {
            queue_events (loop, (W *)loop->prepares, loop->preparecnt, EV_PREPARE);
            loop->invoke_cb (loop);
        }

        /* a callback above may have requested a break */
        if (expect_false (loop->loop_done)) {
            break;
        }

        /* we might have forked, so reify kernel state if necessary */
        if (expect_false (loop->postfork)) {
            loop_fork (loop);
        }

        /* 1. register the recorded fd changes with the backend */
        fd_reify (loop);

        /* compute the blocking time, then poll */
        {
            ev_tstamp waittime = 0.;
            ev_tstamp sleeptime = 0.;

            /* remember old timestamp for io_blocktime calculation */
            ev_tstamp prev_mn_now = loop->mn_now;

            /* update time to cancel out callback processing overhead */
            time_update (loop, 1e100);

            /* from now on, we want a pipe-wake-up */
            loop->pipe_write_wanted = 1;

            /* make sure pipe_write_wanted is visible before we check for potential skips */
            __asm__ __volatile__("mfence":::"memory");

            if (expect_true (!(flags & EVRUN_NOWAIT || loop->idleall
                               || !loop->activecnt || loop->pipe_write_skipped))) {
                waittime = MAX_BLOCKTIME; /* 59.743 */

                if (loop->timercnt) {
                    ev_tstamp to = loop->timers[(4 - 1)].at - loop->mn_now;
                    if (waittime > to)
                        waittime = to;
                }

                if (loop->periodiccnt) {
                    ev_tstamp to = loop->periodics[(4 - 1)].at - loop->ev_rt_now;
                    if (waittime > to)
                        waittime = to;
                }

                /* don't let timeouts decrease the waittime below timeout_blocktime */
                if (expect_false (waittime < loop->timeout_blocktime)) {
                    waittime = loop->timeout_blocktime;
                }

                /* at this point, we NEED to wait, so we have to ensure */
                /* to pass a minimum nonzero value to the backend */
                if (expect_false (waittime < loop->backend_mintime)) {
                    waittime = loop->backend_mintime;
                }

                /* extra check because io_blocktime is commonly 0 */
                if (expect_false (loop->io_blocktime)) {
                    sleeptime = loop->io_blocktime - (loop->mn_now - prev_mn_now);

                    if (sleeptime > waittime - loop->backend_mintime)
                        sleeptime = waittime - loop->backend_mintime;

                    if (expect_true (sleeptime > 0.)) {
                        ev_sleep (sleeptime);
                        waittime -= sleeptime;
                    }
                }
            }

            ++loop->loop_count;

            /* 2. wait for events; per the author's note this ends up in epoll_poll, */
            /*    see loop_init() -> epoll_init() */
            assert ((loop->loop_done = EVBREAK_RECURSE, 1)); /* assert for side effect */
            loop->backend_poll (loop, waittime);
            assert ((loop->loop_done = EVBREAK_CANCEL, 1)); /* assert for side effect */

            loop->pipe_write_wanted = 0; /* just an optimisation, no fence needed */

            __asm__ __volatile__("":::"memory");
            if (loop->pipe_write_skipped) {
                /* the message must be one string literal: it cannot span source lines */
                assert (("libev: pipe_w not active, but pipe not written",
                         ev_is_active (&loop->pipe_w)));
                ev_feed_event (loop, &loop->pipe_w, EV_CUSTOM);
            }

            /* update ev_rt_now, do magic */
            time_update (loop, waittime + sleeptime);
        }

        /* handle timer watchers */
        timers_reify (loop);

        /* handle absolute-time (periodic) watchers */
        periodics_reify (loop);

        /* handle idle watchers */
        idle_reify (loop);

        /* queue check watchers, to be executed first */
        if (expect_false (loop->checkcnt)) {
            queue_events (loop, (W *)loop->checks, loop->checkcnt, EV_CHECK);
        }

        /* 3. dispatch pending events (the author notes this calls ev_invoke_pending()) */
        loop->invoke_cb (loop);
    } while (expect_true (
        loop->activecnt
        && !loop->loop_done
        && !(flags & (EVRUN_ONCE | EVRUN_NOWAIT))
    ));

    /* a break request for one level is consumed by this ev_run */
    if (loop->loop_done == EVBREAK_ONE) {
        loop->loop_done = EVBREAK_CANCEL;
    }

    --loop->loop_depth;

    return loop->activecnt;
}
3.2.1 注册事件
fd_reify()
    -> backend_modify(), 即 epoll_modify()
        -> epoll_ctl()
/*
 * Apply every recorded fd change to the backend.
 * For each fd in loop->fdchanges the wanted event mask is rebuilt from all
 * watchers on that fd; loop->backend_modify is called when EV__IOFDSET was
 * requested or the mask differs from the previous one. The change array is
 * emptied afterwards.
 */
static void fd_reify(struct ev_loop *loop)
{
    int idx = 0;

    while (idx < loop->fdchangecnt) {
        int fd = loop->fdchanges[idx];          /* the changed file descriptor */
        ANFD *info = &loop->anfds[fd];          /* its per-fd record */
        unsigned char prev_events = info->events;
        unsigned char todo = info->reify;
        ev_io *watcher;

        info->reify = 0;

        /* rebuild the mask as the union of all watchers' events on this fd */
        info->events = 0;
        watcher = (ev_io *) info->head;
        while (watcher) {
            info->events |= (unsigned char) watcher->events;
            watcher = (ev_io *) ((WL) watcher)->next;
        }

        /* a changed mask forces a backend update */
        if (prev_events != info->events)
            todo = EV__IOFDSET;

        /* hand the change to the backend; backend_modify is epoll_modify for epoll */
        if (todo & EV__IOFDSET)
            loop->backend_modify(loop, fd, prev_events, info->events);

        ++idx;
    }

    loop->fdchangecnt = 0;
}
/*
 * epoll implementation of backend_modify: make the kernel's interest set for fd
 * match the new event mask nev (oev is the previous mask).
 *
 * A zero nev returns immediately, so no EPOLL_CTL_DEL is issued here.
 * Otherwise the new mask is stored in anfds[fd].emask, the generation counter
 * egen is incremented and packed together with fd into ev.data.u64, and
 * epoll_ctl is called. On failure the errno value selects a recovery path;
 * if none succeeds the fd is passed to fd_kill and egen is decremented again.
 */
static void epoll_modify(struct ev_loop *loop, int fd, int oev, int nev)
{
struct epoll_event ev;
unsigned char oldmask;
// nothing is registered for an empty mask
if (!nev)
return;
oldmask = loop->anfds[fd].emask;
loop->anfds[fd].emask = nev;
/*store the generation counter in the upper 32 bits, the fd in the lower 32 bits*/
ev.data.u64 = (uint64_t) (uint32_t) fd
| ((uint64_t) (uint32_t)++loop->anfds[fd].egen << 32);
ev.events = (nev & EV_READ ? EPOLLIN : 0)
| (nev & EV_WRITE ? EPOLLOUT : 0);
// register the event: MOD when there was an old mask and it differs from nev, ADD otherwise
if (expect_true (!epoll_ctl (backend_fd,
oev && oldmask != nev ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev)))
return;
if (expect_true (errno == ENOENT))
{
/* if ENOENT then the fd went away, so try to do the right thing */
// nev is non-zero here because of the early return above, so this check cannot fire in this excerpt
if (!nev)
goto dec_egen;
if (!epoll_ctl (backend_fd, EPOLL_CTL_ADD, fd, &ev))
return;
}
else if (expect_true (errno == EEXIST))
{
/* EEXIST means we ignored a previous DEL, but the fd is still active */
/* if the kernel mask is the same as the new mask, we assume it hasn't changed */
if (oldmask == nev)
goto dec_egen;
if (!epoll_ctl (backend_fd, EPOLL_CTL_MOD, fd, &ev))
return;
}
else if (expect_true (errno == EPERM))
{
/* EPERM means the fd is always ready, but epoll is too snobbish */
/* to handle it, unlike select or poll. */
anfds [fd].emask = EV_EMASK_EPERM;
/* add fd to epoll_eperms, if not already inside */
if (!(oldmask & EV_EMASK_EPERM))
{
array_needsize (int, epoll_eperms, epoll_epermmax, epoll_epermcnt + 1, EMPTY2);
epoll_eperms [epoll_epermcnt++] = fd;
}
return;
}
// reached when no recovery path above succeeded
fd_kill (EV_A_ fd);
dec_egen:
/* we didn't successfully call epoll_ctl, so decrement the generation counter again */
--anfds [fd].egen;
}
3.2.2 等待事件
epoll_poll()
    -> epoll_wait()
    -> fd_event()
        -> fd_event_nocheck()
            -> ev_feed_event()
/*
 * epoll implementation of backend_poll: wait up to `timeout` seconds for events
 * and queue them for the matching watchers.
 *
 * Steps, in order:
 *   - force a zero timeout while epoll_eperms is non-empty,
 *   - call epoll_wait between EV_RELEASE_CB and EV_ACQUIRE_CB,
 *   - for each returned event: decode fd and generation from data.u64, skip it
 *     when the generation does not match anfds[fd].egen, re-sync the kernel mask
 *     when unwanted bits were reported, then pass the event to fd_event,
 *   - replace the receive array with a larger one when it was completely filled,
 *   - synthesize events for the fds listed in epoll_eperms.
 */
static void epoll_poll (struct ev_loop *loop, ev_tstamp timeout)
{
int i;
int eventcnt;
// fds in epoll_eperms get synthesized events below, so do not block
if (expect_false (loop->epoll_epermcnt))
timeout = 0.;
// wait for events; timeout is multiplied by 1e3 for epoll_wait
EV_RELEASE_CB;
eventcnt = epoll_wait (backend_fd, epoll_events, epoll_eventmax, timeout * 1e3);
EV_ACQUIRE_CB;
if (expect_false (eventcnt < 0))
{
// EINTR is tolerated silently; any other error is reported through ev_syserr
if (errno != EINTR)
ev_syserr ("(libev) epoll_wait");
return;
}
for (i = 0; i < eventcnt; ++i)
{
struct epoll_event *ev = epoll_events + i;
int fd = (uint32_t)ev->data.u64; /* mask out the lower 32 bits */
int want = anfds [fd].events;
// EPOLLERR and EPOLLHUP are reported as both EV_READ and EV_WRITE
int got = (ev->events & (EPOLLOUT | EPOLLERR | EPOLLHUP) ? EV_WRITE : 0)
| (ev->events & (EPOLLIN | EPOLLERR | EPOLLHUP) ? EV_READ : 0);
/*
* check for spurious notification.
* this only finds spurious notifications on egen updates
* other spurious notifications will be found by epoll_ctl, below
* we assume that fd is always in range, as we never shrink the anfds array
*/
if (expect_false ((uint32_t)anfds [fd].egen != (uint32_t)(ev->data.u64 >> 32)))
{
/* recreate kernel state */
postfork |= 2;
continue;
}
if (expect_false (got & ~want))
{
anfds [fd].emask = want;
/*
* we received an event but are not interested in it, try mod or del
* this often happens because we optimistically do not unregister fds
* when we are no longer interested in them, but also when we get spurious
* notifications for fds from another process. this is partially handled
* above with the gencounter check (== our fd is not the event fd), and
* partially here, when epoll_ctl returns an error (== a child has the fd
* but we closed it).
*/
ev->events = (want & EV_READ ? EPOLLIN : 0)
| (want & EV_WRITE ? EPOLLOUT : 0);
/* pre-2.6.9 kernels require a non-null pointer with EPOLL_CTL_DEL, */
/* which is fortunately easy to do for us. */
if (epoll_ctl (backend_fd, want ? EPOLL_CTL_MOD : EPOLL_CTL_DEL, fd, ev))
{
postfork |= 2; /* an error occurred, recreate kernel state */
continue;
}
}
// add the event to the pending list
fd_event (EV_A_ fd, got);
}
/* if the receive array was full, increase its size */
if (expect_false (eventcnt == epoll_eventmax))
{
// the old contents have been processed above, so the array is freed and reallocated rather than copied
ev_free (epoll_events);
epoll_eventmax = array_nextsize (
sizeof (struct epoll_event),
epoll_eventmax,
epoll_eventmax + 1
);
epoll_events = (struct epoll_event *)ev_malloc (
sizeof (struct epoll_event) * epoll_eventmax);
}
/* now synthesize events for all fds where epoll fails, while select works... */
for (i = epoll_epermcnt; i--; )
{
int fd = epoll_eperms [i];
unsigned char events = anfds [fd].events & (EV_READ | EV_WRITE);
if (anfds [fd].emask & EV_EMASK_EPERM && events)
fd_event (EV_A_ fd, events);
else
{
// no longer EPERM-flagged or no events wanted: drop the entry by moving the last one into its place
epoll_eperms [i] = epoll_eperms [--epoll_epermcnt];
anfds [fd].emask = 0;
}
}
}
/*
 * Deliver revents for fd to its watchers, unless the fd still has a reify
 * flag set (its change has not been processed by fd_reify yet), in which
 * case the event is dropped.
 */
static void fd_event(struct ev_loop *loop, int fd, int revents)
{
    ANFD *info = &loop->anfds[fd];

    if (expect_true (!info->reify)) {
        fd_event_nocheck(loop, fd, revents);
    }
}
static void fd_event_nocheck(struct ev_loop *loop, int fd, int revents)
{
ANFD *anfd = loop->anfds + fd;
ev_io *w;
for (w = (ev_io *) anfd->head; w; w = (ev_io *) ((ev_watcher_list *) w)->next) {
int ev = w->events & revents;
if (ev)
ev_feed_event(loop, (ev_watcher *) w, ev);
}
}
void ev_feed_event(struct ev_loop *loop, void *w, int revents)
{
ev_watcher *w_ = (ev_watcher *) w;
int pri = w_->priority - (EV_FEATURES & 4 ? -2 : 0);
if (expect_false (w_->pending))
loop->pendings[pri][w_->pending - 1].events |= revents;
else {
w_->pending = ++loop->pendingcnt[pri];
if (expect_false(w_->pending > loop->pendingmax[pri])) {
loop->pendings[pri] = (ANPENDING *) array_realloc(
sizeof(ANPENDING),
loop->pendings[pri],
&loop->pendingmax[pri],
w_->pending
);
};
// 按优先级将事件放入 loop->pendings[][]
loop->pendings[pri][w_->pending - 1].w = w_;
loop->pendings[pri][w_->pending - 1].events = revents;
}
loop->pendingpri = ((EV_FEATURES & 4) ? +2 : 0) - ((EV_FEATURES & 4) ? -2 : 0);
}
3.2.3 处理事件
/*
 * Invoke the callbacks of all pending events.
 * Priority buckets are visited from index 4 down to 0; within a bucket entries
 * are taken from the end of the array. Both loops re-read loop->pendingpri on
 * every pass: ev_feed_event() assigns it, so a callback that feeds new events
 * can change which bucket is processed next.
 */
void ev_invoke_pending(struct ev_loop *loop)
{
// start one above the highest bucket index; it is decremented before first use
loop->pendingpri = 5;
do {
--loop->pendingpri;
while (loop->pendingcnt[loop->pendingpri]) {
// pop the last entry of the current bucket
ANPENDING *p = loop->pendings[loop->pendingpri] +
--loop->pendingcnt[loop->pendingpri];
// clear the pending mark first, then call the watcher's callback
p->w->pending = 0;
p->w->cb(loop, p->w, p->events);
}
}
while (loop->pendingpri);
}