当前位置: 首页 > 工具软件 > TeamTalk > 使用案例 >

Teamtalk线程池

夏法
2023-12-01

一、线程池模型

  1. 生产者自主选择消费者(teamtalk思路):任务队列存在于每个工作线程,当有新任务到来时,线程池管理线程自主随机向其中一个工作线程派发任务,推送到该工作线程的任务队列中,接着唤醒该工作线程,从任务队列中取出任务执行。
    每个工作线程拥有一个条件变量,一把锁,但是通过类CThreadNotify管理使用方便很多
  2. 消费者竞争:线程池管理线程维护任务队列,当有任务到来时,任务存于该队列中。然后将新任务到达这个消息广播出去,让工作线程来竞争。
    线程池管理线程和所有工作线程共用一把锁,一个条件变量

二、teamtalk线程池代码

  1. 任务类CTask,想处理的工作任务必须继承的基类,然后使用多态
// Task.h
class CTask {
public:
    CTask(){}
    virtual ~CTask(){}
    
    virtual void run() = 0;
private:
};
  1. 锁与条件变量类
// Thread.h
#include <pthread.h>
class CThreadNotify
{
public:
	CThreadNotify();
	~CThreadNotify();
	void Lock() { pthread_mutex_lock(&m_mutex); }
	void Unlock() { pthread_mutex_unlock(&m_mutex); }
	void Wait() { pthread_cond_wait(&m_cond, &m_mutex); }
	void Signal() { pthread_cond_signal(&m_cond); }
private:
	pthread_mutex_t 	m_mutex;
	pthread_mutexattr_t	m_mutexattr;
    
	pthread_cond_t 		m_cond;
};

// Thread.cpp
CThreadNotify::CThreadNotify()
{
	pthread_mutexattr_init(&m_mutexattr);
	pthread_mutexattr_settype(&m_mutexattr, PTHREAD_MUTEX_RECURSIVE);
	pthread_mutex_init(&m_mutex, &m_mutexattr);
    
	pthread_cond_init(&m_cond, NULL);
}

CThreadNotify::~CThreadNotify()
{
	pthread_mutexattr_destroy(&m_mutexattr);
	pthread_mutex_destroy(&m_mutex);
    
	pthread_cond_destroy(&m_cond);
}
  1. 工作线程类
// ThreadPool.h
#include "Thread.h"
#include "Task.h"
#include <stdint.h>
#include <list>

using namespace std;

class CWorkerThread {
public:
	CWorkerThread();
	~CWorkerThread();

	/**********************
		为什么要是静态方法???非静态函数出现如下编译问题:
		ThreadPool.cpp:24:43: error: invalid use of non-static member function ‘void* CWorkerThread::Execute(void*)’
		24 |  (void)pthread_create(&m_thread_id, NULL, Execute, NULL);
			|                                           ^~~~~~~~~~~~
		ThreadPool.cpp:13:7: note: declared here
		13 | void* CWorkerThread::Execute(void* arg)

		出现类型不匹配的问题。因为pthread_create需要的参数类型为void* (*)(void*),而Execute作为类的成员函数时其类型是
		void* (CWorkerThread::)(void*)的成员函数指针。我们知道类的成员函数在经过编译器处理之后,会变成带有 this指针参
		数的全局函数,所以类型注定是不会匹配的。但是如果将Execute声明为static类型,那么编译器会将static形 式的函数,转
		换成不带this指针的全局函数,所以其类型可以与pthread_create需要的参数类型相匹配。但是类的静态成员函数无法访问类
		的非静态成员,不过这可以通过传递this指针解决这个问题。

		因此就出现了StartRoutine这个静态方法
	*********************/
	static void* StartRoutine(void* arg);

	void Start();
	void Execute();
	void PushTask(CTask* pTask);

	void SetThreadIdx(uint32_t idx) { m_thread_idx = idx; }
private:

	uint32_t		m_thread_idx;
	uint32_t		m_task_cnt;
	pthread_t		m_thread_id;
	CThreadNotify	m_thread_notify;
	list<CTask*>	m_task_list;
    bool            m_shutdown;	// 是否销毁线程
};

// ThreadPool.cpp
CWorkerThread::CWorkerThread()
{
	m_task_cnt = 0;
    m_shutdown = false;
}

CWorkerThread::~CWorkerThread()
{
    m_shutdown = true;
    m_thread_notify.Signal();   // 唤醒工作线程
    pthread_join(m_thread_id, NULL);

    // 将任务丢弃并清空,防止内存泄漏
    for (list<CTask*>::iterator it = m_task_list.begin(); it != m_task_list.end(); it++)
    {
        delete *it;
    }
}

void* CWorkerThread::StartRoutine(void* arg)
{
	CWorkerThread* pThread = (CWorkerThread*)arg;

	pThread->Execute();

	return NULL;
}

void CWorkerThread::Start()
{
	(void)pthread_create(&m_thread_id, NULL, StartRoutine, this);
}

void CWorkerThread::Execute()
{
	while (true) {
		m_thread_notify.Lock();

		// put wait in while cause there can be spurious wake up (due to signal/ENITR)
		while (m_task_list.empty() && !m_shutdown) {
			m_thread_notify.Wait();
		}
        // 销毁线程
        if (m_shutdown) {
            break;
        }
		CTask* pTask = m_task_list.front();
		m_task_list.pop_front();
		m_thread_notify.Unlock();

		pTask->run();

		delete pTask;

		m_task_cnt++;
		//log("%d have the execute %d task\n", m_thread_idx, m_task_cnt);
	}
}

void CWorkerThread::PushTask(CTask* pTask)
{
	m_thread_notify.Lock();
	m_task_list.push_back(pTask);
	m_thread_notify.Signal();
	m_thread_notify.Unlock();
}

其中销毁线程这部分代码自己添加的,原版本在销毁线程池的时候无法退出。

  1. 线程池类
// ThreadPool.h
class CThreadPool {
public:
	CThreadPool();
	virtual ~CThreadPool();

	int Init(uint32_t worker_size);
	void AddTask(CTask* pTask);
	void Destory();
private:
	uint32_t 		m_worker_size;
	CWorkerThread* 	m_worker_list;
};

// ThreadPool.cpp
CThreadPool::CThreadPool()
{
	m_worker_size = 0;
	m_worker_list = NULL;
}

CThreadPool::~CThreadPool()
{
    Destory();
}

int CThreadPool::Init(uint32_t worker_size)
{
    m_worker_size = worker_size;
	m_worker_list = new CWorkerThread [m_worker_size];
	if (!m_worker_list) {
		return 1;
	}

	for (uint32_t i = 0; i < m_worker_size; i++) {
		m_worker_list[i].SetThreadIdx(i);
		m_worker_list[i].Start();
	}

	return 0;
}

void CThreadPool::Destory()
{
    if(m_worker_list) {
        delete [] m_worker_list;
        m_worker_size = 0;
        m_worker_list = NULL;
    }
}

void CThreadPool::AddTask(CTask* pTask)
{
	/*
	 * select a random thread to push task
	 * we can also select a thread that has less task to do
	 * but that will scan the whole thread list and use thread lock to get each task size
	 */
	uint32_t thread_idx = random() % m_worker_size;
	m_worker_list[thread_idx].PushTask(pTask);
}
  1. 使用举例
#include <iostream>
#include <string>
using namespace std;

#include <unistd.h>
#include <stdlib.h>     // random()

#include "ThreadPool.h"

class Echo : public CTask
{
public:
    Echo(int s) {m_tag = s;}
    ~Echo() {}
    void run();
private:
    int m_tag;
};

void Echo::run()
{
    usleep(random()%10);
    cout << "I'm task " << m_tag << endl;
}


int main()
{
    CThreadPool *tpool = new CThreadPool();
    tpool->Init(3);

    for (int i = 0; i < 10; i++) {
        CTask *pTask = new Echo(i);
        tpool->AddTask(pTask);
    }

    delete tpool;

    return 0;
}

三、pthread_cond_wait()函数

  1. 阻塞等待一个条件变量
  2. 释放已掌握的互斥锁(相当于调用了pthread_mutex_unlock())
    1.2两步为一个原子操作
  3. 当被唤醒,pthread_cond_wait函数返回时,解除阻塞并重新申请获取互斥锁(相当于调用了pthread_mutex_lock())
    通过while循环检测防止虚假唤醒

四、总结

teamtalk的线程池模型比较简单,比如:

  1. 任务派发方式是随机的,更合理的情况应该是向所有中线程剩余任务数最少的线程派发,不过作者也说明了,找剩余任务数最少的线程,需要遍历所有线程,这又涉及到加/解锁,会很麻烦。只要够随机,效率可能还会更高(ps:为什么不用轮询呢,感觉也差不多吧);
  2. 线程数不能动态扩缩;
  3. 未设计销毁线程池(应该是考虑到不太可能会有这个操作就忽略了)。

ps: 以上纯属个人见解,欢迎拍砖。

源码地址: https://gitee.com/liu_school/threadpool

 类似资料: