当前位置: 首页 > 工具软件 > Reactor-Guice > 使用案例 >

ACE_Reactor(六)ACE_TP_Reactor

拓拔高畅
2023-12-01

源码可以到http://www.aoc.nrao.edu/php/tjuerges/ALMA/ACE-5.5.2/html/ace/上去找

虽然ACE_Select_Reactor是灵活的,但是由于只有拥有者才能调用它的handle_events方法,所以在多线程应用程序中它还是有一定的局限性的,因而ACE_Select_Reactor得事件多路分离层的序列化处理可能对某些场景来讲约束太多了。一种解决方法是多个线程去运行独立ACE_Select_Reactor的实例,但是这样需要开发人员去实现一个代理在各反应器之间均衡的分配事件处理,以便达到均衡负载。另一种即这里的ACE_TP_Reactor,基于线程池的Reactor。
其实现了Learder/Follower模型,从而提供了一种有效的并发性模型,多个线程可以针对一组I/O句柄轮流调用select去检测并处理事件。

最关键的地方在于:
1.分派事件(其实就是该线程自己执行事件处理函数)前,会释放Token,让其他线程能够去select或者处理还未处理的事件
2.获取到token的线程,会先检测是否有未处理的事件,若有则先处理,若没有则占用token进行select阻塞检测事件

这里的keypoint1,可以从如下代码中看出来:


int ACE_TP_Reactor::handle_socket_events    (   int &   event_count,
ACE_TP_Token_Guard &    g
)
00466 {
00467 
00468   // We got the lock, lets handle some I/O events.
00469   ACE_EH_Dispatch_Info dispatch_info;
00470 
00471   this->get_socket_event_info (dispatch_info);
00472 
00473   // If there is any event handler that is ready to be dispatched, the
00474   // dispatch information is recorded in dispatch_info.
00475   if (!dispatch_info.dispatch ())
00476     {
00477       return 0;
00478     }
00479 
00480   // Suspend the handler so that other threads don't start dispatching
00481   // it, if we can't suspend then return directly
00482   //
00483   // NOTE: This check was performed in older versions of the
00484   // TP_Reactor. Looks like it is a waste..
00485   if (dispatch_info.event_handler_ != this->notify_handler_)
00486     if (this->suspend_i (dispatch_info.handle_) == -1)
00487       return 0;
00488 
00489   // Call add_reference() if needed.
00490   if (dispatch_info.reference_counting_required_)
00491     dispatch_info.event_handler_->add_reference ();
00492 
00493   // Release the lock.  Others threads can start waiting.
00494   guard.release_token ();
00495 
00496   int result = 0;
00497 
00498   // If there was an event handler ready, dispatch it.
00499   // Decrement the event left
00500   --event_count;
00501 
00502   // Dispatched an event
00503   if (this->dispatch_socket_event (dispatch_info) == 0)
00504     ++result;
00505 
00506   return result;
00507 }

上述的第一点,也即是这里的:

00493   // Release the lock.  Others threads can start waiting.
00494   guard.release_token ();

说明里也讲的很清楚,会将token释放让其他线程去等待。

那么keypoint2,又是如何完成的了呢?看下面的代码:

int ACE_TP_Reactor::get_event_for_dispatching   (   ACE_Time_Value *    max_wait_time    ) 
00511 {
00512   // If the reactor handler state has changed, clear any remembered
00513   // ready bits and re-scan from the master wait_set.
        .............
00534   return this->wait_for_multiple_events (this->ready_set_,
00535                                          max_wait_time);
00536 }
//看下wait_for_multiple_events的实现

template<class ACE_SELECT_REACTOR_TOKEN>
int ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >::wait_for_multiple_events  (   ACE_Select_Reactor_Handle_Set &     ,
ACE_Time_Value *    
) 
01074 {
01075   ACE_TRACE ("ACE_Select_Reactor_T::wait_for_multiple_events");
01076   u_long width = 0;
01077   ACE_Time_Value timer_buf (0);
01078   ACE_Time_Value *this_timeout;
01079 
01080   int number_of_active_handles = this->any_ready (dispatch_set);
01081 
01082   // If there are any bits enabled in the <ready_set_> then we'll
01083   // handle those first, otherwise we'll block in <select>.
01084 
01085   if (number_of_active_handles == 0)
01086     {
01087       do
01088         {
01089           this_timeout =
01090             this->timer_queue_->calculate_timeout (max_wait_time,
01091                                                    &timer_buf);
01092           width = (u_long) this->handler_rep_.max_handlep1 ();
01093 
01094           dispatch_set.rd_mask_ = this->wait_set_.rd_mask_;
01095           dispatch_set.wr_mask_ = this->wait_set_.wr_mask_;
01096           dispatch_set.ex_mask_ = this->wait_set_.ex_mask_;
01097           number_of_active_handles = ACE_OS::select (int (width),
01098                                                      dispatch_set.rd_mask_,
01099                                                      dispatch_set.wr_mask_,
01100                                                      dispatch_set.ex_mask_,
01101                                                      this_timeout);
01102         }
..........

这里的实现ACE_TP_Reactor是直接使用的ACE_Select_Reactor_T模板类的,但是其处理也是这样的,先检测是否已经有触发的事件any_ready ,若有则不再去执行select。

Perform Double-Checked Locking Optimization.?

 类似资料: