当前位置: 首页 > 工具软件 > GoAhead > 使用案例 >

goahead源码解析(五)---------整体的任务执行流程

慕容铭
2023-12-01

整个goahead的框架其实包含了两种任务,一种是socket任务,就是处理网页的连接以及通信,另一种是定时器任务,用来定时执行某个函数。下面我就从这两条线大致描述一下goahead的工作逻辑。

定时器任务线

变量定义:

/*
    This structure stores scheduled events.
 */
typedef struct Callback {
    void        (*routine)(void *arg, int id);
    void        *arg;
    WebsTime    at;
    int         id;
} Callback;

static Callback  **callbacks;
static int       callbackMax;

static HashTable **sym;             /* List of symbol tables */
static int       symMax;            /* One past the max symbol table */

初始化:

PUBLIC int websRuntimeOpen(void)
{
    symMax = 0;
    sym = 0;
    srand((uint) time(NULL));
    return 0;
}

创建事件:

/*
    Schedule an event in delay milliseconds time. We will use 1 second granularity for webServer.
 */
PUBLIC int websStartEvent(int delay, WebsEventProc proc, void *arg)
{
    Callback    *s;
    int         id;

    if ((id = wallocObject(&callbacks, &callbackMax, sizeof(Callback))) < 0) {
        return -1;
    }
    s = callbacks[id];
    s->routine = proc;
    s->arg = arg;
    s->id = id;

    /*
        Round the delay up to seconds.
     */
    s->at = ((delay + 500) / 1000) + time(0);
    return id;
}

该函数就是创建并填充结构体,将结构体存储在callbacks数组里面。

其中s->at取值是计算运行时的时间。+500再/1000是为了取整。如果想要循环定时,则需要在回调函数内调用websRestartEvent。

具体事件一

MAIN->websOpen

    if (!websDebug) {
        pruneId = websStartEvent(WEBS_SESSION_PRUNE, (WebsEventProc) pruneSessions, 0);
    }

该事件的作用是,每隔一分钟去清除一次失效的webSessions。

具体事件二

MAIN->websListen->websAccept

wp->timeout = websStartEvent(PARSE_TIMEOUT, checkTimeout, (void*) wp);

该事件的作用是判断当前连接是否已经超时,超时的话就释放掉相关变量。

在进行读事件时使用websNoteRequestActivity来标记一个时间戳。

在checkTimeout中使用getTimeSinceMark来获取标记后至今的时间。

判断经过的时间是否超过了最大时间限制(60s),

假如超过了时间限制,就根据网页状态,判断是请求超时还是已经断开连接。

没有超过时间限制,就计算一下进行下一次检测的时间(60s - 经过的时间)。

socket任务线

变量定义:

PUBLIC WebsSocket   **socketList;           /* List of open sockets */
PUBLIC int          socketMax;              /* Maximum size of socket */
PUBLIC Socket       socketHighestFd = -1;   /* Highest socket fd opened */
PUBLIC int          socketOpenCount = 0;    /* Number of task using sockets */

初始化:

PUBLIC int socketOpen(void)
{
    Socket  fd;

    if (++socketOpenCount > 1) {
        return 0;
    }
    socketList = NULL;
    socketMax = 0;
    socketHighestFd = -1;
    if ((fd = socket(AF_INET6, SOCK_STREAM, 0)) != -1) {
        hasIPv6 = 1;
        closesocket(fd);
    } else {
        trace(1, "This system does not have IPv6 support");
    }
    return 0;
}

创建事件:

/*
    Allocate a new socket structure
 */
PUBLIC int socketAlloc(cchar *ip, int port, SocketAccept accept, int flags)
{
    WebsSocket  *sp;
    int         sid;

    if (socketMax >= FD_SETSIZE) {
        return -1;
    }
    if ((sid = wallocObject(&socketList, &socketMax, sizeof(WebsSocket))) < 0) {
        return -1;
    }
    sp = socketList[sid];
    sp->sid = sid;
    sp->accept = accept;
    sp->port = port;
    sp->fileHandle = -1;
    sp->saveMask = -1;
    if (ip) {
        sp->ip = sclone(ip);
    }
    sp->flags = flags & (SOCKET_BLOCK | SOCKET_LISTENING | SOCKET_NODELAY);
    return sid;
}

跟上面的定时器任务的创建事件是一样的,都是给结构体赋值,指定WebSocket的回调函数,并将结构体存储在socketList数组内。

回调函数:

与上面定时器事件不同的是,所有的socket事件回调函数都是同一个。该回调函数就是

/*
    Accept a new connection from ipaddr:port
 */
