呼,整个libco库终于快是要剖析完了。整个人算是对协程以及操作系统的调度有了新的认识。
之前的博客,我们分析了libco的协程从创建到启动,挂起以及最后退出的一个过程。同时,我们也认识到,协程本质的执行是串行的。
在之前协程的安装与使用中,我们提到了一个生产者消费者例子。在producer协程函数中,最后会调用poll函数等待一秒,comsumer函数也会调用co_cond_timedwait
函数去等待生产者信号。这在我们使用者看来是同步阻塞的,但是分析源码的时候,我发现它其实是同步非阻塞的。
为什么这么说呢?因为从协程的角度来看,当前的协程阻塞了,但是它地下的线程可能正在执行别的函数。而Linux可是没有协程概念的,所以其就是非阻塞的。
还记得我在从协程的使用来理清关系一文中提到的主协程的概念么?我们提到,libco程序都有一个主协程,即程序里首次调用co_create
显式创建的第一个协程。在例子中,其实就是那个调用co_eventloop
的函数。producer和comsumer在阻塞之后,CPU会被yield给主协程,而此时主协程在co_eventloop
函数中负责运行整个libco协程调度。
之前分析了协程的挂起和执行做了什么,但是我们还没有说什么时候会发生协程的挂起和恢复执行。
对yield来说,有以下三种情况:
co_yield_env
poll
或者co_cond_timedwait
陷入阻塞状态connect
、read
、write
等系统调用陷入阻塞状态与之对应的resume也有三种状态:
co_resume
poll
的目标文件描述符事件就绪或超时,co_cond_timedwait
等到了其他协程的co_cond_signal通知信号read
,write
等I/O接口成功读到或写入数据注意:
这些阻塞都是在用户态的,其底层所处线程并没有阻塞,而是在运行其他协程。而为了实现用户态阻塞,避免用户态阻塞,我们就必须得依靠内核提供的非阻塞I/O机制,将socket文件描述符设置成non-blocking的。
libco通过dlsym的hook了各种网络I/O相关的系统调用。使用户可以以同步的方式直接使用read、write、connect等系统调用。
以read为例:
当我们调用read去读取数据的时候,由于系统的read已经被hook,所以实际上会调用到libco内部准备好的read函数,这个函数其中做了四件事:
- 将当前协程注册到定时器上,用于将来处理读超时
- 调用epoll_ctl将自己注册到当前执行环境的epoll实例上
- 调用co_yield_env让出CPU
- 等到协程被主函数唤醒之后,也就是读好数据了,会调用真正的read系统调用
假如说,协程yield让出CPU控制权回到了主协程,主协程此时在干嘛呢?
主协程在co_eventloop
中,周而复始的调用epoll_wait
,当有就绪的I/O事件就处理I/O事件,当定时器上有超时的事件就处理超时事件,活跃队列中有活跃事件就处理活跃事件。
那么我们就来看看co_eventloop的源码,看看它究竟做了哪些工作吧:
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
//给返回的结果分配内存
if( !ctx->result )
{
ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
}
co_epoll_res *result = ctx->result;
for(;;)
{
//epoll_wait
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
//得到活跃队列和超时队列
stTimeoutItemLink_t *active = (ctx->pstActiveList);
stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
//超时队列清空,初始化
memset( timeout,0,sizeof(stTimeoutItemLink_t) );
//处理有结果的描述符
for(int i=0;i<ret;i++)
{
stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
if( item->pfnPrepare )
{
//执行预处理回调函数,之后会将这个连接放到timeout队列中
item->pfnPrepare( item,result->events[i],active );
}
else
{
//放入timeout队列
AddTail( active,item );
}
}
//得到当前的时间
unsigned long long now = GetTickMS();
//队列当前时间和之前的预设时间,将超时的放入timeout队列
TakeAllTimeout( ctx->pTimeout,now,timeout );
//设置超时标志
stTimeoutItem_t *lp = timeout->head;
while( lp )
{
//printf("raise timeout %p\n",lp);
lp->bTimeout = true;
lp = lp->pNext;
}
//active和timeout队列合并
Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );
//获取队头
lp = active->head;
while( lp )
{
//连接队列的队头出队
PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
if (lp->bTimeout && now < lp->ullExpireTime)
{
//处理timeout的item
int ret = AddTimeout(ctx->pTimeout, lp, now);
if (!ret)
{
lp->bTimeout = false;
lp = active->head;
continue;
}
}
if( lp->pfnProcess )
{
//遍历active队列,调用工作协程设置的回调函数并resume挂起的工作协程
lp->pfnProcess( lp );
}
lp = active->head;
}
if( pfn )
{
//调用主协程传递进来的回调函数
if( -1 == pfn( arg ) )
{
break;
}
}
}
}
[1] C++开源协程库libco详解