TinyWebServer4--threadpool

汪迪
2023-12-01

概述

dealwithread和dealwithwrite两个函数作用是插入任务到任务队列等待线程去完成

线程池中两个最为关键的成员:

    pthread_t *m_threads;       //描述线程池的数组,其大小为m_thread_number
    std::list<T *> m_workqueue; //请求队列

1、在WebServer中调用append_p和append函数(区别是lfd和cfd)把新任务加入到list的尾部,然后所有线程争夺list中的任务(这里使用条件变量)。

2、争夺到的线程先对任务队列加互斥锁然后从list头取出任务结构体,在这个任务结构体内有一个回调函数,这个函数就是真正的工作(包括解析http请求报文,对请求资源的检查,回响应报文)当然这个函数放在http模块以更加模块化。从线程的角度就是拿到任务然后调用这个函数,线程就是在漫长的这个函数中度过了。

3、然后是线程池数组,这个线程池还是比较简单的线程池(没有对线程的动态删减等等),在线程池构造的时候就创建好约定个数的线程储存在线程数组里,并且把线程detach掉,这样我们就不需要对线程进行回收等等操作。线程的关键是线程的工作函数run(),这个函数不断while循环直到被条件变量唤醒然后上锁从list头取出任务,开始执行任务。

代码详解

thread_pool()

void WebServer::thread_pool()
{
    //线程池
    m_pool = new threadpool<http_conn>(m_actormodel, m_connPool, m_thread_num);
}

1、构造函数完成线程池的初始化,产生了8个线程并且作出了分离
2、m_pool这个指针可以调用append函数来完成将事件入队的任务
3、http_conn给到了一个入口的作用,通过等待信号量来进入到http类里写出的process函数完成对于事务的处理。


意味着以上三步走之后,半同步/半反应堆的模式达成。左侧有嗷嗷待哺的线程,右侧有反应堆

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "../lock/locker.h"
#include "../CGImysql/sql_connection_pool.h"

template <typename T>
class threadpool
{
public:
    /*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/
    threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
    ~threadpool();
    bool append(T *request, int state);
    bool append_p(T *request);

private:
    /*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/
    static void *worker(void *arg);
    void run();

private:
    int m_thread_number;        //线程池中的线程数
    int m_max_requests;         //请求队列中允许的最大请求数
    pthread_t *m_threads;       //描述线程池的数组,其大小为m_thread_number
    std::list<T *> m_workqueue; //请求队列
    locker m_queuelocker;       //保护请求队列的互斥锁
    sem m_queuestat;            //是否有任务需要处理
    connection_pool *m_connPool;  //数据库
    int m_actor_model;          //模型切换
};
template <typename T>
threadpool<T>::threadpool( int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model),m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool)
{
    if (thread_number <= 0 || max_requests <= 0)
        throw std::exception();
    m_threads = new pthread_t[m_thread_number];
    if (!m_threads)
        throw std::exception();
    for (int i = 0; i < thread_number; ++i)
    {
        if (pthread_create(m_threads + i, NULL, worker, this) != 0)
        {
            delete[] m_threads;
            throw std::exception();
        }
        if (pthread_detach(m_threads[i]))
        {
            delete[] m_threads;
            throw std::exception();
        }
    }
}
template <typename T>
threadpool<T>::~threadpool()
{
    delete[] m_threads;
}
template <typename T>
bool threadpool<T>::append(T *request, int state)
{
    m_queuelocker.lock();
    if (m_workqueue.size() >= m_max_requests)
    {
        m_queuelocker.unlock();
        return false;
    }
    request->m_state = state;
    m_workqueue.push_back(request);
    m_queuelocker.unlock();
    m_queuestat.post();
    return true;
}
template <typename T>
bool threadpool<T>::append_p(T *request)
{
    m_queuelocker.lock();
    if (m_workqueue.size() >= m_max_requests)
    {
        m_queuelocker.unlock();
        return false;
    }
    m_workqueue.push_back(request);
    m_queuelocker.unlock();
    m_queuestat.post();
    return true;
}
template <typename T>
void *threadpool<T>::worker(void *arg)
{
    threadpool *pool = (threadpool *)arg;
    pool->run();
    return pool;
}
template <typename T>
void threadpool<T>::run()
{
    while (true)
    {
        m_queuestat.wait();
        m_queuelocker.lock();
        if (m_workqueue.empty())
        {
            m_queuelocker.unlock();
            continue;
        }
        T *request = m_workqueue.front();
        m_workqueue.pop_front();
        m_queuelocker.unlock();
        if (!request)
            continue;
        if (1 == m_actor_model)
        {
            if (0 == request->m_state)
            {
                if (request->read_once())
                {
                    request->improv = 1;
                    connectionRAII mysqlcon(&request->mysql, m_connPool);
                    request->process();
                }
                else
                {
                    request->improv = 1;
                    request->timer_flag = 1;
                }
            }
            else
            {
                if (request->write())
                {
                    request->improv = 1;
                }
                else
                {
                    request->improv = 1;
                    request->timer_flag = 1;
                }
            }
        }
        else
        {
            connectionRAII mysqlcon(&request->mysql, m_connPool);
            request->process();
        }
    }
}
#endif

Web服务器---TinyWebServer代码详细讲解(threadpool模块)_github webserver thread pool_才文嘉的博客-CSDN博客

 类似资料: