“源码面前,了无秘密”
所有使用asio的程序都需要至少有一个I/O执行上下文,例如io_context或thread_pool对象。I/O执行上下文提供对I/O功能的访问。
本文使用的asio为non-boost-asio,版本号:asio-1.24.0
pc环境:ubuntu 20
开发工具:vs2022 windows通过ssh远程unubtu调试代码
先将io_context类源码声明贴出来。
class io_context
: public execution_context
{
private:
typedef detail::io_context_impl impl_type;
#if defined(ASIO_HAS_IOCP)
friend class detail::win_iocp_overlapped_ptr;
#endif
public:
class executor_type;
friend class executor_type;
#if !defined(ASIO_NO_DEPRECATED)
class work;
friend class work;
#endif // !defined(ASIO_NO_DEPRECATED)
class service;
#if !defined(ASIO_NO_EXTENSIONS)
class strand;
#endif // !defined(ASIO_NO_EXTENSIONS)
/// 用于计算上下文执行的处理程序数的类型。
typedef std::size_t count_type;
/// 构造
ASIO_DECL io_context();
/// 构造
/**
* 使用并发进行构造。
*
* @param concurrency_hint 并发数量
*/
ASIO_DECL explicit io_context(int concurrency_hint);
/// 析构.
/**
* 销毁时,io_context执行以下序列
* 操作:
*
* @li 对于io_context集中的每个服务对象@c svc,按相反顺序
* 服务对象生命周期开始时,执行
* @c svc->shutdown().
*
* @li 计划用于延迟调用的未调用处理程序对象
* 在io_context或任何相关联的链上,都会被销毁。
*
* @li 对于io_context集中的每个服务对象@c svc,按相反的顺序
* 服务对象生命周期开始时,执行
* <tt>delete static_cast<io_context::service*>(svc)</tt>.
*
* @note 上述销毁顺序允许程序
* 通过使用@cshared_ptr来简化他们的资源管理。如果
* 对象的生存期与连接的生存期(或其他
* 异步操作序列),对象的@c shared_ptr将
* 绑定到与关联的所有异步操作的处理程序中
* 它的工作原理如下:
*
* @li 当单个连接结束时,所有关联的异步操作
* 完整。相应的处理程序对象被销毁,并且
* @c shared_ptr对对象的引用被销毁。
*
* @li 要关闭整个程序,io_context函数stop()是
* 调用以尽快终止任何run()调用。io_context
* 上面定义的析构函数破坏所有处理程序,导致所有@c shared_ptr
* 对所有要销毁的连接对象的引用。
*/
ASIO_DECL ~io_context();
/// 获取与io_context关联的执行器。
executor_type get_executor() ASIO_NOEXCEPT;
/// 运行io_context对象的事件处理循环。
/**
* run()函数会阻塞,直到所有工作都完成,并且没有
* 要调度更多的处理程序,或者直到io_context停止。
*
* 多个线程可以调用run()函数来设置线程池
* io_context可以从中执行处理程序。所有的线程
* 在池中等待是等效的,io_context可以选择任何一个
* 以调用一个处理程序。
*
* run()函数的正常退出意味着io_context对象
* 已停止(stopped()函数返回@ctrue)。对的后续调用
* run()、run_one()、poll()或poll_one()将立即返回,除非存在
* 是对restart()的先前调用。
*
* @return 已执行的处理程序数。
*
* @note 从当前正在调用的线程调用run()函数
* 上的run()、run_one()、run _for()、run_until()、poll()或poll_one()之一
* 相同的io_context对象可能会引入死锁的可能性。它是
* 呼叫方避免这种情况的责任。
*
* poll()函数也可以用于调度准备就绪的处理程序,但是没有阻塞。
*/
ASIO_DECL count_type run();
#if !defined(ASIO_NO_DEPRECATED)
/// (不推荐:使用非error_code重载。)运行io_context对象的事件处理循环。
/**
* run()函数会阻塞,直到所有工作都完成,并且没有
* 要调度更多的处理程序,或者直到io_context停止。
*
* 多个线程可以调用run()函数来设置线程池
* io_context可以从中执行处理程序。所有的线程
* 在池中等待是等效的,io_context可以选择任何一个
* 以调用一个处理程序。
*
* run()函数的正常退出意味着io_context对象
* 已停止(stopped()函数返回@ctrue)。对的后续调用
* run()、run_one()、poll()或poll_one()将立即返回,除非存在
* 是对restart()的先前调用。
*
* @param ec 设置为指示发生了什么错误(如果有)。
*
* @return 已执行的处理程序数。
*
* @note 从当前正在调用的线程调用run()函数
* 上的run()、run_one()、run _for()、run_until()、poll()或poll_one()之一
* 相同的io_context对象可能会引入死锁的可能性。它是
* 呼叫方避免这种情况的责任。
*
* poll()函数也可以用于调度准备就绪的处理程序,但是没有阻塞。
*/
ASIO_DECL count_type run(asio::error_code& ec);
#endif // !defined(ASIO_NO_DEPRECATED)
#if defined(ASIO_HAS_CHRONO) || defined(GENERATING_DOCUMENTATION)
/// 为指定的对象运行io_context对象的事件处理循环
///持续时间。
/**
* run_for()函数会阻塞,直到所有工作都完成,并且没有
* 要调度的更多处理程序,直到io_context停止,或者
* 直到指定的持续时间过去。
*
* @param rel_time 呼叫可能被阻止的持续时间。
*
* @return 已执行的处理程序数。
*/
template <typename Rep, typename Period>
std::size_t run_for(const chrono::duration<Rep, Period>& rel_time);
///运行io_context对象的事件处理循环,直到指定的时间。
/**
* run_until()函数会阻塞,直到所有工作都完成,并且
* 在io_context停止之前,不再调度任何处理程序,
* 或者直到达到指定的时间为止。
*
* @param abs_time 呼叫可能被阻塞的时间点。
*
* @return 已执行的处理程序数。
*/
template <typename Clock, typename Duration>
std::size_t run_until(const chrono::time_point<Clock, Duration>& abs_time);
#endif // defined(ASIO_HAS_CHRONO) || defined(GENERATING_DOCUMENTATION)
///运行io_context对象的事件处理循环,最多执行一个
///处理程序。
/**
* run_one()函数会阻塞,直到调度了一个处理程序,或者
* 直到io_context已经停止。
*
* @return 已执行的处理程序数。零返回值
* 意味着io_context对象已停止(stopped()函数
* 返回@ctrue)。随后调用run()、run_one()、poll()或
* poll_one()将立即返回,除非之前有对的调用
* restart().
*
* @note 从当前正在运行的线程调用run_one()函数
* 调用run()、run_one()、run_for()、run_until()、poll()或
* 对同一io_context对象的poll_one()可能会引入
* 死锁。避免这种情况是呼叫方的责任。
*/
ASIO_DECL count_type run_one();
#if !defined(ASIO_NO_DEPRECATED)
/// (不推荐:使用nonerror_code overlaod。)运行io_context对象的
///事件处理循环最多执行一个处理程序。
/**
*run_one()函数会阻塞,直到调度了一个处理程序,或者
*直到io_context已经停止。
*
* @return 已执行的处理程序数。零返回值
* 意味着io_context对象已停止(stopped()函数
* 返回@ctrue)。随后调用run()、run_one()、poll()或
* poll_one()将立即返回,除非之前有对的调用
* restart().
*
* @return 已执行的处理程序数。
*
* @note 从当前正在运行的线程调用run_one()函数
* 调用run()、run_one()、run_for()、run_until()、poll()或
* 对同一io_context对象的poll_one()可能会引入
* 死锁。避免这种情况是呼叫方的责任。
*/
ASIO_DECL count_type run_one(asio::error_code& ec);
#endif // !defined(ASIO_NO_DEPRECATED)
#if defined(ASIO_HAS_CHRONO) || defined(GENERATING_DOCUMENTATION)
/// 在指定的持续时间内运行io_context对象的事件处理循环
/// 最多执行一个处理程序。
/**
*函数run_one_for()阻塞,直到调度了一个处理程序,
*直到io_context停止,或者直到指定的持续时间
*经过。
*
* @param rel_time 呼叫可能被阻止的持续时间。
*
* @return 已执行的处理程序数。
*/
template <typename Rep, typename Period>
std::size_t run_one_for(const chrono::duration<Rep, Period>& rel_time);
///运行io_context对象的事件处理循环,直到指定时间
///最多执行一个处理程序。
/**
* run_one_until()函数阻塞,直到调度了一个处理程序,
* 直到io_context停止,或者直到指定的时间
* 已联系到。
*
* @param abs_time 呼叫可能被阻塞的时间点。
*
* @return 已执行的处理程序数。
*/
template <typename Clock, typename Duration>
std::size_t run_one_until(
const chrono::time_point<Clock, Duration>& abs_time);
#endif // defined(ASIO_HAS_CHRONO) || defined(GENERATING_DOCUMENTATION)
/// 运行io_context对象的事件处理循环以准备执行
/// 处理程序。
/**
* poll()函数运行的处理程序可以在没有阻塞的情况下运行,
* 直到io_context已经停止或者没有更多准备好的处理程序。
*
* @return 已执行的处理程序数。
*/
ASIO_DECL count_type poll();
#if !defined(ASIO_NO_DEPRECATED)
///(不推荐:使用非error_code重载。)运行io_context对象的
/// 用于执行就绪处理程序的事件处理循环。
/**
* poll()函数运行的处理程序可以在没有阻塞的情况下运行,
* 直到io_context已经停止或者没有更多准备好的处理程序。
*
* @param ec 设置为指示发生了什么错误(如果有)。
*
* @return 已执行的处理程序数。
*/
ASIO_DECL count_type poll(asio::error_code& ec);
#endif // !defined(ASIO_NO_DEPRECATED)
/// 运行io_context对象的事件处理循环以执行一个就绪的
/// 处理程序。
/**
* poll_one()函数最多运行一个准备运行的处理程序,
* 没有阻塞。
*
* @return 已执行的处理程序数。
*/
ASIO_DECL count_type poll_one();
#if !defined(ASIO_NO_DEPRECATED)
///(不推荐:使用非error_code重载。)运行io_context对象的
/// 事件处理循环以执行一个就绪的处理程序。
/**
* poll_one()函数最多运行一个准备运行的处理程序,
* 没有阻塞。
*
* @param ec 设置为指示发生了什么错误(如果有)。
*
* @return 已执行的处理程序数。
*/
ASIO_DECL count_type poll_one(asio::error_code& ec);
#endif // !defined(ASIO_NO_DEPRECATED)
/// 停止io_context对象的事件处理循环。
/**
* 此函数不阻塞,而是简单地向
* 停止。所有对其run()或run_one()成员函数的调用都应该
* 尽快返回。对run()、run_one()、poll()的后续调用
* 或者poll_one()将立即返回,直到调用restart()为止。
*/
ASIO_DECL void stop();
/// 确定io_context对象是否已停止。
/**
* 此函数用于确定io_context对象是否已
* 已停止,要么是通过对stop()的显式调用,要么是由于已用完
* 工作。当io_context对象停止时,调用run()、run_one(),
* poll()或poll_one()将立即返回,而不调用任何
* 处理程序。
*
* @return 如果io_context对象已停止,则为@c true,否则为@c false。
*/
ASIO_DECL bool stopped() const;
/// 重新启动io_context,为后续的run()调用做准备。
/**
* 此函数必须在任何第二组或以后的
* 当
* 之前对这些函数的调用由于io_context而返回
* 被停止工作或失业。调用restart()后
* io_context对象的stopped()函数将返回@c false。
*
* 当有任何未完成的调用时,不得调用此函数
* run()、run_one()、poll()或poll_one()函数。
*/
ASIO_DECL void restart();
#if !defined(ASIO_NO_DEPRECATED)
///(已弃用:请使用restart()。)重置io_context以准备
///随后的run()调用。
/**
* 此函数必须在任何第二组或以后的
* 当
* 之前对这些函数的调用由于io_context而返回
* 被停止工作或失业。调用restart()后
* io_context对象的stopped()函数将返回@c false。
*
* 当有任何未完成的调用时,不得调用此函数
* run()、run_one()、poll()或poll_one()函数。
*/
void reset();
/// (已弃用:使用asio::dispatch().)向请求io_context
/// 调用给定的处理程序。
/**
* 此函数用于要求io_context执行给定的处理程序。
*
* io_context保证只在线程中调用处理程序
* 其中run()、run_one()、poll()或poll_one()成员函数
* 当前正在调用。处理程序可以在此函数内执行
* 如果保证能够得到满足。
*
* @param handler要调用的处理程序。io_context将使
* 根据需要提供处理程序对象的副本。的函数签名
* 处理程序必须是:@code void handler()@结束代码
*
* @note 只有在以下情况下,此函数才会引发异常:
*
* @li 处理程序的@casio_handler_allocate函数;或
*
* @li 处理程序的复制构造函数
*
* 引发异常。
*/
template <typename LegacyCompletionHandler>
ASIO_INITFN_RESULT_TYPE(LegacyCompletionHandler, void ())
dispatch(ASIO_MOVE_ARG(LegacyCompletionHandler) handler);
///(已弃用:使用asio::post()。)请求要调用的io_context
/// 给定的处理程序并立即返回。
/**
* 该函数用于要求io_ text执行给定的处理程序,
* 但不允许io_context从内部调用处理程序
* 功能。
*
* io_context保证只在线程中调用处理程序
* 其中run()、run_one()、poll()或poll_one()成员函数
* 当前正在调用。
*
* @param handler要调用的处理程序。io_context将使
* 根据需要提供处理程序对象的副本。的函数签名
* 处理程序必须是:@code void handler()@结束代码
*
* @note 只有在以下情况下,此函数才会引发异常:
*
* @li 处理程序的@casio_handler_allocate函数;或
*
* @li 处理程序的复制构造函数
*
* 引发异常。
*/
template <typename LegacyCompletionHandler>
ASIO_INITFN_RESULT_TYPE(LegacyCompletionHandler, void ())
post(ASIO_MOVE_ARG(LegacyCompletionHandler) handler);
///(已弃用:请使用asio::bind_executor()。)创建一个新的处理程序
/// 自动在io_context上分派包装的处理程序。
/**
* 此函数用于创建一个新的处理程序函数对象,当
* 调用时,将自动将包装的处理程序传递给io_context
* 对象的调度函数。
*
* @param handler要包装的处理程序。io_context将进行复制
* 处理程序对象的。处理程序的函数签名
* 必须是:@code void handler(A1 A1,…An An)@结束代码
*
* @return 一个函数对象,当被调用时,它将包装的处理程序传递给
* io_context对象的调度函数。给定一个函数对象
* 签名:
* @code R f(A1 a1, ... An an); @endcode
* 如果将此函数对象传递给wrap函数,如下所示:
* @code io_context.wrap(f); @endcode
* 则返回值是具有签名的函数对象
* @code void g(A1 a1, ... An an); @endcode
* 当被调用时,执行等效于以下内容的代码:
* @code io_context.dispatch(boost::bind(f, a1, ... an)); @endcode
*/
template <typename Handler>
#if defined(GENERATING_DOCUMENTATION)
unspecified
#else
detail::wrapped_handler<io_context&, Handler>
#endif
wrap(Handler handler);
#endif // !defined(ASIO_NO_DEPRECATED)
private:
// 用于添加实现的Helper函数。
ASIO_DECL impl_type& add_impl(impl_type* impl);
// Backwards compatible overload for use with services derived from
// io_context::service.
template <typename Service>
friend Service& use_service(io_context& ioc);
#if defined(ASIO_WINDOWS) || defined(__CYGWIN__)
detail::winsock_init<> init_;
#elif defined(__sun) || defined(__QNX__) || defined(__hpux) || defined(_AIX) \
|| defined(__osf__)
detail::signal_init<> init_;
#endif
// The implementation.
impl_type& impl_;
};
impl_变量声明追踪
// The implementation.
impl_type& impl_;
跳转impl_type类声明
class io_context
: public execution_context
{
private:
typedef detail::io_context_impl impl_type;
#if defined(ASIO_HAS_IOCP)
friend class detail::win_iocp_overlapped_ptr;
#endif
跳转detail::io_context_impl类声明
namespace detail {
#if defined(ASIO_HAS_IOCP)
typedef class win_iocp_io_context io_context_impl;
class win_iocp_overlapped_ptr;
#else
typedef class scheduler io_context_impl;
#endif
} // namespace detail
追踪impl_变量到这里,我们可以知道,impl_参数其实是类scheduler
下面主要研究下类scheduler
先看scheduler类声明
struct scheduler_thread_info;
class scheduler
: public execution_context_service_base<scheduler>,
public thread_context
{
public:
typedef scheduler_operation operation;
//构造函数。指定可能发生以下情况的并发线程数
//运行调度程序。如果设置为1,则执行某些优化。
ASIO_DECL scheduler(asio::execution_context& ctx,
int concurrency_hint = 0);
// 销毁服务所拥有的所有用户定义的处理程序对象。
ASIO_DECL void shutdown();
// 如果需要,初始化任务。
ASIO_DECL void init_task();
// 运行事件循环,直到中断或没有更多工作为止。
ASIO_DECL std::size_t run(asio::error_code& ec);
// 运行直到中断或执行一个操作。
ASIO_DECL std::size_t run_one(asio::error_code& ec);
// 运行直到超时、中断或执行一个操作。
ASIO_DECL std::size_t wait_one(
long usec, asio::error_code& ec);
// 轮询操作而不阻塞。
ASIO_DECL std::size_t poll(asio::error_code& ec);
// 轮询一个操作而不阻塞。
ASIO_DECL std::size_t poll_one(asio::error_code& ec);
// 中断事件处理循环。
ASIO_DECL void stop();
// 确定计划程序是否已停止。
ASIO_DECL bool stopped() const;
// 重新启动,为后续的运行调用做准备
ASIO_DECL void restart();
// 通知某些工作已开始。
void work_started()
{
++outstanding_work_;
}
//用于补偿即将到来的work_finished调用。必须调用
//来自调度程序拥有的线程。
ASIO_DECL void compensating_work_started();
// 通知某些工作已经完成。
void work_finished()
{
if (--outstanding_work_ == 0)
stop();
}
// 返回是否可以立即调度处理程序。
bool can_dispatch()
{
return thread_call_stack::contains(this) != 0;
}
//请求调用给定的操作并立即返回。假设
//尚未为该操作调用work_started()。
ASIO_DECL void post_immediate_completion(
operation* op, bool is_continuation);
//请求调用给定的操作并立即返回。假设
//之前为该操作调用了work_started()。
ASIO_DECL void post_deferred_completion(operation* op);
//请求调用给定的操作并立即返回。假设
//之前为每个操作调用了work_started()。
ASIO_DECL void post_deferred_completions(op_queue<operation>& ops);
//在尝试调度失败后,将给定的操作排队
//用于立即调用的操作。
ASIO_DECL void do_dispatch(operation* op);
//将未完成的操作作为停机操作的一部分进行处理。假设
//work_started()之前是为操作调用的。
ASIO_DECL void abandon_operations(op_queue<operation>& ops);
// 获取用于初始化调度程序的并发提示。
int concurrency_hint() const
{
return concurrency_hint_;
}
private:
// The mutex type used by this scheduler.此计划程序使用的互斥对象类型。
typedef conditionally_enabled_mutex mutex;
// The event type used by this scheduler.此计划程序使用的事件类型。
typedef conditionally_enabled_event event;
// Structure containing thread-specific data.包含线程特定数据的结构。
typedef scheduler_thread_info thread_info;
// 最多运行一个操作。可能会阻塞。
ASIO_DECL std::size_t do_run_one(mutex::scoped_lock& lock,
thread_info& this_thread, const asio::error_code& ec);
// 在超时的情况下最多运行一个操作。可能会阻塞。
ASIO_DECL std::size_t do_wait_one(mutex::scoped_lock& lock,
thread_info& this_thread, long usec, const asio::error_code& ec);
// 最多轮询一个操作。
ASIO_DECL std::size_t do_poll_one(mutex::scoped_lock& lock,
thread_info& this_thread, const asio::error_code& ec);
// 停止任务和所有空闲线程。
ASIO_DECL void stop_all_threads(mutex::scoped_lock& lock);
// 唤醒单个空闲线程或任务,并始终解锁互斥锁。
ASIO_DECL void wake_one_thread_and_unlock(
mutex::scoped_lock& lock);
// 帮助程序类,用于在块退出时执行与任务相关的操作。
struct task_cleanup;
friend struct task_cleanup;
// 在块退出时调用与工作相关的操作的帮助程序类。
struct work_cleanup;
friend struct work_cleanup;
// 是否针对单线程用例进行优化。
const bool one_thread_;
// 锁
mutable mutex mutex_;
// 唤醒被阻止线程的事件。
event wakeup_event_;
// 此服务要运行的任务。
reactor* task_;
// 操作对象,用于表示任务在队列中的位置。
struct task_operation : operation
{
task_operation() : operation(0) {}
} task_operation_;
// 任务中断标识
bool task_interrupted_;
// 原子操作,未完成工作的数量。
atomic_count outstanding_work_;
// 任务队列
op_queue<operation> op_queue_;
// 停止标识
bool stopped_;
// 关闭标识
bool shutdown_;
// 用于初始化调度程序的并发提示
const int concurrency_hint_;
};
1、最开始我们看到io_context的构造函数
io_context::io_context()
: impl_(add_impl(new impl_type(*this, ASIO_CONCURRENCY_HINT_DEFAULT)))
{
}
// 添加服务
io_context::impl_type& io_context::add_impl(io_context::impl_type* impl)
{
asio::detail::scoped_ptr<impl_type> scoped_impl(impl);
asio::add_service<impl_type>(*this, scoped_impl.get());
return *scoped_impl.release();
}
2、而impl_type我们追踪到是scheduler类,如此,再看看scheduler类对应构造函数
//构造函数。指定可能发生以下情况的并发线程数
//运行调度程序。如果设置为1,则执行某些优化。
ASIO_DECL scheduler(asio::execution_context& ctx,
int concurrency_hint = 0);
scheduler::scheduler(
asio::execution_context& ctx, int concurrency_hint)
: asio::detail::execution_context_service_base<scheduler>(ctx),
one_thread_(concurrency_hint == 1
|| !ASIO_CONCURRENCY_HINT_IS_LOCKING(
SCHEDULER, concurrency_hint)
|| !ASIO_CONCURRENCY_HINT_IS_LOCKING(
REACTOR_IO, concurrency_hint)),
mutex_(ASIO_CONCURRENCY_HINT_IS_LOCKING(
SCHEDULER, concurrency_hint)),
task_(0),
task_interrupted_(true),
outstanding_work_(0),
stopped_(false),
shutdown_(false),
concurrency_hint_(concurrency_hint)
{
ASIO_HANDLER_TRACKING_INIT;
}
3、one_thread_
one_thread_(concurrency_hint == 1
|| !ASIO_CONCURRENCY_HINT_IS_LOCKING(
SCHEDULER, concurrency_hint)
|| !ASIO_CONCURRENCY_HINT_IS_LOCKING(
REACTOR_IO, concurrency_hint))
...
// 是否针对单线程用例进行优化。
const bool one_thread_;
4、mutex_
mutex_(ASIO_CONCURRENCY_HINT_IS_LOCKING(
SCHEDULER, concurrency_hint)),
5、task_追踪
// 此服务要运行的任务。
reactor* task_;
...
typedef class epoll_reactor reactor;
...
epoll_reactor类初始化
// Constructor.
ASIO_DECL epoll_reactor(asio::execution_context& ctx);
6、ASIO_DECL epoll_reactor(asio::execution_context& ctx);剖析
epoll_reactor::epoll_reactor(asio::execution_context& ctx)
: execution_context_service_base<epoll_reactor>(ctx),
scheduler_(use_service<scheduler>(ctx)),
mutex_(ASIO_CONCURRENCY_HINT_IS_LOCKING(
REACTOR_REGISTRATION, scheduler_.concurrency_hint())),
interrupter_(),
epoll_fd_(do_epoll_create()),
timer_fd_(do_timerfd_create()),
shutdown_(false),
registered_descriptors_mutex_(mutex_.enabled())
{
// Add the interrupter's descriptor to epoll.
epoll_event ev = { 0, { 0 } };
ev.events = EPOLLIN | EPOLLERR | EPOLLET;
ev.data.ptr = &interrupter_;
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
interrupter_.interrupt();
// Add the timer descriptor to epoll.
if (timer_fd_ != -1)
{
ev.events = EPOLLIN | EPOLLERR;
ev.data.ptr = &timer_fd_;
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
}
}
这里可以看到epoll初始化调用流程,对epoll进行初始化。
scheduler类主要研究下run()函数,由于在ubuntu系统上,我们将会看到epoll之类的代码。
函数声明
// 运行事件循环,直到中断或没有更多工作为止。
ASIO_DECL std::size_t run(asio::error_code& ec);
实现
std::size_t scheduler::run(asio::error_code& ec)
{
ec = asio::error_code();
// outstanding_work_原子操作,未完成工作的数量。
// 当outstanding_work_为0,程序没有要完成的工作,停止退出,这里的退出指asio的io操作被停止,直到调用restart()恢复使用为止。
if (outstanding_work_ == 0)
{
stop();
return 0;
}
// 绑定线程操作
thread_info this_thread;
this_thread.private_outstanding_work = 0;
thread_call_stack::context ctx(this, this_thread);
// 加锁
mutex::scoped_lock lock(mutex_);
// do_run_one返回1为真,lock解锁,运行循环,n++;
// do_run_one返回0为假,退出循环,函数结束
std::size_t n = 0;
for (; do_run_one(lock, this_thread, ec); lock.lock())
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
return n;
}
if (outstanding_work_ == 0)
{
stop();
return 0;
}
// 绑定线程操作
thread_info this_thread;
this_thread.private_outstanding_work = 0;
thread_call_stack::context ctx(this, this_thread);
// 加锁
mutex::scoped_lock lock(mutex_);
thread_info 为每个线程的独享变量,内部有private_op_queue, 用于临时存储触发事件的descriptor_state,以及完成io读写的reactor_op
struct scheduler_thread_info : public thread_info_base
{
op_queue<scheduler_operation> private_op_queue;
long private_outstanding_work;
};
context 为上下文,以链表形式存储context对象,context 的 id为schedule对象的地址,value为this_info , 所以可以根据一个io_context 找到它所有的thread_info,也即可以很容易却修改不同线程的局部operation队列 private_op_queue。
class context
: private noncopyable
{
public:
// Push the key on to the stack.
explicit context(Key* k)
: key_(k),
next_(call_stack<Key, Value>::top_)
{
value_ = reinterpret_cast<unsigned char*>(this);
call_stack<Key, Value>::top_ = this;
}
// Push the key/value pair on to the stack.
context(Key* k, Value& v)
: key_(k),
value_(&v),
next_(call_stack<Key, Value>::top_)
{
call_stack<Key, Value>::top_ = this;
}
// Pop the key/value pair from the stack.
~context()
{
call_stack<Key, Value>::top_ = next_;
}
// Find the next context with the same key.
Value* next_by_key() const
{
context* elem = next_;
while (elem)
{
if (elem->key_ == key_)
return elem->value_;
elem = elem->next_;
}
return 0;
}
private:
friend class call_stack<Key, Value>;
// The key associated with the context.
Key* key_;
// The value associated with the context.
Value* value_;
// The next element in the stack.
context* next_;
};
// do_run_one返回1为真,lock解锁,运行循环,n++;
// do_run_one返回0为假,退出循环,函数结束
std::size_t n = 0;
for (; do_run_one(lock, this_thread, ec); lock.lock())
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
声明
// 最多运行一个操作。可能会阻塞。
ASIO_DECL std::size_t do_run_one(mutex::scoped_lock& lock,
thread_info& this_thread, const asio::error_code& ec);
实现
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
scheduler::thread_info& this_thread,
const asio::error_code& ec)
{
// stopped_结束标识
while (!stopped_)
{
// 不断读取op_queue_队列,队列有数据则处理
if (!op_queue_.empty())
{
// 取出对列头节点,并移除
operation* o = op_queue_.front();
op_queue_.pop();
bool more_handlers = (!op_queue_.empty());
// 如果o对列头节点为最后一个节点
if (o == &task_operation_)
{
task_interrupted_ = more_handlers;
if (more_handlers && !one_thread_)
wakeup_event_.unlock_and_signal_one(lock);
else
lock.unlock();
task_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit; // (void)on_exit防止编译告警,未使用函数的参数,编译期正常来说会抛出警告(warring),而在部分场景下,我们这么做可能是为了扩展等等。那么又不想看到这么多warring。
// Run the task. May throw an exception. Only block if the operation
// queue is empty and we're not polling, otherwise we want to return
// as soon as possible.
task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
}
else
{
std::size_t task_result = o->task_result_;
if (more_handlers && !one_thread_)
wake_one_thread_and_unlock(lock);
else
lock.unlock();
// Ensure the count of outstanding work is decremented on block exit.
work_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Complete the operation. May throw an exception. Deletes the object.
o->complete(this, ec, task_result);
return 1;
}
}
else
{
wakeup_event_.clear(lock);
wakeup_event_.wait(lock);
}
}
return 0;
}
1、stopped_结束标识,判断函数是否结束
while (!stopped_)
2、op_queue_任务队列,不断循环处理,判断队列是否有元素
// 不断读取op_queue_队列,队列有数据则处理
if (!op_queue_.empty())
3、op_queue_任务队列如果为空,重置事件并等待事件发出信号。
else
{
wakeup_event_.clear(lock); // 重置事件。
wakeup_event_.wait(lock); // 等待事件发出信号。
}
4、op_queue_任务队列如果不为空,先取出队列元素
// 取出对列头节点,并移除
operation* o = op_queue_.front();
op_queue_.pop();
bool more_handlers = (!op_queue_.empty());
5、然后对该元素判断,是否为最后个元素,如果不是,则完成操作,删除对象。
if (o == &task_operation_)
{
...
}
else
{
std::size_t task_result = o->task_result_;
if (more_handlers && !one_thread_)
wake_one_thread_and_unlock(lock);
else
lock.unlock();
// 确保在块退出时减少未完成工作的计数。
work_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// 完成操作。可能引发异常。删除对象。
o->complete(this, ec, task_result);
return 1;
}
6、o->complete追踪
o->complete(this, ec, task_result);
...
void complete(void* owner, const asio::error_code& ec,
std::size_t bytes_transferred)
{
func_(owner, this, ec, bytes_transferred);
}
...
func_type func_;
...
typedef void (*func_type)(void*,
scheduler_operation*,
const asio::error_code&, std::size_t);
从op中获取执行结果(注意io操作,第一次结果为触发事件类型,第二次为io操作结果)
调用op的完成函数(第一次为根据返回事件类型读写数据,第二次为将读写的结果传给用户回调)
7、op_queue_取出的元素,如果是最后一个元素
if (o == &task_operation_)
{
// task_interrupted_任务中断标识
task_interrupted_ = more_handlers;
if (more_handlers && !one_thread_)
wakeup_event_.unlock_and_signal_one(lock);
else
lock.unlock();
task_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit; // (void)on_exit防止编译告警,未使用函数的参数,编译期正常来说会抛出警告(warring),而在部分场景下,我们这么做可能是为了扩展等等。那么又不想看到这么多warring。
// Run the task. May throw an exception. Only block if the operation
// queue is empty and we're not polling, otherwise we want to return
// as soon as possible.
task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
}
else
{
...
}
8、将more_handlers赋值给task_interrupted_任务中断标识,more_handlers由判断前获取
bool more_handlers = (!op_queue_.empty());
解锁
if (more_handlers && !one_thread_)
wakeup_event_.unlock_and_signal_one(lock);
else
lock.unlock();
9、task_cleanup类只有一个析构函数,作用是将已完成的操作排队,并在结束时重新插入任务操作队列。
task_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
(void)on_exit防止编译告警,未使用函数的参数,编译期正常来说会抛出警告(warring),而在部分场景下,我们这么做可能是为了扩展等等。那么又不想看到这么多warring。
10、task->run剖析
task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
task->run函数追踪
// 此服务要运行的任务。
reactor* task_;
...
#if defined(ASIO_HAS_IOCP) || defined(ASIO_WINDOWS_RUNTIME)
typedef class null_reactor reactor;
#elif defined(ASIO_HAS_IOCP)
typedef class select_reactor reactor;
#elif defined(ASIO_HAS_EPOLL)
typedef class epoll_reactor reactor;
#elif defined(ASIO_HAS_KQUEUE)
typedef class kqueue_reactor reactor;
#elif defined(ASIO_HAS_DEV_POLL)
typedef class dev_poll_reactor reactor;
#else
typedef class select_reactor reactor;
#endif
// 这里的reactor是typedef class epoll_reactor reactor;
...
task->run函数
void epoll_reactor::run(long usec, op_queue<operation>& ops)
{
int timeout;
if (usec == 0)
timeout = 0;
else
{
timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
if (timer_fd_ == -1)
{
mutex::scoped_lock lock(mutex_);
timeout = get_timeout(timeout);
}
}
// Block on the epoll descriptor.
epoll_event events[128];
int num_events = epoll_wait(epoll_fd_, events, 128, timeout);
#if defined(ASIO_HAS_TIMERFD)
bool check_timers = (timer_fd_ == -1);
#else // defined(ASIO_HAS_TIMERFD)
bool check_timers = true;
#endif // defined(ASIO_HAS_TIMERFD)
// Dispatch the waiting events.
for (int i = 0; i < num_events; ++i)
{
void* ptr = events[i].data.ptr;
if (ptr == &interrupter_)
{
#if defined(ASIO_HAS_TIMERFD)
if (timer_fd_ == -1)
check_timers = true;
#else // defined(ASIO_HAS_TIMERFD)
check_timers = true;
#endif // defined(ASIO_HAS_TIMERFD)
}
#if defined(ASIO_HAS_TIMERFD)
else if (ptr == &timer_fd_)
{
check_timers = true;
}
#endif // defined(ASIO_HAS_TIMERFD)
else
{
// The descriptor operation doesn't count as work in and of itself, so we
// don't call work_started() here. This still allows the scheduler to
// stop if the only remaining operations are descriptor operations.
descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
if (!ops.is_enqueued(descriptor_data))
{
descriptor_data->set_ready_events(events[i].events);
ops.push(descriptor_data);
}
else
{
descriptor_data->add_ready_events(events[i].events);
}
}
}
if (check_timers)
{
mutex::scoped_lock common_lock(mutex_);
timer_queues_.get_ready_timers(ops);
#if defined(ASIO_HAS_TIMERFD)
if (timer_fd_ != -1)
{
itimerspec new_timeout;
itimerspec old_timeout;
int flags = get_timeout(new_timeout);
timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
}
#endif // defined(ASIO_HAS_TIMERFD)
}
}
这里的run函数,看代码直观了解到,是调用epoll的逻辑
epoll_wait
// Block on the epoll descriptor.
epoll_event events[128];
int num_events = epoll_wait(epoll_fd_, events, 128, timeout);
对num_events的响应处理,塞入我们传入ops队列
// Dispatch the waiting events.
for (int i = 0; i < num_events; ++i)
{
...
descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
if (!ops.is_enqueued(descriptor_data))
{
// 读写
descriptor_data->set_ready_events(events[i].events);
ops.push(descriptor_data);
}
else
{
// 新加入链接
descriptor_data->add_ready_events(events[i].events);
}
}
run()封装了epoll,并将触发了事件的文件描述符fd绑定的descriptor_state,传给thread_info的private_op_queue,本次循环结束后添加到op_queue中。
这里再返回task_->run函数,我们对scheduler::do_run_one函数剖析完成,scheduler::run函数也是。
参考链接:https://blog.csdn.net/qq_38166063/article/details/124278707