PUBLIC int websAccept(int sid, cchar *ipaddr, int port, int listenSid)
{
    Webs        *wp;
    WebsSocket  *lp;
    struct sockaddr_storage ifAddr;
    int         wid, len;

    assert(sid >= 0);
    assert(ipaddr && *ipaddr);
    assert(listenSid >= 0);
    assert(port >= 0);

    /*
        Allocate a new handle for this accepted connection. This will allocate a Webs structure in the webs[] list
     */
    if ((wid = websAlloc(sid)) < 0) {
        return -1;
    }
    wp = webs[wid];
    assert(wp);
    wp->listenSid = listenSid;
    strncpy(wp->ipaddr, ipaddr, min(sizeof(wp->ipaddr) - 1, strlen(ipaddr)));

    /*
        Get the ip address of the interface that accept the connection.
     */
    len = sizeof(ifAddr);
    if (getsockname(socketPtr(sid)->sock, (struct sockaddr*) &ifAddr, (Socklen*) &len) < 0) {
        error("Cannot get sockname");
        websFree(wp);
        return -1;
    }
    socketAddress((struct sockaddr*) &ifAddr, (int) len, wp->ifaddr, sizeof(wp->ifaddr), NULL);

#if ME_GOAHEAD_LEGACY
    /*
        Check if this is a request from a browser on this system. This is useful to know for permitting administrative
        operations only for local access
     */
    if (strcmp(wp->ipaddr, "127.0.0.1") == 0 || strcmp(wp->ipaddr, websIpAddr) == 0 ||
            strcmp(wp->ipaddr, websHost) == 0) {
        wp->flags |= WEBS_LOCAL;
    }
#endif

    /*
        Arrange for socketEvent to be called when read data is available
     */
    lp = socketPtr(listenSid);
    trace(4, "New connection from %s:%d to %s:%d", ipaddr, port, wp->ifaddr, lp->port);

#if ME_COM_SSL
    if (lp->secure) {
        wp->flags |= WEBS_SECURE;
        trace(4, "Upgrade connection to TLS");
        if (sslUpgrade(wp) < 0) {
            error("Cannot upgrade to TLS");
            websFree(wp);
            return -1;
        }
    }
#endif
    assert(wp->timeout == -1);
    wp->timeout = websStartEvent(PARSE_TIMEOUT, checkTimeout, (void*) wp);
    socketEvent(sid, SOCKET_READABLE, wp);
    return 0;
}

变量定义:

static Webs         **webs;                     /* Open connection list head */
static int          websMax;                    /* List size */

函数解析:

该回调函数的流程是:

(1)创建一个Webs结构体,然后填入相关参数。

(2)创建一个定时器事件,定时检测该连接是否超时。(定时器事件的实现上面已经讲过了。)

(3)调用子函数socketEvent执行事件。

子函数socketEvent

/*
    The webs socket handler. Called in response to I/O. We just pass control to the relevant read or write handler. A
    pointer to the webs structure is passed as a (void*) in wptr.
 */
static void socketEvent(int sid, int mask, void *wptr)
{
    Webs    *wp;

    wp = (Webs*) wptr;
    assert(wp);

    assert(websValid(wp));
    if (! websValid(wp)) {
        return;
    }
    if (mask & SOCKET_READABLE) {
        readEvent(wp);
    }
    if (mask & SOCKET_WRITABLE) {
        writeEvent(wp);
    }
    if (wp->flags & WEBS_CLOSED) {
        websFree(wp);
        /* WARNING: wp not valid here */
    }
}

这个函数其实也就是个套子,真正的执行在readEvent中

子函数readEvent

/*
    The webs read handler. This is the primary read event loop. It uses a state machine to track progress while parsing
    the HTTP request.  Note: we never block as the socket is always in non-blocking mode.
 */
static void readEvent(Webs *wp)
{
    WebsBuf     *rxbuf;
    WebsSocket  *sp;
    ssize       nbytes;

    assert(wp);
    assert(websValid(wp));

    if (!websValid(wp)) {
        return;
    }
    websNoteRequestActivity(wp);   //创建一个时间戳,后面定时器任务会用到。
    rxbuf = &wp->rxbuf;

    if (bufRoom(rxbuf) < (ME_GOAHEAD_LIMIT_BUFFER + 1)) {   //判断可用空间
        if (!bufGrow(rxbuf, ME_GOAHEAD_LIMIT_BUFFER + 1)) {   //扩大空间
            websError(wp, HTTP_CODE_INTERNAL_SERVER_ERROR, "Cannot grow rxbuf");
            websPump(wp);
            return;
        }
    }
    //读取数据
    if ((nbytes = websRead(wp, (char*) rxbuf->endp, ME_GOAHEAD_LIMIT_BUFFER)) > 0) {
        wp->lastRead = nbytes;
        bufAdjustEnd(rxbuf, nbytes);  //调整buf内指针的位置
        bufAddNull(rxbuf);    //buf末加'\0'
    }
    if (nbytes > 0 || wp->state > WEBS_BEGIN) {
        websPump(wp);
    }
    if (wp->flags & WEBS_CLOSED) {
        return;
    } else if (nbytes < 0 && socketEof(wp->sid)) {
        /* EOF or error. Allow running requests to continue. */
        if (wp->state < WEBS_READY) {
            if (wp->state >= WEBS_BEGIN) {
                websError(wp, HTTP_CODE_COMMS_ERROR | WEBS_CLOSE, "Read error: connection lost");
                websPump(wp);
            } else {
                complete(wp, 0);
            }
        } else {
            socketDeleteHandler(wp->sid);
        }
    } else if (wp->state < WEBS_READY) {
        sp = socketPtr(wp->sid);
        socketCreateHandler(wp->sid, sp->handlerMask | SOCKET_READABLE, socketEvent, wp);
    }
}

