spserver实现了领导者/追随者模型,依赖线程池实现其机制,优点是不用缓存数据包,领导者线程获得数据后直接调用处理函数处理,并且其他追随者线程醒来成为领导者线程去等待下一个数据包
代码:
1.构造函数
SP_ThreadPool :: SP_ThreadPool( int maxThreads, const char * tag )
{
if( maxThreads <= 0 ) maxThreads = 2;
sp_thread_mutex_init( &mMainMutex, NULL );//线程池的互斥锁,对dispatch和saveThread函数的代码加锁,使得在操作SP_ThreadPool的成员函数时是线程安全的
sp_thread_cond_init( &mIdleCond, NULL );//表明线程池中是否有空闲的线程可以使用
sp_thread_cond_init( &mFullCond, NULL );//表面线程池中所有线程都处于空闲状态
sp_thread_cond_init( &mEmptyCond, NULL );
mMaxThreads = maxThreads;
mIndex = 0;
mIsShutdown = 0;
mTotal = 0;
tag = NULL == tag ? "unknown" : tag;
mTag = strdup( tag );
mThreadList = ( SP_Thread_t ** )malloc( sizeof( void * ) * mMaxThreads );
memset( mThreadList, 0, sizeof( void * ) * mMaxThreads );
}
再看线程处理完任务转空闲状态函数:
int SP_ThreadPool :: saveThread( SP_Thread_t * thread )
{
int ret = -1;
sp_thread_mutex_lock( &mMainMutex );
if( mIndex < mMaxThreads ) {//是否有空间保存空闲状态线程的数据结构
mThreadList[ mIndex ] = thread;//在mIndex索引的位置保存空闲线程的数据结构
mIndex++;
ret = 0;
sp_thread_cond_signal( &mIdleCond );//线程池已经有线程空闲出来,发信号
if( mIndex >= mTotal ) {//是否所有线程都已经空闲
sp_thread_cond_signal( &mFullCond );//线程池中所有线程都空闲了,发信号
}
}
sp_thread_mutex_unlock( &mMainMutex );
return ret;
}
一旦发出mIdleCond或mFullCond信号,在mIdleCond或mFullCond事件上等待的线程就会去试着获取mMainMutex,我们看关键函数dispatch
int SP_ThreadPool :: dispatch( DispatchFunc_t dispatchFunc, void *arg )
{
int ret = 0;
sp_thread_attr_t attr;
SP_Thread_t * thread = NULL;
sp_thread_mutex_lock( &mMainMutex );
for( ; mIndex <= 0 && mTotal >= mMaxThreads; ) {
sp_thread_cond_wait( &mIdleCond, &mMainMutex );//如果mTotal>=mMaxThreads表示线程池中mTotal线程都在忙,没有空闲的线程可以执行dispatchFunc任务,那么就释放mMainMutex并在mIdleCond事件上等待,如果mIndex <= 0表示线程中还没有空闲线程保存到mThreadList成员,即无可以使用的空闲线程
}
一旦等待mIdleCond成功,sp_thread_cond_wait函数返回,此时mTotal>=mMaxThreads,但mIndex一定大于0,表明已经有空闲的线程可以使用,for循环会退出
再看任务执行函数wrapperFunc:
sp_thread_result_t SP_THREAD_CALL SP_ThreadPool :: wrapperFunc( void * arg )
{
SP_Thread_t * thread = ( SP_Thread_t * )arg;
for( ; 0 == thread->mParent->mIsShutdown; ) {
thread->mFunc( thread->mArg );//执行任务函数
if( 0 != thread->mParent->mIsShutdown ) break;//如果系统关闭则退出
sp_thread_mutex_lock( &thread->mMutex );
if( 0 == thread->mParent->saveThread( thread ) ) {//任务执行完就回到线程池中
sp_thread_cond_wait( &thread->mCond, &thread->mMutex );//释放掉mMutex等待mCond以备执行下一次任务,可能下一次任务thread->mFunc会被修改,但这不是线程池关心的
sp_thread_mutex_unlock( &thread->mMutex );
} else {//回到线程池中失败那么就销毁此线程,销毁后在dispatch中会检查线程数量并创建新的线程
sp_thread_mutex_unlock( &thread->mMutex );
sp_thread_cond_destroy( &thread->mCond );
sp_thread_mutex_destroy( &thread->mMutex );
free( thread );//释放掉此线程结构
thread = NULL;
break;
}
}
再回到dispatch函数
int SP_ThreadPool :: dispatch( DispatchFunc_t dispatchFunc, void *arg )
{
int ret = 0;
sp_thread_attr_t attr;
SP_Thread_t * thread = NULL;
sp_thread_mutex_lock( &mMainMutex );
for( ; mIndex <= 0 && mTotal >= mMaxThreads; ) {
sp_thread_cond_wait( &mIdleCond, &mMainMutex );
}
if( mIndex <= 0 ) {
SP_Thread_t * thread = ( SP_Thread_t * )malloc( sizeof( SP_Thread_t ) );
memset( &thread->mId, 0, sizeof( thread->mId ) );
sp_thread_mutex_init( &thread->mMutex, NULL );
sp_thread_cond_init( &thread->mCond, NULL );
thread->mFunc = dispatchFunc;
thread->mArg = arg;
thread->mParent = this;
sp_thread_attr_init( &attr );
sp_thread_attr_setdetachstate( &attr, SP_THREAD_CREATE_DETACHED );
if( 0 == sp_thread_create( &( thread->mId ), &attr, wrapperFunc, thread ) ) {
mTotal++;
sp_syslog( LOG_NOTICE, "[tp@%s] create thread#%ld\n", mTag, thread->mId );
} else {
ret = -1;
sp_syslog( LOG_WARNING, "[tp@%s] cannot create thread\n", mTag );
sp_thread_mutex_destroy( &thread->mMutex );
sp_thread_cond_destroy( &thread->mCond );
free( thread );
}
sp_thread_attr_destroy( &attr );
} else {
mIndex--;
thread = mThreadList[ mIndex ];//从线程池中获取线程
mThreadList[ mIndex ] = NULL;
thread->mFunc = dispatchFunc;//更新任务函数
thread->mArg = arg;//更新任务参数
thread->mParent = this;
sp_thread_mutex_lock( &thread->mMutex );
sp_thread_cond_signal( &thread->mCond ) ;//唤醒线程
sp_thread_mutex_unlock ( &thread->mMutex );
}
sp_thread_mutex_unlock( &mMainMutex );
return ret;
}
mIndex现在大于0,执行else分支,从线程池中获取线程并调用sp_thread_cond_signal( &thread->mCond ) ;线程唤醒并执行dispatchFunc任务