当前位置: 首页 > 工具软件 > Libevhtp > 使用案例 >

libevhtp一个基于libevent编写的开源的http服务端网络模块学习

滕项明
2023-12-01

最近在学习libevhtp的源码。它的网络框架基于libevent:主线程负责accept新连接,并创建多个子线程,每个子线程拥有自己的event_base。有新连接到来时,主线程比较各子线程管道缓冲区中积压的待处理命令数,选出积压最少的线程,通过管道通知它,把连接分配过去,并在该子线程的event_base上为连接建立event。

 

bool HttpMainThread::Init()
{
    evbase = event_base_new();
    if (!evbase)
    {
        LOG_ERROR("http main thread event_base_new failed.");
        return false;
    }

    timer_evt = event_new(evbase, -1, EV_PERSIST, OnSessionCheck, NULL); // eventnew ,设置回调
    {
        struct timeval tv;
        tv.tv_sec = Config::Http().CheckInterval();
        tv.tv_usec = 0;
        if (!timer_evt || evtimer_add(timer_evt, &tv) < 0)
        {
            LOG_ERROR("http worker thread init failed. timer init failed.");
            return false;
        }
    }

    http = evhtp_new(evbase, NULL);
    if (!http)
    {
        LOG_ERROR("http main thread evhtp_new failed.");
        return false;
    }
    //注册url回调接口
    http_dispatcher.RegisterInterfaces(http);

    //init worker threads //evhtp 内部线程数 //内部每个线程会event_base_new event_new,每个线程负责处理客户端
    evhtp_use_threads_wexit(http, OnWorkerThreadInit, OnWorkerThreadExit, Config::Http().NumWorkerThreads(), NULL);
    //新连接callback before accept a new connect
    evhtp_set_pre_accept_cb(http, OnNewConn, NULL);

    //listen
    const char* listen_ip = Config::Http().ListenIP().c_str();
    uint16 listen_port = Config::Http().ListenPort();
    if (0 != evhtp_bind_socket(http, listen_ip, listen_port, 1024))//在主线程中 socket ,bind, listen, accept
    {
        LOG_ERROR("http main thread bind socker failed. on %s:%d: %s", listen_ip, listen_port, strerror(errno));
        return false;
    }
    LOG_NOTICE("http main thread is listening on %s:%d", listen_ip, listen_port);
    return true;
}

 

首先主线程通过event_base_new创建一个main evbase,注册url回调接口

主要是这个函数evhtp_use_threads_wexit,主要作用是创建多个线程,用来处理连接,将新连接加到多个线程中

/*
 * Enables threaded operation on `htp` with both a thread-init and a
 * thread-exit callback.
 *
 * This is a thin public wrapper: all arguments are forwarded unchanged to
 * htp__use_threads_() and its result is returned as-is.
 */
int
evhtp_use_threads_wexit(evhtp_t * htp,
                        evhtp_thread_init_cb init_cb,
                        evhtp_thread_exit_cb exit_cb,
                        int nthreads, void * arg)
{
    int rc;

    rc = htp__use_threads_(htp, init_cb, exit_cb, nthreads, arg);

    return rc;
}

调用的是htp__use_threads_

/*
 * Stores the user's thread callbacks and callback argument on `htp`, creates
 * a pool of `nthreads` worker threads and starts it.
 *
 * Returns 0 once the pool has been created and evthr_pool_start() has been
 * called, or -1 when `htp` is NULL or the pool could not be created.
 *
 * NOTE(review): whatever evthr_pool_start() reports is discarded here, so a
 * failure to start the threads is not propagated to the caller -- confirm
 * whether that is intended.
 */
static int
htp__use_threads_(evhtp_t * htp,
                  evhtp_thread_init_cb init_cb,
                  evhtp_thread_exit_cb exit_cb,
                  int nthreads, void * arg)
{
    evthr_pool_t * workers;

    if (htp == NULL) {
        return -1;
    }

    htp->thread_init_cb = init_cb;
    htp->thread_exit_cb = exit_cb;
    htp->thread_cbarg   = arg;

#ifndef EVHTP_DISABLE_SSL
    evhtp_ssl_use_threads();
#endif

    /* every worker is created with evhtp's own init/exit hooks and receives
     * `htp` as its shared argument */
    workers       = evthr_pool_wexit_new(nthreads,
                                         htp__thread_init_,
                                         htp__thread_exit_, htp);
    htp->thr_pool = workers;

    if (workers == NULL) {
        return -1;
    }

    evthr_pool_start(workers);

    return 0;
}

/*
 * Creates a thread pool whose threads get both an init and an exit callback.
 *
 * Thin public wrapper around _evthr_pool_new(); the arguments are passed
 * through unchanged and the resulting pool pointer (NULL on failure) is
 * returned to the caller.
 */
evthr_pool_t *
evthr_pool_wexit_new(int nthreads,
                     evthr_init_cb init_cb,
                     evthr_exit_cb exit_cb, void * shared)
{
    evthr_pool_t * created;

    created = _evthr_pool_new(nthreads, init_cb, exit_cb, shared);

    return created;
}

在这函数里面,创建多个线程,设置回调函数

/*
 * Allocates a thread pool and creates (but does not start) `nthreads` thread
 * objects, each configured with `init_cb`, `exit_cb` and `shared`, appending
 * them to the pool's thread list.
 *
 * When EVTHR_SHARED_PIPE is defined, a single non-blocking datagram
 * socketpair is created as well: the pool keeps both ends (rdr/wdr) and every
 * thread is handed the read end.
 *
 * Returns the new pool, or NULL when `nthreads` is not positive or any
 * allocation / socketpair / thread creation step fails.
 */
static evthr_pool_t *
_evthr_pool_new(int           nthreads,
                evthr_init_cb init_cb,
                evthr_exit_cb exit_cb,
                void        * shared)
{
    evthr_pool_t * pool;
    int            i;

#ifdef EVTHR_SHARED_PIPE
    int            fds[2];
#endif

    /* a negative count used to slip through and produce a pool without any
     * threads, which evthr_pool_defer() cannot dispatch to */
    if (nthreads <= 0) {
        return NULL;
    }

    if (!(pool = calloc(1, sizeof(*pool)))) {
        return NULL;
    }

    pool->nthreads = nthreads;
    TAILQ_INIT(&pool->threads);

#ifdef EVTHR_SHARED_PIPE
    if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) {
        /* no threads have been attached yet, so the bare allocation is all
         * there is to release (it used to be leaked here) */
        free(pool);
        return NULL;
    }

    evutil_make_socket_nonblocking(fds[0]);
    evutil_make_socket_nonblocking(fds[1]);

    /* channel between the pool (writer) and its threads (readers) */
    pool->rdr = fds[0];
    pool->wdr = fds[1];
#endif

    for (i = 0; i < nthreads; i++) {
        evthr_t * thread;

        if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) {
            /* NOTE(review): in shared-pipe mode, confirm that
             * evthr_pool_free() also closes pool->rdr / pool->wdr */
            evthr_pool_free(pool);
            return NULL;
        }

#ifdef EVTHR_SHARED_PIPE
        thread->pool_rdr = fds[0];
#endif

        TAILQ_INSERT_TAIL(&pool->threads, thread, next);
    }

    return pool;
} /* _evthr_pool_new */

 

evhtp_bind_socket函数,在主线程中,建立socket,bind,listen,accept,并将连接分配到各个子线程中

intsocket ,bind, listen, accept
evhtp_bind_socket(evhtp_t * htp, const char * baddr, uint16_t port, int backlog)
{
#ifndef NO_SYS_UN
    struct sockaddr_un  sockun = { 0 };
#endif
    struct sockaddr   * sa;
    struct sockaddr_in6 sin6   = { 0 };
    struct sockaddr_in  sin    = { 0 };
    size_t              sin_len;

    if (!strncmp(baddr, "ipv6:", 5)) {
        baddr           += 5;
        sin_len          = sizeof(struct sockaddr_in6);
        sin6.sin6_port   = htons(port);
        sin6.sin6_family = AF_INET6;

        evutil_inet_pton(AF_INET6, baddr, &sin6.sin6_addr);
        sa = (struct sockaddr *)&sin6;
    } else if (!strncmp(baddr, "unix:", 5)) {
#ifndef NO_SYS_UN
        baddr += 5;

        if (strlen(baddr) >= sizeof(sockun.sun_path)) {
            return -1;
        }

        sin_len           = sizeof(struct sockaddr_un);
        sockun.sun_family = AF_UNIX;

        strncpy(sockun.sun_path, baddr, strlen(baddr));

        sa = (struct sockaddr *)&sockun;
#else

        return -1;
#endif
    } else {
        if (!strncmp(baddr, "ipv4:", 5)) {
            baddr += 5;
        }

        sin_len             = sizeof(struct sockaddr_in);
        sin.sin_family      = AF_INET;
        sin.sin_port        = htons(port);
        sin.sin_addr.s_addr = inet_addr(baddr);

        sa = (struct sockaddr *)&sin;
    }

    return evhtp_bind_sockaddr(htp, sa, sin_len, backlog);
}         /* evhtp_bind_socket */

 