子函数bufAdjustEnd

/*
    Adjust the endp pointer after the user has copied data into the queue.
 */
PUBLIC void bufAdjustEnd(WebsBuf *bp, ssize size)
{
	//endp是数据的结尾的指针,endbuf是buf结尾的指针。
    assert(bp);
    //确定buf的长度
    assert(bp->buflen == (bp->endbuf - bp->buf));
    //新读入的数据大小必须小于buf的长度
    assert(0 <= size && size < bp->buflen);
    
	//新读入了size字节的数据,所以要将endp移动。
    bp->endp += size;
    //假如移动后超出了buf本身的范围,就将endp赋值为超出一个buflen的偏移。(这样数据不会出问题吗?)
    if (bp->endp >= bp->endbuf) {
        bp->endp -= bp->buflen;
    }
    /*
        Flush the queue if the endp pointer is corrupted via a bad size
     */
     //左移后依然超出范围,
     //也就是endp+size-buflen>=endbuf,因为size<buflen,
     //也就是说移动之前的endp>endbuf,
     //那就说明endp出问题了,直接把buf清空。
    if (bp->endp >= bp->endbuf) {
        error("Bad end pointer");
        bufFlush(bp);
    }
}

主循环

函数主体:

PUBLIC void websServiceEvents(int *finished)
{
    int     delay, nextEvent;

    if (finished) {
        *finished = 0;
    }
    delay = 0;
    while (!finished || !*finished) {
        if (socketSelect(-1, delay)) {
            socketProcess();
        }
#if ME_GOAHEAD_CGI
        delay = websCgiPoll();
#else
        delay = MAXINT;
#endif
        nextEvent = websRunEvents();
        delay = min(delay, nextEvent);
    }
}

(1)关于finished

static void initPlatform(void)
{
#if ME_UNIX_LIKE
    signal(SIGTERM, sigHandler);
    #ifdef SIGPIPE
        signal(SIGPIPE, SIG_IGN);
    #endif
#elif ME_WIN_LIKE
    _fmode=_O_BINARY;
#endif
}

在这个函数中,当程序终止,收到SIGTERM信号时,会执行sigHandle函数。

static void sigHandler(int signo)
{
    finished = 1;
}

这个函数就是将finished置一。

因此除非程序终止,否则主事件的while循环会一直持续。

(2)socket任务的执行

        if (socketSelect(-1, delay)) {
            socketProcess();
        }

socketSelect函数负责轮询所有socket,查看是否有可读,可写,或者异常事件发生。

socketProcess()负责创建新的socket,并执行回调函数。

socketSelect

 		index = sp->sock / (NBBY * sizeof(fd_mask));
        bit = ((ssize) 1) << (sp->sock % (NBBY * sizeof(fd_mask)));
        /*
            Set the appropriate bit in the ready masks for the sp->sock.
         */
        if (sp->handlerMask & SOCKET_READABLE) {
            readFds[index] |= bit;
        }
        if (sp->handlerMask & SOCKET_WRITABLE) {
            writeFds[index] |= bit;
        }
        if (sp->handlerMask & SOCKET_EXCEPTION) {
            exceptFds[index] |= bit;
        }

这一段遍历所有socket,进行这样的处理,是将socket分类,分别存储到读,写,异常三个数组中去,并且根据index值喝bit值,还可以计算出对应的socket值。

关于我一直疑惑的地方,由下面这段解答:

select 函数调用前后会修改 readfds、writefds 和 exceptfds 这三个集合中的内容(如果有的话),所以如果您想下次调用 select 复用这个变量,记得在下次调用前再次调用 select 前先使用 FD_ZERO 将集合清零,然后调用 FD_SET 将需要检测事件的 fd 再次添加进去

select 函数调用之后,readfdswritefdsexceptfds这三个集合中存放的不是我们之前设置进去的 fd,而是有相关有读写或异常事件的 fd,也就是说 select 函数会修改这三个参数的内容,这也要求我们当一个 fd_set 被 select 函数调用后,这个 fd_set 就已经发生了改变,下次如果我们需要使用它,必须使用 FD_ZERO 宏先清零,再重新将我们关心的 fd 设置进去

后面当select捕获到任务时,readFds里面就已经是捕获到事件的fd了。

        if (readFds[index] & bit) {
            sp->currentEvents |= SOCKET_READABLE;
        }
        if (writeFds[index] & bit) {
            sp->currentEvents |= SOCKET_WRITABLE;
        }
        if (exceptFds[index] & bit) {
            sp->currentEvents |= SOCKET_EXCEPTION;
        }

将这些事件对应的socket赋值为可读可写。

PUBLIC void socketProcess(void)
{
    WebsSocket  *sp;
    int         sid;

    for (sid = 0; sid < socketMax; sid++) {
        if ((sp = socketList[sid]) != NULL) {
            if (sp->currentEvents & sp->handlerMask) {
                socketDoEvent(sp);
            }
        }
    }
}

在socketDoEvent中对事件进行处理。

static void socketDoEvent(WebsSocket *sp)
{
    int     sid;

    assert(sp);

    sid = sp->sid;
    if (sp->currentEvents & SOCKET_READABLE) {
        if (sp->flags & SOCKET_LISTENING) {
            socketAccept(sp);
            sp->currentEvents = 0;
            return;
        }
    }
    /*
        Now invoke the users socket handler. NOTE: the handler may delete the
        socket, so we must be very careful after calling the handler.
     */
    if (sp->handler && (sp->handlerMask & sp->currentEvents)) {
        (sp->handler)(sid, sp->handlerMask & sp->currentEvents, sp->handler_data);
        /*
            Make sure socket pointer is still valid, then reset the currentEvents.
         */
        if (socketList && sid < socketMax && socketList[sid] == sp) {
            sp->currentEvents = 0;
        }
    }
}

其中sp->flags & SOCKET_LISTENING是指如果存在读事件的套接字是服务器的监听套接字,那就意味着这个读事件是指存在新的连接,而不是网页和服务端的通信数据。

那么存在新连接当然就是把新连接添加到socketList里面去。

如果就是单纯的通信事件,那就调用==(sp->handler)(sid, sp->handlerMask & sp->currentEvents, sp->handler_data);==去执行回调函数。

这里的回调函数就是上面socket任务线里面讲解的回调函数。

(3)定时器任务的执行

int websRunEvents(void)
{
    Callback    *s;
    WebsTime    now;
    int         i, delay, nextEvent;

    nextEvent = (MAXINT / 1000);
    now = time(0);

    for (i = 0; i < callbackMax; i++) {
        if ((s = callbacks[i]) != NULL) {
            if (s->at <= now) {
                callEvent(i);
                delay = MAXINT / 1000;
                /* Rescan in case event scheduled or modified an event */
                i = -1;
            } else {
                delay = (int) min(s->at - now, MAXINT / 1000);
            }
            nextEvent = min(delay, nextEvent);
        }
    }
    return nextEvent * 1000;
}

[1]遍历callbacks数组,

[2]判断事件是否到达时间,如果有就执行回调。

[3]这个delay值是指下次循环的时间。最大值为MAXINT/1000,如果有在这时间内需要执行的时间,就将delay赋值为最近执行函数的时间。

[4]最终的返回值nextEvent就是下一次循环的时间。

[5]执行函数回调后,i=-1的作用是,假如回调函数执行时间比较长,可能存在函数执行之前前面元素的函数还没到时间,但执行完后前面的函数到时间了,继续往后遍历会导致前面的事件推迟过久。所以每次执行回调后,都会从头遍历数组。

(4)关于delay

此处说的delay是指websServiceEvents内的,并非某个子函数内部的。

[1]起始delay=0,socketSelect函数直接返回,delay被赋值为MAXINT,之后进入定时事件的执行,delay值会发生更改,变化为下次执行定时事件需要的时间。

[2]下一次循环就会将delay值作为等待socket事件的最大阻塞时间,这样可以保证及时处理定时任务。

总结

至此其实已经讲的差不多了,也修改了很多之前博客中的错误理解,正常的通信流程和回调过程已经解释的比较清楚。但是关于一些特殊情况的处理,还是没有领悟的很深刻。
比如:
当我一个事件的回调需要执行5秒的事件,而我在5秒内又通过界面发送了多次事件,那这之中的事件是如何如理的呢?我通过实践发现这些事件依旧被正常接收和处理了,但是我还没有想明白是在哪里进行的处理。
之后想明白了再更新吧。
附上我在查资料过程中发现的一篇select讲解,感觉真的是让我茅塞顿开。
https://cloud.tencent.com/developer/news/376119

 类似资料: