dealwithread和dealwithwrite两个函数作用是插入任务到任务队列等待线程去完成
线程池中两个最为关键的成员:
pthread_t *m_threads; //描述线程池的数组,其大小为m_thread_number
std::list<T *> m_workqueue; //请求队列
1、在WebServer中调用append_p和append函数(区别是lfd和cfd)把新任务加入到list的尾部,然后所有线程争夺list中的任务(这里使用条件变量)。
2、争夺到的线程先对任务队列加互斥锁然后从list头取出任务结构体,在这个任务结构体内有一个回调函数,这个函数就是真正的工作(包括解析http请求报文,对请求资源的检查,回响应报文)当然这个函数放在http模块以更加模块化。从线程的角度就是拿到任务然后调用这个函数,线程就是在漫长的这个函数中度过了。
3、然后是线程池数组,这个线程池还是比较简单的线程池(没有对线程的动态删减等等),在线程池构造的时候就创建好约定个数的线程储存在线程数组里,并且把线程detach掉,这样我们就不需要对线程进行回收等等操作。线程的关键是线程的工作函数run(),这个函数不断while循环直到被条件变量唤醒然后上锁从list头取出任务,开始执行任务。
void WebServer::thread_pool()
{
//线程池
m_pool = new threadpool<http_conn>(m_actormodel, m_connPool, m_thread_num);
}
1、构造函数完成线程池的初始化,产生了8个线程并且作出了分离
2、m_pool这个指针可以调用append函数来完成将事件入队的任务
3、http_conn给到了一个入口的作用,通过等待信号量来进入到http类里写出的process函数完成对于事务的处理。
意味着以上三步走之后,半同步/半反应堆的模式达成。左侧有嗷嗷待哺的线程,右侧有反应堆
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "../lock/locker.h"
#include "../CGImysql/sql_connection_pool.h"
template <typename T>
class threadpool
{
public:
/*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/
threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
~threadpool();
bool append(T *request, int state);
bool append_p(T *request);
private:
/*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/
static void *worker(void *arg);
void run();
private:
int m_thread_number; //线程池中的线程数
int m_max_requests; //请求队列中允许的最大请求数
pthread_t *m_threads; //描述线程池的数组,其大小为m_thread_number
std::list<T *> m_workqueue; //请求队列
locker m_queuelocker; //保护请求队列的互斥锁
sem m_queuestat; //是否有任务需要处理
connection_pool *m_connPool; //数据库
int m_actor_model; //模型切换
};
template <typename T>
threadpool<T>::threadpool( int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model),m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool)
{
if (thread_number <= 0 || max_requests <= 0)
throw std::exception();
m_threads = new pthread_t[m_thread_number];
if (!m_threads)
throw std::exception();
for (int i = 0; i < thread_number; ++i)
{
if (pthread_create(m_threads + i, NULL, worker, this) != 0)
{
delete[] m_threads;
throw std::exception();
}
if (pthread_detach(m_threads[i]))
{
delete[] m_threads;
throw std::exception();
}
}
}
template <typename T>
threadpool<T>::~threadpool()
{
delete[] m_threads;
}
template <typename T>
bool threadpool<T>::append(T *request, int state)
{
m_queuelocker.lock();
if (m_workqueue.size() >= m_max_requests)
{
m_queuelocker.unlock();
return false;
}
request->m_state = state;
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();
return true;
}
template <typename T>
bool threadpool<T>::append_p(T *request)
{
m_queuelocker.lock();
if (m_workqueue.size() >= m_max_requests)
{
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();
return true;
}
template <typename T>
void *threadpool<T>::worker(void *arg)
{
threadpool *pool = (threadpool *)arg;
pool->run();
return pool;
}
template <typename T>
void threadpool<T>::run()
{
while (true)
{
m_queuestat.wait();
m_queuelocker.lock();
if (m_workqueue.empty())
{
m_queuelocker.unlock();
continue;
}
T *request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if (!request)
continue;
if (1 == m_actor_model)
{
if (0 == request->m_state)
{
if (request->read_once())
{
request->improv = 1;
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
else
{
request->improv = 1;
request->timer_flag = 1;
}
}
else
{
if (request->write())
{
request->improv = 1;
}
else
{
request->improv = 1;
request->timer_flag = 1;
}
}
}
else
{
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
}
}
#endif
Web服务器---TinyWebServer代码详细讲解(threadpool模块)_github webserver thread pool_才文嘉的博客-CSDN博客