/*
 * Creates a stream socket for the address family of `sa`, marks it
 * close-on-exec and non-blocking, applies the server socket options, binds it
 * to `sa` and passes it to evhtp_accept_socket() to start listening.
 *
 * For AF_INET6 the socket is restricted to IPv6 only (IPV6_V6ONLY).
 * On non-Windows builds SIGPIPE is set to be ignored process-wide.
 *
 * Returns 0 on success, -1 on any failure. On failure after the socket was
 * created, the descriptor is closed here.
 */
int
evhtp_bind_sockaddr(evhtp_t         * htp,
                    struct sockaddr * sa,
                    size_t            sin_len,
                    int               backlog)
{
    evutil_socket_t fd    = -1;
    int             on    = 1;
    int             error = 1;

    /* `sa` is dereferenced below, so it needs the same guard as `htp` */
    if (htp == NULL || sa == NULL) {
        log_error("NULL param passed");
        return -1;
    }

    /* XXX: API's should not set signals */
#ifndef WIN32
    signal(SIGPIPE, SIG_IGN);
#endif

    do {
        if ((fd = socket(sa->sa_family, SOCK_STREAM, 0)) == -1) {
            log_error("couldn't create socket");
            return -1;
        }

        evutil_make_socket_closeonexec(fd);
        evutil_make_socket_nonblocking(fd);

        /* apply the server-side socket options */
        if (htp__serv_setsockopts_(htp, fd) == -1) {
            break;
        }

        if (sa->sa_family == AF_INET6) {
            if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)) == -1) {
                break;
            }
        }

        if (bind(fd, sa, sin_len) == -1) {
            break;
        }

        error = 0;
    } while (0);


    if (error == 1) {
        if (fd != -1) {
            evutil_closesocket(fd);
        }

        return -1;
    }

    /* listen + accept setup */
    if (evhtp_accept_socket(htp, fd, backlog) == -1) {
        /* accept_socket() does not close the descriptor
         * on error, but this function does.
         */
        evutil_closesocket(fd);

        return -1;
    }

    return 0;
}         /* evhtp_bind_sockaddr */

/*
 * Starts accepting connections on an already bound socket.
 *
 * Creates a connection listener for `sock` on htp->evbase with
 * htp__accept_cb_ as the accept callback and stores it in htp->server. When
 * SSL is enabled and at least one virtual host is registered, the TLS
 * servername callback is installed on the SSL context as well.
 *
 * Returns 0 on success, -1 when `htp` is NULL, `sock` is -1 or the listener
 * cannot be created. `sock` is not closed here on failure; that is left to
 * the caller.
 */
int
evhtp_accept_socket(evhtp_t * htp, evutil_socket_t sock, int backlog)
{
    if (htp == NULL || sock == -1) {
        /* %p expects a void pointer */
        log_error("htp = %p && sock = %d", (void *)htp, sock);
        return -1;
    }

    htp->server = evconnlistener_new(htp->evbase,
        htp__accept_cb_,
        htp,
        LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
        backlog,
        sock);

    /* Listener creation is the only step that can fail, and when it does
     * there is nothing to release, so the former (unreachable) listener
     * cleanup on the error path is gone. */
    if (htp->server == NULL) {
        return -1;
    }

#ifndef EVHTP_DISABLE_SSL
    if (htp->ssl_ctx != NULL) {
        /* if ssl is enabled and we have virtual hosts, set our servername
         * callback. We do this here because we want to make sure that this gets
         * set after all potential virtualhosts have been set, not just after
         * ssl_init.
         */
        if (TAILQ_FIRST(&htp->vhosts) != NULL) {
            SSL_CTX_set_tlsext_servername_callback(htp->ssl_ctx,
                htp__ssl_servername_);
        }
    }

#endif

    return 0;
}         /* evhtp_accept_socket */
 

accept在main thread中接收,主要看htp__accept_cb_的回调函数,收到新连接后,对连接进行处理分配到子线程中
static void
htp__accept_cb_(struct evconnlistener * serv, int fd, struct sockaddr * s, int sl, void * arg)
{
    evhtp_t            * htp = arg;
    evhtp_connection_t * connection;

    evhtp_assert(htp && serv && serv && s);

    connection = htp__connection_new_(htp, fd, evhtp_type_server);

    if (evhtp_unlikely(connection == NULL)) {
        return;
    }

    log_debug("fd = %d, conn = %p", fd, connection);

    connection->saddr = htp__malloc_(sl);

    if (evhtp_unlikely(connection->saddr == NULL)) {
        /* should probably start doing error callbacks */
        evhtp_safe_free(connection, evhtp_connection_free);
        return;
    }


    memcpy(connection->saddr, s, sl);

#ifndef EVHTP_DISABLE_EVTHR
    if (htp->thr_pool != NULL) {
        if (evthr_pool_defer(htp->thr_pool,//?这个在干嘛? 获取一个线程接收新连接
                htp__run_in_thread_, connection) != EVTHR_RES_OK) {
            evutil_closesocket(connection->sock);
            evhtp_safe_free(connection, evhtp_connection_free);

            return;
        }

        return;
    }

#endif
    connection->evbase = htp->evbase;

    if (htp__connection_accept_(htp->evbase, connection) == -1) {
        evhtp_safe_free(connection, evhtp_connection_free);
        return;
    }

    if (htp__run_post_accept_(htp, connection) == -1) {
        evhtp_safe_free(connection, evhtp_connection_free);
        return;
    }
}     /* htp__accept_cb_ */

 

通过创建的线程池,获取分配的线程
/*
 * Queues callback `cb` with argument `arg` for execution on one of the pool's
 * threads.
 *
 * With EVTHR_SHARED_PIPE the command is written to the pool-wide socket and
 * whichever thread reads it first runs it. Otherwise the thread list is
 * scanned: the first thread with an empty backlog is taken immediately, else
 * the thread with the smallest backlog, and the command is sent to that
 * thread through evthr_defer().
 *
 * Returns EVTHR_RES_OK on success, EVTHR_RES_FATAL when `pool` is NULL or has
 * no threads, EVTHR_RES_NOCB when `cb` is NULL, and EVTHR_RES_RETRY when the
 * command could not be sent.
 */
evthr_res
evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg)
{
#ifdef EVTHR_SHARED_PIPE
    evthr_cmd_t cmd = {
        .cb   = cb,
        .args = arg,
        .stop = 0
    };
#else
    evthr_t * thread      = NULL;
    evthr_t * min_thread  = NULL;
    int       min_backlog = 0;
#endif

    /* validate before touching the pool; the shared-pipe branch used to
     * dereference `pool` ahead of these checks */
    if (pool == NULL) {
        return EVTHR_RES_FATAL;
    }

    if (cb == NULL) {
        return EVTHR_RES_NOCB;
    }

#ifdef EVTHR_SHARED_PIPE
    if (evhtp_unlikely(send(pool->wdr, &cmd, sizeof(cmd), 0) == -1)) {
        return EVTHR_RES_RETRY;
    }

    return EVTHR_RES_OK;
#else
    /* walk the thread list looking for the least loaded thread */
    TAILQ_FOREACH(thread, &pool->threads, next) {
        int backlog = get_backlog_(thread);

        if (backlog == 0) {
            /* idle thread: no need to look any further */
            min_thread = thread;
            break;
        }

        if (min_thread == NULL || backlog < min_backlog) {
            min_thread  = thread;
            min_backlog = backlog;
        }
    }

    /* an empty thread list leaves nothing to dispatch to */
    if (min_thread == NULL) {
        return EVTHR_RES_FATAL;
    }

    /* notify the chosen thread through its own socket */
    return evthr_defer(min_thread, cb, arg);
#endif
} /* evthr_pool_defer */

这里用到的线程分配策略是:读取每个线程管道缓冲区中尚未处理的命令数(即积压量),把新连接分配给积压最少的线程;一旦遇到积压为0的线程,就直接选用它。

/*
 * Estimates how many commands are still waiting to be read by `thread`.
 *
 * Asks the kernel (FIONREAD) for the number of unread bytes on the thread's
 * read descriptor and divides that by the size of one evthr_cmd_t. The ioctl
 * result is not checked; if the call leaves the byte count untouched the
 * function reports 0.
 */
static inline int
get_backlog_(evthr_t * thread)
{
    int    pending_bytes = 0;
    size_t pending_cmds;

    ioctl(thread->rdr, FIONREAD, &pending_bytes);

    /* whole commands only; the division is carried out in size_t */
    pending_cmds = pending_bytes / sizeof(evthr_cmd_t);

    return (int)pending_cmds;
}

这里是往线程的管道发通知

/*
 * Asks `thread` to run `cb(arg)` by writing a command record to the thread's
 * write descriptor.
 *
 * Returns EVTHR_RES_OK when the command was sent, EVTHR_RES_RETRY when send()
 * wrote nothing or failed, and EVTHR_RES_FATAL when `thread` is NULL.
 */
evthr_res
evthr_defer(evthr_t * thread, evthr_cb cb, void * arg)
{
    evthr_cmd_t cmd = {
        .cb   = cb,
        .args = arg,
        .stop = 0
    };

    /* evthr_pool_defer() can end up without a candidate thread; fail cleanly
     * instead of dereferencing NULL */
    if (thread == NULL) {
        return EVTHR_RES_FATAL;
    }

    /* the command record itself is the notification */
    if (send(thread->wdr, &cmd, sizeof(cmd), 0) <= 0) {
        return EVTHR_RES_RETRY;
    }

    return EVTHR_RES_OK;
}

 

这个回调中,对连接建立event,添加回调等
/*
 * Finishes accepting `connection` on event base `evbase`.
 *
 * Runs the pre-accept hook, creates the bufferevent for the socket (an
 * OpenSSL bufferevent in accepting state when the server has an SSL context,
 * a plain socket bufferevent otherwise), applies the read/write timeouts
 * (connection-level values win over server-level ones, none if both are
 * zero), creates and adds the resume event, installs the read/write/event
 * callbacks and enables reading.
 *
 * Returns 0 on success and -1 on failure. When the pre-accept hook rejects
 * the connection, or the SSL object / bufferevent cannot be created, the
 * socket is closed here. Releasing the connection object itself is left to
 * the caller.
 */
static int
htp__connection_accept_(struct event_base * evbase, evhtp_connection_t * connection)
{
    struct timeval * c_recv_timeo;
    struct timeval * c_send_timeo;

    if (htp__run_pre_accept_(connection->htp, connection) < 0) {
        evutil_closesocket(connection->sock);

        return -1;
    }

#ifndef EVHTP_DISABLE_SSL
    if (connection->htp->ssl_ctx != NULL) {
        connection->ssl = SSL_new(connection->htp->ssl_ctx);

        if (connection->ssl == NULL) {
            evutil_closesocket(connection->sock);

            return -1;
        }

        connection->bev = bufferevent_openssl_socket_new(evbase,
            connection->sock,
            connection->ssl,
            BUFFEREVENT_SSL_ACCEPTING,
            connection->htp->bev_flags);

        if (connection->bev == NULL) {
            /* NOTE(review): who owns connection->ssl after this failure
             * depends on libevent and on bev_flags -- confirm before adding
             * an SSL_free() here */
            evutil_closesocket(connection->sock);

            return -1;
        }

        SSL_set_app_data(connection->ssl, connection);
        goto end;
    }

#endif

    /* plain (non-SSL) connection */
    connection->bev = bufferevent_socket_new(evbase,
        connection->sock,
        connection->htp->bev_flags);

    if (connection->bev == NULL) {
        /* without a bufferevent the callbacks below cannot be installed */
        evutil_closesocket(connection->sock);

        return -1;
    }

    log_debug("enter sock=%d\n", connection->sock);

#ifndef EVHTP_DISABLE_SSL
end:
#endif

    if (connection->recv_timeo.tv_sec || connection->recv_timeo.tv_usec) {
        c_recv_timeo = &connection->recv_timeo;
    } else if (connection->htp->recv_timeo.tv_sec ||
               connection->htp->recv_timeo.tv_usec) {
        c_recv_timeo = &connection->htp->recv_timeo;
    } else {
        c_recv_timeo = NULL;
    }

    if (connection->send_timeo.tv_sec || connection->send_timeo.tv_usec) {
        c_send_timeo = &connection->send_timeo;
    } else if (connection->htp->send_timeo.tv_sec ||
               connection->htp->send_timeo.tv_usec) {
        c_send_timeo = &connection->htp->send_timeo;
    } else {
        c_send_timeo = NULL;
    }

    evhtp_connection_set_timeouts(connection, c_recv_timeo, c_send_timeo);

    /* event without a file descriptor, bound to htp__connection_resumecb_ */
    connection->resume_ev = event_new(evbase, -1, EV_READ | EV_PERSIST,
        htp__connection_resumecb_, connection);

    if (connection->resume_ev == NULL) {
        /* The socket is left alone here because connection->bev already
         * wraps it; presumably the caller's evhtp_connection_free() releases
         * the bufferevent -- TODO confirm. */
        return -1;
    }

    event_add(connection->resume_ev, NULL);

    bufferevent_setcb(connection->bev,
        htp__connection_readcb_,
        htp__connection_writecb_,
        htp__connection_eventcb_, connection);

    bufferevent_enable(connection->bev, EV_READ);

    return 0;
}     /* htp__connection_accept_ */

 类似资料: