整个goahead的框架其实包含了两种任务,一种是socket任务,就是处理网页的连接以及通信,另一种是定时器任务,用来定时执行某个函数。下面我就从这两条线大致描述一下goahead的工作逻辑。
/*
This structure stores scheduled events.
*/
typedef struct Callback {
void (*routine)(void *arg, int id);
void *arg;
WebsTime at;
int id;
} Callback;
static Callback **callbacks;
static int callbackMax;
static HashTable **sym; /* List of symbol tables */
static int symMax; /* One past the max symbol table */
PUBLIC int websRuntimeOpen(void)
{
symMax = 0;
sym = 0;
srand((uint) time(NULL));
return 0;
}
/*
Schedule an event in delay milliseconds time. We will use 1 second granularity for webServer.
*/
PUBLIC int websStartEvent(int delay, WebsEventProc proc, void *arg)
{
Callback *s;
int id;
if ((id = wallocObject(&callbacks, &callbackMax, sizeof(Callback))) < 0) {
return -1;
}
s = callbacks[id];
s->routine = proc;
s->arg = arg;
s->id = id;
/*
Round the delay up to seconds.
*/
s->at = ((delay + 500) / 1000) + time(0);
return id;
}
该函数就是创建并填充结构体,将结构体存储在callbacks数组里面。
其中s->at取值是计算运行时的时间。+500再/1000是为了取整。如果想要循环定时,则需要在回调函数内调用websRestartEvent。
MAIN->websOpen
if (!websDebug) {
pruneId = websStartEvent(WEBS_SESSION_PRUNE, (WebsEventProc) pruneSessions, 0);
}
该事件的作用是,每隔一分钟去清除一次失效的webSessions。
MAIN->websListen->websAccept
wp->timeout = websStartEvent(PARSE_TIMEOUT, checkTimeout, (void*) wp);
该事件的作用是判断当前连接是否已经超时,超时的话就释放掉相关变量。
在进行读事件时使用websNoteRequestActivity来标记一个时间戳。
在checkTimeout中使用getTimeSinceMark来获取标记后至今的时间。
判断经过的时间是否超过了最大时间限制(60s),
假如超过了时间限制,就根据网页状态,判断是请求超时还是已经断开连接。
没有超过时间限制,就计算一下进行下一次检测的时间(60s - 经过的时间)。
PUBLIC WebsSocket **socketList; /* List of open sockets */
PUBLIC int socketMax; /* Maximum size of socket */
PUBLIC Socket socketHighestFd = -1; /* Highest socket fd opened */
PUBLIC int socketOpenCount = 0; /* Number of task using sockets */
PUBLIC int socketOpen(void)
{
Socket fd;
if (++socketOpenCount > 1) {
return 0;
}
socketList = NULL;
socketMax = 0;
socketHighestFd = -1;
if ((fd = socket(AF_INET6, SOCK_STREAM, 0)) != -1) {
hasIPv6 = 1;
closesocket(fd);
} else {
trace(1, "This system does not have IPv6 support");
}
return 0;
}
/*
Allocate a new socket structure
*/
PUBLIC int socketAlloc(cchar *ip, int port, SocketAccept accept, int flags)
{
WebsSocket *sp;
int sid;
if (socketMax >= FD_SETSIZE) {
return -1;
}
if ((sid = wallocObject(&socketList, &socketMax, sizeof(WebsSocket))) < 0) {
return -1;
}
sp = socketList[sid];
sp->sid = sid;
sp->accept = accept;
sp->port = port;
sp->fileHandle = -1;
sp->saveMask = -1;
if (ip) {
sp->ip = sclone(ip);
}
sp->flags = flags & (SOCKET_BLOCK | SOCKET_LISTENING | SOCKET_NODELAY);
return sid;
}
跟上面的定时器任务的创建事件是一样的,都是给结构体赋值,指定WebSocket的回调函数,并将结构体存储在socketList数组内。
与上面定时器事件不同的是,所有的socket事件回调函数都是同一个。该回调函数就是
/*
Accept a new connection from ipaddr:port
*/
PUBLIC int websAccept(int sid, cchar *ipaddr, int port, int listenSid)
{
Webs *wp;
WebsSocket *lp;
struct sockaddr_storage ifAddr;
int wid, len;
assert(sid >= 0);
assert(ipaddr && *ipaddr);
assert(listenSid >= 0);
assert(port >= 0);
/*
Allocate a new handle for this accepted connection. This will allocate a Webs structure in the webs[] list
*/
if ((wid = websAlloc(sid)) < 0) {
return -1;
}
wp = webs[wid];
assert(wp);
wp->listenSid = listenSid;
strncpy(wp->ipaddr, ipaddr, min(sizeof(wp->ipaddr) - 1, strlen(ipaddr)));
/*
Get the ip address of the interface that accept the connection.
*/
len = sizeof(ifAddr);
if (getsockname(socketPtr(sid)->sock, (struct sockaddr*) &ifAddr, (Socklen*) &len) < 0) {
error("Cannot get sockname");
websFree(wp);
return -1;
}
socketAddress((struct sockaddr*) &ifAddr, (int) len, wp->ifaddr, sizeof(wp->ifaddr), NULL);
#if ME_GOAHEAD_LEGACY
/*
Check if this is a request from a browser on this system. This is useful to know for permitting administrative
operations only for local access
*/
if (strcmp(wp->ipaddr, "127.0.0.1") == 0 || strcmp(wp->ipaddr, websIpAddr) == 0 ||
strcmp(wp->ipaddr, websHost) == 0) {
wp->flags |= WEBS_LOCAL;
}
#endif
/*
Arrange for socketEvent to be called when read data is available
*/
lp = socketPtr(listenSid);
trace(4, "New connection from %s:%d to %s:%d", ipaddr, port, wp->ifaddr, lp->port);
#if ME_COM_SSL
if (lp->secure) {
wp->flags |= WEBS_SECURE;
trace(4, "Upgrade connection to TLS");
if (sslUpgrade(wp) < 0) {
error("Cannot upgrade to TLS");
websFree(wp);
return -1;
}
}
#endif
assert(wp->timeout == -1);
wp->timeout = websStartEvent(PARSE_TIMEOUT, checkTimeout, (void*) wp);
socketEvent(sid, SOCKET_READABLE, wp);
return 0;
}
static Webs **webs; /* Open connection list head */
static int websMax; /* List size */
该回调函数的流程是:
(1)创建一个Webs结构体,然后填入相关参数。
(2)创建一个定时器事件,定时检测该连接是否超时。(定时器事件的实现上面已经讲过了。)
(3)调用子函数socketEvent执行事件。
/*
The webs socket handler. Called in response to I/O. We just pass control to the relevant read or write handler. A
pointer to the webs structure is passed as a (void*) in wptr.
*/
static void socketEvent(int sid, int mask, void *wptr)
{
Webs *wp;
wp = (Webs*) wptr;
assert(wp);
assert(websValid(wp));
if (! websValid(wp)) {
return;
}
if (mask & SOCKET_READABLE) {
readEvent(wp);
}
if (mask & SOCKET_WRITABLE) {
writeEvent(wp);
}
if (wp->flags & WEBS_CLOSED) {
websFree(wp);
/* WARNING: wp not valid here */
}
}
这个函数其实也就是个套子,真正的执行在readEvent中
/*
The webs read handler. This is the primary read event loop. It uses a state machine to track progress while parsing
the HTTP request. Note: we never block as the socket is always in non-blocking mode.
*/
static void readEvent(Webs *wp)
{
WebsBuf *rxbuf;
WebsSocket *sp;
ssize nbytes;
assert(wp);
assert(websValid(wp));
if (!websValid(wp)) {
return;
}
websNoteRequestActivity(wp); //创建一个时间戳,后面定时器任务会用到。
rxbuf = &wp->rxbuf;
if (bufRoom(rxbuf) < (ME_GOAHEAD_LIMIT_BUFFER + 1)) { //判断可用空间
if (!bufGrow(rxbuf, ME_GOAHEAD_LIMIT_BUFFER + 1)) { //扩大空间
websError(wp, HTTP_CODE_INTERNAL_SERVER_ERROR, "Cannot grow rxbuf");
websPump(wp);
return;
}
}
//读取数据
if ((nbytes = websRead(wp, (char*) rxbuf->endp, ME_GOAHEAD_LIMIT_BUFFER)) > 0) {
wp->lastRead = nbytes;
bufAdjustEnd(rxbuf, nbytes); //调整buf内指针的位置
bufAddNull(rxbuf); //buf末加'\0'
}
if (nbytes > 0 || wp->state > WEBS_BEGIN) {
websPump(wp);
}
if (wp->flags & WEBS_CLOSED) {
return;
} else if (nbytes < 0 && socketEof(wp->sid)) {
/* EOF or error. Allow running requests to continue. */
if (wp->state < WEBS_READY) {
if (wp->state >= WEBS_BEGIN) {
websError(wp, HTTP_CODE_COMMS_ERROR | WEBS_CLOSE, "Read error: connection lost");
websPump(wp);
} else {
complete(wp, 0);
}
} else {
socketDeleteHandler(wp->sid);
}
} else if (wp->state < WEBS_READY) {
sp = socketPtr(wp->sid);
socketCreateHandler(wp->sid, sp->handlerMask | SOCKET_READABLE, socketEvent, wp);
}
}
/*
Adjust the endp pointer after the user has copied data into the queue.
*/
PUBLIC void bufAdjustEnd(WebsBuf *bp, ssize size)
{
//endp是数据的结尾的指针,endbuf是buf结尾的指针。
assert(bp);
//确定buf的长度
assert(bp->buflen == (bp->endbuf - bp->buf));
//新读入的数据大小必须小于buf的长度
assert(0 <= size && size < bp->buflen);
//新读入了size字节的数据,所以要将endp移动。
bp->endp += size;
//假如移动后超出了buf本身的范围,就将endp赋值为超出一个buflen的偏移。(这样数据不会出问题吗?)
if (bp->endp >= bp->endbuf) {
bp->endp -= bp->buflen;
}
/*
Flush the queue if the endp pointer is corrupted via a bad size
*/
//左移后依然超出范围,
//也就是endp+size-buflen>=endbuf,因为size<buflen,
//也就是说移动之前的endp>endbuf,
//那就说明endp出问题了,直接把buf清空。
if (bp->endp >= bp->endbuf) {
error("Bad end pointer");
bufFlush(bp);
}
}
PUBLIC void websServiceEvents(int *finished)
{
int delay, nextEvent;
if (finished) {
*finished = 0;
}
delay = 0;
while (!finished || !*finished) {
if (socketSelect(-1, delay)) {
socketProcess();
}
#if ME_GOAHEAD_CGI
delay = websCgiPoll();
#else
delay = MAXINT;
#endif
nextEvent = websRunEvents();
delay = min(delay, nextEvent);
}
}
static void initPlatform(void)
{
#if ME_UNIX_LIKE
signal(SIGTERM, sigHandler);
#ifdef SIGPIPE
signal(SIGPIPE, SIG_IGN);
#endif
#elif ME_WIN_LIKE
_fmode=_O_BINARY;
#endif
}
在这个函数中,当程序终止,收到SIGTERM信号时,会执行sigHandle函数。
static void sigHandler(int signo)
{
finished = 1;
}
这个函数就是将finished置一。
因此除非程序终止,否则主事件的while循环会一直持续。
if (socketSelect(-1, delay)) {
socketProcess();
}
socketSelect函数负责轮询所有socket,查看是否有可读,可写,或者异常事件发生。
socketProcess()负责创建新的socket,并执行回调函数。
index = sp->sock / (NBBY * sizeof(fd_mask));
bit = ((ssize) 1) << (sp->sock % (NBBY * sizeof(fd_mask)));
/*
Set the appropriate bit in the ready masks for the sp->sock.
*/
if (sp->handlerMask & SOCKET_READABLE) {
readFds[index] |= bit;
}
if (sp->handlerMask & SOCKET_WRITABLE) {
writeFds[index] |= bit;
}
if (sp->handlerMask & SOCKET_EXCEPTION) {
exceptFds[index] |= bit;
}
这一段遍历所有socket,进行这样的处理,是将socket分类,分别存储到读,写,异常三个数组中去,并且根据index值喝bit值,还可以计算出对应的socket值。
关于我一直疑惑的地方,由下面这段解答:
select 函数调用前后会修改 readfds、writefds 和 exceptfds 这三个集合中的内容(如果有的话),所以如果您想下次调用 select 复用这个变量,记得在下次调用前再次调用 select 前先使用 FD_ZERO 将集合清零,然后调用 FD_SET 将需要检测事件的 fd 再次添加进去。
select 函数调用之后,readfds、writefds和exceptfds这三个集合中存放的不是我们之前设置进去的 fd,而是有相关有读写或异常事件的 fd,也就是说 select 函数会修改这三个参数的内容,这也要求我们当一个 fd_set 被 select 函数调用后,这个 fd_set 就已经发生了改变,下次如果我们需要使用它,必须使用 FD_ZERO 宏先清零,再重新将我们关心的 fd 设置进去。
后面当select捕获到任务时,readFds里面就已经是捕获到事件的fd了。
if (readFds[index] & bit) {
sp->currentEvents |= SOCKET_READABLE;
}
if (writeFds[index] & bit) {
sp->currentEvents |= SOCKET_WRITABLE;
}
if (exceptFds[index] & bit) {
sp->currentEvents |= SOCKET_EXCEPTION;
}
将这些事件对应的socket赋值为可读可写。
PUBLIC void socketProcess(void)
{
WebsSocket *sp;
int sid;
for (sid = 0; sid < socketMax; sid++) {
if ((sp = socketList[sid]) != NULL) {
if (sp->currentEvents & sp->handlerMask) {
socketDoEvent(sp);
}
}
}
}
在socketDoEvent中对事件进行处理。
static void socketDoEvent(WebsSocket *sp)
{
int sid;
assert(sp);
sid = sp->sid;
if (sp->currentEvents & SOCKET_READABLE) {
if (sp->flags & SOCKET_LISTENING) {
socketAccept(sp);
sp->currentEvents = 0;
return;
}
}
/*
Now invoke the users socket handler. NOTE: the handler may delete the
socket, so we must be very careful after calling the handler.
*/
if (sp->handler && (sp->handlerMask & sp->currentEvents)) {
(sp->handler)(sid, sp->handlerMask & sp->currentEvents, sp->handler_data);
/*
Make sure socket pointer is still valid, then reset the currentEvents.
*/
if (socketList && sid < socketMax && socketList[sid] == sp) {
sp->currentEvents = 0;
}
}
}
其中sp->flags & SOCKET_LISTENING是指如果存在读事件的套接字是服务器的监听套接字,那就意味着这个读事件是指存在新的连接,而不是网页和服务端的通信数据。
那么存在新连接当然就是把新连接添加到socketList里面去。
如果就是单纯的通信事件,那就调用==(sp->handler)(sid, sp->handlerMask & sp->currentEvents, sp->handler_data);==去执行回调函数。
这里的回调函数就是上面socket任务线里面讲解的回调函数。
int websRunEvents(void)
{
Callback *s;
WebsTime now;
int i, delay, nextEvent;
nextEvent = (MAXINT / 1000);
now = time(0);
for (i = 0; i < callbackMax; i++) {
if ((s = callbacks[i]) != NULL) {
if (s->at <= now) {
callEvent(i);
delay = MAXINT / 1000;
/* Rescan in case event scheduled or modified an event */
i = -1;
} else {
delay = (int) min(s->at - now, MAXINT / 1000);
}
nextEvent = min(delay, nextEvent);
}
}
return nextEvent * 1000;
}
[1]遍历callbacks数组,
[2]判断事件是否到达时间,如果有就执行回调。
[3]这个delay值是指下次循环的时间。最大值为MAXINT/1000,如果有在这时间内需要执行的时间,就将delay赋值为最近执行函数的时间。
[4]最终的返回值nextEvent就是下一次循环的时间。
[5]执行函数回调后,i=-1的作用是,假如回调函数执行时间比较长,可能存在函数执行之前前面元素的函数还没到时间,但执行完后前面的函数到时间了,继续往后遍历会导致前面的事件推迟过久。所以每次执行回调后,都会从头遍历数组。
此处说的delay是指websServiceEvents内的,并非某个子函数内部的。
[1]起始delay=0,socketSelect函数直接返回,delay被赋值为MAXINT,之后进入定时事件的执行,delay值会发生更改,变化为下次执行定时事件需要的时间。
[2]下一次循环就会将delay值作为等待socket事件的最大阻塞时间,这样可以保证及时处理定时任务。
至此其实已经讲的差不多了,也修改了很多之前博客中的错误理解,正常的通信流程和回调过程已经解释的比较清楚。但是关于一些特殊情况的处理,还是没有领悟的很深刻。
比如:
当我一个事件的回调需要执行5秒的事件,而我在5秒内又通过界面发送了多次事件,那这之中的事件是如何如理的呢?我通过实践发现这些事件依旧被正常接收和处理了,但是我还没有想明白是在哪里进行的处理。
之后想明白了再更新吧。
附上我在查资料过程中发现的一篇select讲解,感觉真的是让我茅塞顿开。
https://cloud.tencent.com/developer/news/376119