nodeos(10265 主线程&信号处理线程) : 进行异步io投递 `epoll_wait` | 接收系统信号并处理
─┬─{nodeos}(10311 controller线程池_1: 异步执行块block_state创建,块中交易验证时的交易解签名计算)
├─{nodeos}(10312 controller线程池_2)
├─{nodeos}(10314 producer_plugin线程池_1: 异步执行交易解签名计算)
├─{nodeos}(10315 producer_plugin线程池_2)
├─{nodeos}(10317 http_plugin线程池_1: http服务)
├─{nodeos}(10318 http_plugin线程池_2)
└─{nodeos}(10319 net_plugin线程池_1: p2p服务)
主线程:main
函数启动线程,该线程执行完程序初始化工作后,会调用app().io_service.run()
, 启动boost::asio::io_service
的异步io服务,通过异步io方式完成节点块生产,交易处理等主要业务工作。
void application::exec() {
boost::asio::io_service::work work(*io_serv);
(void)work;
bool more = true;
while( more || io_serv->run_one() ) {
while( io_serv->poll_one() ) {}
// execute the highest priority item
more = pri_queue.execute_highest();
}
shutdown(); /// perform synchronous shutdown
io_serv.reset();
}
eos中交易不支持并行处理, 不允许在除主线程之外的其它线程中重复执行io_serv.run()操作
boost::asio::io_service& get_io_service() { return *io_serv; }
producer_plugin::producer_plugin()
: my(new producer_plugin_impl(app().get_io_service())){ // 在构造时传入 app().get_io_service()
my->_self = this;
}
producer_plugin_impl(boost::asio::io_service& io)
:_timer(io)
信号处理线程:子线程,通过异步io服务,接收系统信号并处理。
boost::asio::io_service startup_thread_ios; //信号处理线程
setup_signal_handling_on_ios(startup_thread_ios);
std::thread startup_thread([&startup_thread_ios]() {
startup_thread_ios.run();
});
void application::setup_signal_handling_on_ios(boost::asio::io_service& ios, bool startup)
线程池,线程池启动的工作线程数可通过启动参数配置。 (默认池内2条工作线程)
struct controller_impl {
...
boost::asio::thread_pool thread_pool; //线程池
}
controller_impl( const controller::config& cfg, controller& s ){
...
thread_pool( cfg.thread_pool_size )
}
controller
线程池:异步执行块block_state
创建,块中交易验证时的交易解签名计算。
执行块相关操作时,较耗时且与排序无关的动作都会投递到controller线程池执行。
块交易验证时的解签名操作 (节点收到块)
// 节点收到块,会调用apply_block() 函数执行块中交易,进行块验证
void apply_block( const block_state_ptr& bsp, controller::block_status s ) {
...
transaction_metadata::start_recover_keys( mtrx, thread_pool, chain_id, microseconds::maximum() );
}
//在线程池`thread_pool`中异步执行解签名函数,异步解签名的执行结果,会放入交易的 `signing_keys_future` 中
//真正解签名算法是 trn.get_signature_keys(),解出的公钥放到recovered_pub_keys中。
signing_keys_future_type transaction_metadata::start_recover_keys( const transaction_metadata_ptr& mtrx,
boost::asio::thread_pool& thread_pool,
const chain_id_type& chain_id,
fc::microseconds time_limit )
块状态block_state
数据创建操作(两轮共识计算都在这里完成)
producer_plugin
线程池:负责异步执行交易解签名计算。
void producer_plugin::plugin_initialize(const boost::program_options::variables_map& options){
...
my->_thread_pool.emplace( thread_pool_size );
}
为接收到的投递交易进行异步解签名计算(在节点收到其它节点/客户端广播的交易时被调用)
void on_incoming_transaction_async(const transaction_metadata_ptr& trx, bool persist_until_expired, next_function<transaction_trace_ptr> next) {
signing_keys_future_type future = transaction_metadata::start_recover_keys( trx, _thread_pool->get_executor(),
chain.get_chain_id(), fc::microseconds( cfg.max_transaction_cpu_usage ) );
}
等待解签名计算完成,将交易投递到主线程的异步io服务中处理。
boost::asio::post( *_thread_pool, [self = this, future, trx, persist_until_expired, next]() {
if( future.valid() )
future.wait();
app().post(priority::low, [self, trx, persist_until_expired, next]() {
self->process_incoming_transaction_async( trx, persist_until_expired, next );
});
});
http_plugin
线程池: http 的 response 修改成多线程
optional<boost::asio::thread_pool> thread_pool
net_plugin
线程池
optional<boost::asio::thread_pool> thread_pool
boost::asio
EOS 异步框架是通过实现一个任务队列, 然后往队列中 post 任务来实现, 而消费者不停地从中提取 task 并且执行。