每个工作线程拥有一个条件变量,一把锁,但是通过类CThreadNotify管理使用方便很多
线程池管理线程和所有工作线程共用一把锁,一个条件变量
// Task.h
class CTask {
public:
CTask(){}
virtual ~CTask(){}
virtual void run() = 0;
private:
};
// Thread.h
#include <pthread.h>
class CThreadNotify
{
public:
CThreadNotify();
~CThreadNotify();
void Lock() { pthread_mutex_lock(&m_mutex); }
void Unlock() { pthread_mutex_unlock(&m_mutex); }
void Wait() { pthread_cond_wait(&m_cond, &m_mutex); }
void Signal() { pthread_cond_signal(&m_cond); }
private:
pthread_mutex_t m_mutex;
pthread_mutexattr_t m_mutexattr;
pthread_cond_t m_cond;
};
// Thread.cpp
CThreadNotify::CThreadNotify()
{
pthread_mutexattr_init(&m_mutexattr);
pthread_mutexattr_settype(&m_mutexattr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&m_mutex, &m_mutexattr);
pthread_cond_init(&m_cond, NULL);
}
CThreadNotify::~CThreadNotify()
{
pthread_mutexattr_destroy(&m_mutexattr);
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);
}
// ThreadPool.h
#include "Thread.h"
#include "Task.h"
#include <stdint.h>
#include <list>
using namespace std;
class CWorkerThread {
public:
CWorkerThread();
~CWorkerThread();
/**********************
为什么要是静态方法???非静态函数出现如下编译问题:
ThreadPool.cpp:24:43: error: invalid use of non-static member function ‘void* CWorkerThread::Execute(void*)’
24 | (void)pthread_create(&m_thread_id, NULL, Execute, NULL);
| ^~~~~~~~~~~~
ThreadPool.cpp:13:7: note: declared here
13 | void* CWorkerThread::Execute(void* arg)
出现类型不匹配的问题。因为pthread_create需要的参数类型为void* (*)(void*),而Execute作为类的成员函数时其类型是
void* (CWorkerThread::)(void*)的成员函数指针。我们知道类的成员函数在经过编译器处理之后,会变成带有 this指针参
数的全局函数,所以类型注定是不会匹配的。但是如果将Execute声明为static类型,那么编译器会将static形 式的函数,转
换成不带this指针的全局函数,所以其类型可以与pthread_create需要的参数类型相匹配。但是类的静态成员函数无法访问类
的非静态成员,不过这可以通过传递this指针解决这个问题。
因此就出现了StartRoutine这个静态方法
*********************/
static void* StartRoutine(void* arg);
void Start();
void Execute();
void PushTask(CTask* pTask);
void SetThreadIdx(uint32_t idx) { m_thread_idx = idx; }
private:
uint32_t m_thread_idx;
uint32_t m_task_cnt;
pthread_t m_thread_id;
CThreadNotify m_thread_notify;
list<CTask*> m_task_list;
bool m_shutdown; // 是否销毁线程
};
// ThreadPool.cpp
CWorkerThread::CWorkerThread()
{
m_task_cnt = 0;
m_shutdown = false;
}
CWorkerThread::~CWorkerThread()
{
m_shutdown = true;
m_thread_notify.Signal(); // 唤醒工作线程
pthread_join(m_thread_id, NULL);
// 将任务丢弃并清空,防止内存泄漏
for (list<CTask*>::iterator it = m_task_list.begin(); it != m_task_list.end(); it++)
{
delete *it;
}
}
void* CWorkerThread::StartRoutine(void* arg)
{
CWorkerThread* pThread = (CWorkerThread*)arg;
pThread->Execute();
return NULL;
}
void CWorkerThread::Start()
{
(void)pthread_create(&m_thread_id, NULL, StartRoutine, this);
}
void CWorkerThread::Execute()
{
while (true) {
m_thread_notify.Lock();
// put wait in while cause there can be spurious wake up (due to signal/ENITR)
while (m_task_list.empty() && !m_shutdown) {
m_thread_notify.Wait();
}
// 销毁线程
if (m_shutdown) {
break;
}
CTask* pTask = m_task_list.front();
m_task_list.pop_front();
m_thread_notify.Unlock();
pTask->run();
delete pTask;
m_task_cnt++;
//log("%d have the execute %d task\n", m_thread_idx, m_task_cnt);
}
}
void CWorkerThread::PushTask(CTask* pTask)
{
m_thread_notify.Lock();
m_task_list.push_back(pTask);
m_thread_notify.Signal();
m_thread_notify.Unlock();
}
其中销毁线程这部分代码自己添加的,原版本在销毁线程池的时候无法退出。
// ThreadPool.h
class CThreadPool {
public:
CThreadPool();
virtual ~CThreadPool();
int Init(uint32_t worker_size);
void AddTask(CTask* pTask);
void Destory();
private:
uint32_t m_worker_size;
CWorkerThread* m_worker_list;
};
// ThreadPool.cpp
CThreadPool::CThreadPool()
{
m_worker_size = 0;
m_worker_list = NULL;
}
CThreadPool::~CThreadPool()
{
Destory();
}
int CThreadPool::Init(uint32_t worker_size)
{
m_worker_size = worker_size;
m_worker_list = new CWorkerThread [m_worker_size];
if (!m_worker_list) {
return 1;
}
for (uint32_t i = 0; i < m_worker_size; i++) {
m_worker_list[i].SetThreadIdx(i);
m_worker_list[i].Start();
}
return 0;
}
void CThreadPool::Destory()
{
if(m_worker_list) {
delete [] m_worker_list;
m_worker_size = 0;
m_worker_list = NULL;
}
}
void CThreadPool::AddTask(CTask* pTask)
{
/*
* select a random thread to push task
* we can also select a thread that has less task to do
* but that will scan the whole thread list and use thread lock to get each task size
*/
uint32_t thread_idx = random() % m_worker_size;
m_worker_list[thread_idx].PushTask(pTask);
}
#include <iostream>
#include <string>
using namespace std;
#include <unistd.h>
#include <stdlib.h> // random()
#include "ThreadPool.h"
class Echo : public CTask
{
public:
Echo(int s) {m_tag = s;}
~Echo() {}
void run();
private:
int m_tag;
};
void Echo::run()
{
usleep(random()%10);
cout << "I'm task " << m_tag << endl;
}
int main()
{
CThreadPool *tpool = new CThreadPool();
tpool->Init(3);
for (int i = 0; i < 10; i++) {
CTask *pTask = new Echo(i);
tpool->AddTask(pTask);
}
delete tpool;
return 0;
}
1.2两步为一个原子操作
通过while循环检测防止虚假唤醒
teamtalk的线程池模型比较简单,比如:
ps: 以上纯属个人见解,欢迎拍砖。