Web服务器---TinyWebServer代码详细讲解(threadpool模块)

慎兴业
2023-12-01

这里的参照的代码是https://github.com/qinguoyi/TinyWebServer

对于原代码的不足之处,我会在之后的文章中给出改进代码 在笔者fork的这版中,原代码作者对于代码作出了更细化的分类
细节问题可以参考《APUE》《Linux高性能服务器编程》或者我之前的博客

threadpool.h

设计思路

一个web服务器几乎离不开多线程了,在main那里我们说到main把所有读时间能读到的数据都存放在客户读缓冲区中,然后就插入任务到任务队列等待线程去完成。

线程池类有两个最为关键的成员:

  • pthread_t* m_threads; //线程池数组
  • std::list<T*> m_workqueue; //请求队列

我们在WebServer中调用append_p和append函数(区别是lfd和cfd,因为这里有模式问题)把新任务放到list的尾部,然后所有线程争夺list中的任务(这里要使用条件变量),争夺到的线程先对任务队列加互斥锁然后从list头取出任务结构体,在这个任务结构体内有一个回调函数,这个函数就是真正的工作(包括解析http请求报文,对请求资源的检查,搓响应报文一条龙)当然这个函数我们放在http模块以更加模块化,从线程的角度就是我们拿到任务然后调用这个函数,线程就是在漫长的这个函数中度过了。

然后是线程池数组,这个线程池还是比较简单的线程池(没有对线程的动态删减等等),那么我们就是在线程池构造的时候就创建好约定个数的线程储存在线程数组里,并且把线程detach掉,这样我们就不需要对线程进行回收等等操作。线程的关键是线程的工作函数run(),这个函数不断while循环直到被条件变量唤醒然后上锁从list头取出任务,开始执行任务。

代码详解

你可以当做这是一个标准的线程池写法。
在之前的WebServer中,我们对线程池函数没有讲解。在这里我们讲一下为什么这样写。

void WebServer::thread_pool()
{
    //线程池
    m_pool = new threadpool<http_conn>(m_actormodel, m_connPool, m_thread_num);
}
  1. 构造函数完成线程池的初始化,产生了8个线程并且作出了分离
  2. m_poll这个指针可以调用append函数来完成将事件入队的任务
  3. http_conn给到了一个入口的作用,通过等待信号量来进入到http类里写出的process函数完成对于事务的处理。
    意味着以上三步走之后,半同步/半反应堆的模式达成。左侧有嗷嗷待哺的线程,右侧有反应堆。

原作者的代码中没有涉及惊群效应

preactor与reactor

同样,这里在run函数中给出了两种模式的不同处理,在preactor模式下,同步线程处理两种任务逻辑。
这里说一下为什么是两种,首先信号任务由主线程完成,其次如果wait监听到lfd,是不需要入队的,直接通知主线程调用accept就行了。如果你对此有疑问,再次阅读event_loop函数

统一信号源不代表我统一处理方法!

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "../lock/locker.h"
#include "../CGImysql/sql_connection_pool.h"

template <typename T>
class threadpool
{
public:
    /*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/
    //connPool是数据库连接池指针
    threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
    ~threadpool();
    bool append(T *request, int state);
    bool append_p(T *request);

private:
    /*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/
    static void *worker(void *arg);
    void run();

private:
    int m_thread_number;        //线程池中的线程数
    int m_max_requests;         //请求队列中允许的最大请求数 10000
    pthread_t *m_threads;       //描述线程池的数组,其大小为m_thread_number
    std::list<T *> m_workqueue; //请求队列
    locker m_queuelocker;       //保护请求队列的互斥锁
    sem m_queuestat;            //是否有任务需要处理
    connection_pool *m_connPool;  //数据库连接池
    int m_actor_model;          //模型切换
};
template <typename T> //线程池构造函数
threadpool<T>::threadpool( int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model),m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool)
{
    if (thread_number <= 0 || max_requests <= 0)
        throw std::exception();
    m_threads = new pthread_t[m_thread_number];
    if (!m_threads)
        throw std::exception();
    for (int i = 0; i < thread_number; ++i)
    {
        //循环创建线程,并将工作线程按要求进行运行
        if (pthread_create(m_threads + i, NULL, worker, this) != 0) //这个this会传给 worker的arg
        {
            delete[] m_threads;
            throw std::exception();
        }
        //将线程进行分离后,不用单独对工作线程进行回收
        if (pthread_detach(m_threads[i]))
        {
            delete[] m_threads;
            throw std::exception();
        }
    }
}

template <typename T>
threadpool<T>::~threadpool()
{
    delete[] m_threads;
}

template <typename T>
bool threadpool<T>::append(T *request, int state)
{
    m_queuelocker.lock();
    if (m_workqueue.size() >= m_max_requests)
    {
        m_queuelocker.unlock();
        return false;
    }
    request->m_state = state;
    m_workqueue.push_back(request);
    m_queuelocker.unlock();
    m_queuestat.post();
    return true;
}
//将“待办工作”加入到请求队列
//传入的是fd
template <typename T>
bool threadpool<T>::append_p(T *request)
{
    m_queuelocker.lock();
    if (m_workqueue.size() >= m_max_requests)
    {
        m_queuelocker.unlock();
        return false;
    }
    m_workqueue.push_back(request);
    m_queuelocker.unlock();
    m_queuestat.post();
    return true;
}
//线程回调函数/工作函数,arg其实是this
template <typename T>
void *threadpool<T>::worker(void *arg)
{
    //将参数强转为线程池类,调用成员方法
    threadpool *pool = (threadpool *)arg;
    pool->run();
    return pool;
}
//回调函数会调用这个函数工作
//工作线程就是不断地等任务队列有新任务,然后就加锁取任务->取到任务解锁->执行任务
template <typename T>
void threadpool<T>::run()
{
    while (true)
    {
        //信号量等待
        m_queuestat.wait();
        //被唤醒后先加互斥锁
        m_queuelocker.lock();
        if (m_workqueue.empty())
        {
            m_queuelocker.unlock();
            continue;
        }

        //从请求队列中取出第一个任务
        //将任务从请求队列删除
        T *request = m_workqueue.front();
        m_workqueue.pop_front();
        m_queuelocker.unlock();
        if (!request)
            continue;
        //preactor reactor
        if (1 == m_actor_model)
        {
            if (0 == request->m_state)
            {
                if (request->read_once())
                {
                    request->improv = 1;
                    connectionRAII mysqlcon(&request->mysql, m_connPool);
                    request->process();
                }
                else
                {
                    request->improv = 1;
                    request->timer_flag = 1;
                }
            }
            else
            {
                if (request->write())
                {
                    request->improv = 1;
                }
                else
                {
                    request->improv = 1;
                    request->timer_flag = 1;
                }
            }
        }
        else
        {
            connectionRAII mysqlcon(&request->mysql, m_connPool);
            request->process();
        }
    }
}
#endif

总结

程序采用c++编写,要自己封装一个简易的线程池类。大致思路是创建固定数目的线程(如跟核数相同),然后类内部维护一个生产者—消费者队列。

提供相应的添加任务(生产者)和执行任务接口(消费者)。按照操作系统书中典型的生产者—消费者模型维护增减队列任务(使用mutex和semaphore)。

mutex用于互斥,保证任意时刻只有一个线程读写队列,semaphore用于同步,保证执行顺序(队列为空时不要读,队列满了不要写)。

 类似资料: