很多时候,我们的可调用对象需要某些非参数依赖的执行环境。可以这么理解,我们的任务是就绪的,不需要依赖外界传入参数了,只需要以某种方式控制方法的when
、where
和how
即可。
三个分别是指:
where
:可调用对象的执行位置,比如可以在内部或者外部的处理器(线程)上执行,并从内部或者外部的处理器(线程)上获取结果when
:可调用对象的执行时间,比如立刻执行或者被调度执行how
:可调用对象的执行方式,比如在CPU
或者GPU
上执行,可以矢量化计算等举个例子:
int foo(int i) {
return i * 2;
}
// ...
auto f = std::async(std::launch::async, foo(10));
f.get();
// ...
上述代码中,对于foo(10)
来说,关键要素是:
when
:调用std::async
的时候where
:在std::async
内部启动的新线程中how
:在CPU上执行,正常顺序执行很多时候,用户想要获取一个通用的接口类,这个类提供了统一的接口,而且规定了上述的3w
,之后只需要把可执行对象传入接口类中,就能根据该接口类的3w
,最终获取结果。
那么,我们可以把上面说的接口类,认为是一个Executor
。甚至实际的Executor
甚至仅仅是个接口,仅提供了统一的调用方法,让用户自己去实现有关实际功能等。
因此,可以这么理解,Executor
给可调用的对象提供了一个运行环境或者说是运行上下文环境 Execute Context。个人理解,运行上下文是区别于平时说的上下文 Context。因为Context是针对编程环境来说的,或者认为是Context决定了可调用对象的运行形式,能改变可调用对象的表现;而Execute Context无法改变上面的,其只能改变我们说的3w
,而3w
不决定可调用对象的结果。
比如Executor可以是不同类型的线程池,比如一个线程一个队列或者多线程共享队列等。只要指定了Executor,用户就可以传入可调用对象,并等待结果;它屏蔽了底层的执行细节。
C++目前还不支持Executor
的模式,或者说支持的比较弱。。比如std::async
中,无法指定Executor
,只能以std::launch::async
启动新的线程或者std::launch::defered
延后执行等。下面以代码示例,简单演示下支持Executor
的async_exec
函数。
以一个多线程共享队列的线程池:
#include <thread>
#include <iostream>
#include <functional>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <deque>
#include <random>
#include <atomic>
// Executor,仅仅提供接口
class Executor {
public:
Executor() = default;
virtual ~Executor() = default;
virtual void add(std::function<void()> fn) = 0;
};
// 实际的线程池,Executor的具体表现,可以忽略这部分的具体实现
class ThreadPool : public Executor {
public:
explicit ThreadPool(size_t threadCnt = 4) : m_threadCnt(4), m_stop(false) {
std::thread([this]() { run(); }).detach(); // 后台启动
}
~ThreadPool() override {
m_stop = true;
m_cond.notify_all();
for (auto &t: m_threads) {
if (t.joinable()) {
t.join();
}
}
}
ThreadPool(ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;
void add(std::function<void()> fn) override {
m_mtx.lock();
m_tasks.emplace_back(std::move(fn));
m_mtx.unlock();
m_cond.notify_one();
}
private:
void run() {
for (size_t i = 0; i < m_threadCnt; ++i) {
m_threads.emplace_back(std::thread([this]() {
for (;;) {
std::unique_lock<std::mutex> lck(m_mtx);
m_cond.wait(lck, [this]() -> bool { return m_stop || !m_tasks.empty(); });
if (m_stop) {
return;
}
auto &fn = m_tasks.front();
m_tasks.pop_front();
lck.unlock(); // 一定要释放掉锁,否则无法并发
fn();
}
}));
}
}
private:
std::vector<std::thread> m_threads;
std::deque<std::function<void()>> m_tasks;
std::condition_variable m_cond;
std::mutex m_mtx;
std::atomic<bool> m_stop{false};
size_t m_threadCnt;
};
// 这里泛型提供了使用Exec的例子
template<typename Fn, typename Exec = Executor>
void async_exec(Fn fn, Exec &exec) {
exec.add(fn);
}
// 工具函数,获取随机数
inline int get_random(int low, int high) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distrib(low, high);
return distrib(gen);
}
void foo(int n) {
int t = get_random(100, 5000);
std::this_thread::sleep_for(std::chrono::milliseconds(t)); // 模拟执行任务
std::cout << "foo finish task " << n << " with " << t << " ms\n";
}
int main() {
ThreadPool tp(4);
for (int i = 0; i < 10; ++i) {
async_exec([i]() { foo(i); }, tp); // 指定实际的Executor执行
}
std::this_thread::sleep_for(std::chrono::seconds(15)); // 休眠15s等所有任务完成
std::cout << "stop everything\n";
return 0;
}