使用现有线程库(pthread.h)实现一个工作线程池,工作线程使用std::list实现任务队列,使用条件变量来解决生产者消费者的竞争问题
// 抽象类,所有任务的基础类,子类重写run方法,来调用任务处理函数。
class CTask {
public:
CTask(){}
virtual ~CTask(){}
virtual void run() = 0;
private:
};
//将pthread.h库里的锁和条件变量封装成方便线程库使用的接口
class CThreadNotify
{
public:
CThreadNotify();
~CThreadNotify();
void Lock() { pthread_mutex_lock(&m_mutex); }
void Unlock() { pthread_mutex_unlock(&m_mutex); }
void Wait() { pthread_cond_wait(&m_cond, &m_mutex); }
void Signal() { pthread_cond_signal(&m_cond); }
private:
pthread_mutex_t m_mutex;
pthread_mutexattr_t m_mutexattr;
pthread_cond_t m_cond;
};
// 工作线程,使用std::list作为任务队列,PushTask向队列里添加任务,内部条件变量会被唤醒(CThreadNotify);
//Start会创建线程消耗队列里的任务,同时没有任务时线程会被阻塞,等待条件变量唤醒(CThreadNotify)
class CWorkerThread {
public:
CWorkerThread();
~CWorkerThread();
static void* StartRoutine(void* arg);
void Start();
void Execute();
void PushTask(CTask* pTask);
void SetThreadIdx(uint32_t idx) { m_thread_idx = idx; }
private:
uint32_t m_thread_idx;
uint32_t m_task_cnt;
pthread_t m_thread_id;
CThreadNotify m_thread_notify;
list<CTask*> m_task_list;
};
//Init(uint32_t worker_size)会创建指定数量的工作线程(new CWorkerThread [m_worker_size])形成线程池,同时Init会逐一启动线程池里工作线程;
//AddTask(CTask* pTask)用于添加任务,并随机分发某一个工作线程里去执行
class CThreadPool {
public:
CThreadPool();
virtual ~CThreadPool();
int Init(uint32_t worker_size);
void AddTask(CTask* pTask);
void Destory();
private:
uint32_t m_worker_size;
CWorkerThread* m_worker_list;
};
3.源码