当前位置: 首页 > 工具软件 > threadpool > 使用案例 >

基于C++11的线程池实现(ThreadPool)

夹谷俊远
2023-12-01

思路:

线程池: 当进行并行的任务作业操作时,线程的建立与销毁的开销是阻碍性能进步的关键,因此需要在线程池内创建多个线程,无限循环从任务队列中取走任务并执行,提高性能。

线程池的结构:

class ThreadPool
{
public:
    explicit ThreadPool(size_t threadCount = 8);

    ~ThreadPool();

    ThreadPool(const ThreadPool& other) = delete;

    ThreadPool& operator = (const ThreadPool& other) = delete;

    template<class T>
    void addTask(T&& task);//添加任务

private:
    void runInThread();

    std::vector<std::thread> workers;//线程池

    std::queue<std::function<void()>> tasks;//任务队列

    std::mutex mtx;

    std::condition_variable cond;

    bool isClosed;
};

构造函数:inline ThreadPool::ThreadPool(size_t threadCount)

inline ThreadPool::ThreadPool(size_t threadCount)
{
    assert(threadCount > 0);
    isClosed = false;
    for (size_t i = 0; i < threadCount; i++)//添加固定数量的线程
    {
        workers.emplace_back(
            std::thread(std::bind(&ThreadPool::runInThread, this))
        );
    }
}

线程启动函数:inline void ThreadPool::runInThread()

inline void ThreadPool::runInThread()
{
    while (!isClosed)//循环取出任务并执行
    {
        std::unique_lock<std::mutex> lock(mtx);
        while (tasks.empty() && !isClosed)
        {
            cond.wait(lock);
        }
        if (!tasks.empty())
        {
            auto task = std::move(tasks.front());
            tasks.pop();
            lock.unlock();//执行任务前解锁
            task();
        }
    }
}

析构函数:inline ThreadPool::~ThreadPool()

inline ThreadPool::~ThreadPool()
{
    {
        std::lock_guard<std::mutex> lock(mtx);
        isClosed = true;
        cond.notify_all();
    }
    for (auto& thr : workers)//等待所有线程完成手中的任务
    {
        thr.join();
    }
}

添加任务函数:inline void ThreadPool::addTask(T&& task)

template<class T>
inline void ThreadPool::addTask(T&& task)
{
    if (workers.size() == 0)
    {
        task();
    }
    if (!isClosed)
    {
        std::lock_guard<std::mutex> lock(mtx);
        tasks.emplace(std::forward<T>(task));
        cond.notify_one();
    }
}

 测试程序:向任务队列中放置100个任务,然后由固定的4个线程进行循环操作,打印出执行任务的线程id及任务执行结果。

#include "threadpool.h"
#include<iostream>
#include<random>
#include<algorithm>
#include<windows.h>
using namespace std;

void taskFunc(void* arg)
{
    int num = *static_cast<int*>(arg);
    thread::id this_thread_id = this_thread::get_id();
    cout<<"threadId="<<this_thread_id<<"	num="<<num<<endl;
    Sleep(10);
}

int main()
{
    ThreadPool pool(4);
    for(int i = 0;i < 100;++i)
	{
        int* num = new int(i);
        pool.addTask(function<void()>(bind(&taskFunc,num)));
    }
    Sleep(1000);
    return 0;
}

 测试结果:

threadId=3      num=0
threadId=4      num=1
threadId=5      num=2
threadId=2      num=3
threadId=5      num=4
threadId=4      num=5
threadId=2      num=6
threadId=3      num=7
threadId=5      num=8
threadId=2      num=10
threadId=4      num=9
threadId=3      num=11
threadId=3      num=12
threadId=4      num=14
threadId=5      num=15
threadId=2      num=13
threadId=5      num=16
threadId=4      num=18
threadId=2      num=19
threadId=3      num=17
threadId=3      num=20
threadId=4      num=22
threadId=2      num=21
threadId=5      num=23
threadId=4      num=24
threadId=2      num=26
threadId=3      num=25
threadId=5      num=27
threadId=3      num=28
threadId=5      num=30
threadId=4      num=31
threadId=2      num=29
threadId=2      num=32
threadId=4      num=34
threadId=3      num=35
threadId=5      num=33
threadId=4      num=36
threadId=5      num=38
threadId=2      num=39
threadId=3      num=37
threadId=5      num=40
threadId=4      num=42
threadId=3      num=43
threadId=2      num=41
threadId=3      num=44
threadId=4      num=45
threadId=5      num=46
threadId=2      num=47
threadId=2      num=48
threadId=4      num=49
threadId=5      num=50
threadId=3      num=51
threadId=4      num=52
threadId=5      num=54
threadId=2      num=53
threadId=3      num=55
threadId=5      num=56
threadId=4      num=59
threadId=3      num=58
threadId=2      num=57
threadId=4      num=60
threadId=3      num=61
threadId=5      num=62
threadId=2      num=63
threadId=2      num=64
threadId=5      num=66
threadId=4      num=67
threadId=3      num=65
threadId=4      num=68
threadId=2      num=70
threadId=3      num=71
threadId=5      num=69
threadId=5      num=72
threadId=4      num=74
threadId=2      num=75
threadId=3      num=73
threadId=4      num=76
threadId=3      num=78
threadId=2      num=79
threadId=5      num=77
threadId=3      num=80
threadId=5      num=82
threadId=4      num=83
threadId=2      num=81
threadId=3      num=84
threadId=5      num=86
threadId=4      num=87
threadId=2      num=85
threadId=4      num=88
threadId=5      num=90
threadId=3      num=91
threadId=2      num=89
threadId=4      num=92
threadId=2      num=94
threadId=5      num=95
threadId=3      num=93
threadId=5      num=96
threadId=2      num=98
threadId=3      num=99
threadId=4      num=97

由测试结果可以看到,四个线程id=2、3、4、5,各自循环从任务队列中拿取任务并执行,共执行任务100个,不重不漏!

缺点:

1、任务队列并未设置最大容量,大量的任务加入其中有可能会导致内存耗尽。

2、任务只能由function+bind的方式添加,不太灵活。

3、没有取得线程的返回值。

源码:

threadpool.h

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <thread>
#include <functional>
#include <assert.h>

class ThreadPool
{
public:
    explicit ThreadPool(size_t threadCount = 8);

    ~ThreadPool();

    ThreadPool(const ThreadPool& other) = delete;

    ThreadPool& operator = (const ThreadPool& other) = delete;

    template<class T>
    void addTask(T&& task);

private:
    void runInThread();

    std::vector<std::thread> workers;

    std::queue<std::function<void()>> tasks;

    std::mutex mtx;

    std::condition_variable cond;

    bool isClosed;
};

inline ThreadPool::ThreadPool(size_t threadCount)
{
    assert(threadCount > 0);
    isClosed = false;
    for (size_t i = 0; i < threadCount; i++)
    {
        workers.emplace_back(
            std::thread(std::bind(&ThreadPool::runInThread, this))
        );
    }
}

inline ThreadPool::~ThreadPool()
{
    {
        std::lock_guard<std::mutex> lock(mtx);
        isClosed = true;
        cond.notify_all();
    }
    for (auto& thr : workers)
    {
        thr.join();
    }
}

inline void ThreadPool::runInThread()
{
    while (!isClosed)
    {
        std::unique_lock<std::mutex> lock(mtx);
        while (tasks.empty() && !isClosed)
        {
            cond.wait(lock);
        }
        if (!tasks.empty())
        {
            auto task = std::move(tasks.front());
            tasks.pop();
            lock.unlock();
            task();
        }
    }
}

template<class T>
inline void ThreadPool::addTask(T&& task)
{
    if (workers.size() == 0)
    {
        task();
    }
    if (!isClosed)
    {
        std::lock_guard<std::mutex> lock(mtx);
        tasks.emplace(std::forward<T>(task));
        cond.notify_one();
    }
}

#endif //THREADPOOL_H
 类似资料: