线程池: 并行处理大量任务时,频繁创建与销毁线程的开销是制约性能的关键因素。因此线程池预先创建固定数量的线程,让它们循环地从任务队列中取出任务并执行,以此复用线程、提高性能。
线程池的结构:
/// Fixed-size pool of worker threads that execute queued `void()` tasks.
class ThreadPool
{
public:
    /// Starts `threadCount` worker threads (8 when the argument is omitted).
    explicit ThreadPool(size_t threadCount = 8);

    /// Marks the pool as closed, wakes the workers and joins all of them.
    ~ThreadPool();

    // The pool is neither copy-constructible nor copy-assignable.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /// Adds a task; `task` is stored in the queue as a std::function<void()>.
    template<typename T>
    void addTask(T&& task);

private:
    /// Loop executed by every worker thread.
    void runInThread();

    std::vector<std::thread> workers;          // the worker threads of the pool
    std::queue<std::function<void()>> tasks;   // queue of pending tasks
    std::mutex mtx;                            // locked while the queue is modified
    std::condition_variable cond;              // signalled on new tasks and on shutdown
    bool isClosed;                             // set to true by the destructor
};
构造函数:inline ThreadPool::ThreadPool(size_t threadCount)
// Creates the pool and launches a fixed number of worker threads.
// threadCount: number of workers to start; must be greater than zero
//              (checked with assert only).
inline ThreadPool::ThreadPool(size_t threadCount)
{
    assert(threadCount > 0);
    isClosed = false;

    // Every worker runs runInThread() on this pool instance.
    for (size_t started = 0; started != threadCount; ++started)
    {
        workers.push_back(std::thread([this] { runInThread(); }));
    }
}
线程启动函数:inline void ThreadPool::runInThread()
inline void ThreadPool::runInThread()
{
while (!isClosed)//循环取出任务并执行
{
std::unique_lock<std::mutex> lock(mtx);
while (tasks.empty() && !isClosed)
{
cond.wait(lock);
}
if (!tasks.empty())
{
auto task = std::move(tasks.front());
tasks.pop();
lock.unlock();//执行任务前解锁
task();
}
}
}
析构函数:inline ThreadPool::~ThreadPool()
// Shuts the pool down: flags it as closed, wakes every waiting worker and
// then blocks until all worker threads have returned.
inline ThreadPool::~ThreadPool()
{
    {
        // The flag is changed and the workers are notified under the mutex.
        std::unique_lock<std::mutex> guard(mtx);
        isClosed = true;
        cond.notify_all();
    }

    // Wait for each worker to finish the task it is currently running.
    for (std::vector<std::thread>::iterator worker = workers.begin();
         worker != workers.end(); ++worker)
    {
        worker->join();
    }
}
添加任务函数:inline void ThreadPool::addTask(T&& task)
// Adds a task to the pool.
//
// task: any callable that can be stored in a std::function<void()>.
//
// If the pool has no worker threads the task is run immediately on the
// calling thread and is NOT queued. The original fell through and also pushed
// it onto a queue that nothing would ever drain.
// If the pool has already been closed the task is silently dropped.
template<class T>
inline void ThreadPool::addTask(T&& task)
{
    if (workers.empty())
    {
        task();
        return;
    }

    // isClosed is written under mtx by the destructor, so it is read under
    // the same mutex here rather than before taking the lock.
    std::lock_guard<std::mutex> lock(mtx);
    if (!isClosed)
    {
        tasks.emplace(std::forward<T>(task));
        cond.notify_one();
    }
}
测试程序:向任务队列中放置100个任务,然后由固定的4个线程进行循环操作,打印出执行任务的线程id及任务执行结果。
#include "threadpool.h"
#include<algorithm>
#include<iostream>
#include<mutex>
#include<random>
#include<windows.h>
using namespace std;
void taskFunc(void* arg)
{
int num = *static_cast<int*>(arg);
thread::id this_thread_id = this_thread::get_id();
cout<<"threadId="<<this_thread_id<<" num="<<num<<endl;
Sleep(10);
}
// Queues 100 tasks on a pool of 4 worker threads and gives them one second
// to complete before the pool is destroyed.
int main()
{
    ThreadPool pool(4);
    for(int i = 0;i < 100;++i)
    {
        // Capture the index by value instead of heap-allocating it: the
        // original `new int(i)` was never deleted, leaking 100 allocations.
        pool.addTask([i]
        {
            int num = i;
            taskFunc(&num);
        });
    }
    // The pool has no "wait until idle" call, so the test simply sleeps
    // before the pool goes out of scope.
    Sleep(1000);
    return 0;
}
测试结果:
threadId=3 num=0
threadId=4 num=1
threadId=5 num=2
threadId=2 num=3
threadId=5 num=4
threadId=4 num=5
threadId=2 num=6
threadId=3 num=7
threadId=5 num=8
threadId=2 num=10
threadId=4 num=9
threadId=3 num=11
threadId=3 num=12
threadId=4 num=14
threadId=5 num=15
threadId=2 num=13
threadId=5 num=16
threadId=4 num=18
threadId=2 num=19
threadId=3 num=17
threadId=3 num=20
threadId=4 num=22
threadId=2 num=21
threadId=5 num=23
threadId=4 num=24
threadId=2 num=26
threadId=3 num=25
threadId=5 num=27
threadId=3 num=28
threadId=5 num=30
threadId=4 num=31
threadId=2 num=29
threadId=2 num=32
threadId=4 num=34
threadId=3 num=35
threadId=5 num=33
threadId=4 num=36
threadId=5 num=38
threadId=2 num=39
threadId=3 num=37
threadId=5 num=40
threadId=4 num=42
threadId=3 num=43
threadId=2 num=41
threadId=3 num=44
threadId=4 num=45
threadId=5 num=46
threadId=2 num=47
threadId=2 num=48
threadId=4 num=49
threadId=5 num=50
threadId=3 num=51
threadId=4 num=52
threadId=5 num=54
threadId=2 num=53
threadId=3 num=55
threadId=5 num=56
threadId=4 num=59
threadId=3 num=58
threadId=2 num=57
threadId=4 num=60
threadId=3 num=61
threadId=5 num=62
threadId=2 num=63
threadId=2 num=64
threadId=5 num=66
threadId=4 num=67
threadId=3 num=65
threadId=4 num=68
threadId=2 num=70
threadId=3 num=71
threadId=5 num=69
threadId=5 num=72
threadId=4 num=74
threadId=2 num=75
threadId=3 num=73
threadId=4 num=76
threadId=3 num=78
threadId=2 num=79
threadId=5 num=77
threadId=3 num=80
threadId=5 num=82
threadId=4 num=83
threadId=2 num=81
threadId=3 num=84
threadId=5 num=86
threadId=4 num=87
threadId=2 num=85
threadId=4 num=88
threadId=5 num=90
threadId=3 num=91
threadId=2 num=89
threadId=4 num=92
threadId=2 num=94
threadId=5 num=95
threadId=3 num=93
threadId=5 num=96
threadId=2 num=98
threadId=3 num=99
threadId=4 num=97
由测试结果可以看到,四个线程id=2、3、4、5,各自循环从任务队列中拿取任务并执行,共执行任务100个,不重不漏!
缺点:
1、任务队列并未设置最大容量,大量的任务加入其中有可能会导致内存耗尽。
2、任务必须是无参数、无返回值的可调用对象(内部统一存为std::function<void()>),带参数的函数需要先用bind或lambda适配,不太灵活。
3、无法取得任务执行后的返回值。
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <thread>
#include <functional>
#include <assert.h>
/// Fixed-size pool of worker threads that execute queued `void()` tasks.
///
/// Tasks are stored in a FIFO queue; each worker repeatedly takes one task and
/// runs it outside the lock. `tasks` and `isClosed` are only accessed while
/// `mtx` is held.
class ThreadPool
{
public:
    /// Starts `threadCount` worker threads (8 when the argument is omitted).
    /// `threadCount` must be greater than zero (checked with assert only).
    explicit ThreadPool(size_t threadCount = 8);

    /// Closes the pool and joins every worker. A worker finishes the task it
    /// is running; tasks still waiting in the queue may never be executed.
    ~ThreadPool();

    ThreadPool(const ThreadPool& other) = delete;
    ThreadPool& operator = (const ThreadPool& other) = delete;

    /// Adds a task.
    /// @param task any callable that can be stored in a std::function<void()>.
    /// If the pool has no workers the task runs immediately on the calling
    /// thread; if the pool is already closed the task is silently dropped.
    template<class T>
    void addTask(T&& task);

private:
    /// Loop executed by every worker thread.
    void runInThread();

    /// Sets the closed flag, wakes all workers and joins them.
    void stopAndJoin();

    std::vector<std::thread> workers;          // worker threads
    std::queue<std::function<void()>> tasks;   // pending tasks
    std::mutex mtx;                            // guards tasks and isClosed
    std::condition_variable cond;              // signalled on new task / shutdown
    bool isClosed = false;                     // true once shutdown has started
};

inline ThreadPool::ThreadPool(size_t threadCount)
{
    assert(threadCount > 0);
    try
    {
        workers.reserve(threadCount);
        for (size_t i = 0; i < threadCount; i++)
        {
            workers.emplace_back(&ThreadPool::runInThread, this);
        }
    }
    catch (...)
    {
        // The destructor does not run when a constructor throws. Without this
        // the already-started (joinable) threads would be destroyed with the
        // vector, which calls std::terminate.
        stopAndJoin();
        throw;
    }
}

inline ThreadPool::~ThreadPool()
{
    stopAndJoin();
}

inline void ThreadPool::stopAndJoin()
{
    {
        std::lock_guard<std::mutex> lock(mtx);
        isClosed = true;
        cond.notify_all();
    }
    for (auto& thr : workers)
    {
        if (thr.joinable())
        {
            thr.join();
        }
    }
}

inline void ThreadPool::runInThread()
{
    // The lock is held at every point where isClosed or tasks is inspected.
    // The original loop condition read isClosed without it, which raced with
    // the write in the destructor.
    std::unique_lock<std::mutex> lock(mtx);
    while (!isClosed)
    {
        cond.wait(lock, [this] { return isClosed || !tasks.empty(); });
        if (tasks.empty())
        {
            continue; // woken for shutdown; the loop condition ends the thread
        }
        {
            auto task = std::move(tasks.front());
            tasks.pop();
            lock.unlock(); // run the task without holding the lock
            task();
        } // the task object is destroyed here, still without the lock
        lock.lock();
    }
}

template<class T>
inline void ThreadPool::addTask(T&& task)
{
    if (workers.empty())
    {
        // No worker would ever drain the queue, so run inline and do not
        // enqueue (the original fell through and queued the task as well).
        task();
        return;
    }

    std::lock_guard<std::mutex> lock(mtx);
    if (!isClosed)
    {
        tasks.emplace(std::forward<T>(task));
        cond.notify_one();
    }
}
#endif //THREADPOOL_H