当前位置: 首页 > 工具软件 > SPServer > 使用案例 >

spserver线程池代码阅读

严信瑞
2023-12-01

spserver实现了领导者/追随者模型,依赖线程池实现其机制,优点是不用缓存数据包,领导者线程获得数据后直接调用处理函数处理,并且其他追随者线程醒来成为领导者线程去等待下一个数据包

代码:

1.构造函数
SP_ThreadPool :: SP_ThreadPool( int maxThreads, const char * tag )
{
    if( maxThreads <= 0 ) maxThreads = 2;

    sp_thread_mutex_init( &mMainMutex, NULL );//线程池的互斥锁,对dispatch和saveThread函数的代码加锁,使得在操作SP_ThreadPool的成员函数时是线程安全的
    sp_thread_cond_init( &mIdleCond, NULL );//表明线程池中是否有空闲的线程可以使用
    sp_thread_cond_init( &mFullCond, NULL );//表面线程池中所有线程都处于空闲状态
    sp_thread_cond_init( &mEmptyCond, NULL );
    mMaxThreads = maxThreads;
    mIndex = 0;
    mIsShutdown = 0;
    mTotal = 0;

    tag = NULL == tag ? "unknown" : tag;
    mTag = strdup( tag );

    mThreadList = ( SP_Thread_t ** )malloc( sizeof( void * ) * mMaxThreads );
    memset( mThreadList, 0, sizeof( void * ) * mMaxThreads );
}

再看线程处理完任务转空闲状态函数:
int SP_ThreadPool :: saveThread( SP_Thread_t * thread )
{
    int ret = -1;

    sp_thread_mutex_lock( &mMainMutex );

    if( mIndex < mMaxThreads ) {//是否有空间保存空闲状态线程的数据结构
        mThreadList[ mIndex ] = thread;//在mIndex索引的位置保存空闲线程的数据结构
        mIndex++;
        ret = 0;

        sp_thread_cond_signal( &mIdleCond );//线程池已经有线程空闲出来,发信号

        if( mIndex >= mTotal ) {//是否所有线程都已经空闲
            sp_thread_cond_signal( &mFullCond );//线程池中所有线程都空闲了,发信号
        }
    }

    sp_thread_mutex_unlock( &mMainMutex );

    return ret;
}

一旦发出mIdleCond或mFullCond信号,在mIdleCond或mFullCond事件上等待的线程就会去试着获取mMainMutex,我们看关键函数dispatch


int SP_ThreadPool :: dispatch( DispatchFunc_t dispatchFunc, void *arg )
{
    int ret = 0;

    sp_thread_attr_t attr;
    SP_Thread_t * thread = NULL;

    sp_thread_mutex_lock( &mMainMutex );

    for( ; mIndex <= 0 && mTotal >= mMaxThreads; ) {
        sp_thread_cond_wait( &mIdleCond, &mMainMutex );//如果mTotal>=mMaxThreads表示线程池中mTotal线程都在忙,没有空闲的线程可以执行dispatchFunc任务,那么就释放mMainMutex并在mIdleCond事件上等待,如果mIndex <= 0表示线程中还没有空闲线程保存到mThreadList成员,即无可以使用的空闲线程
    }

一旦等待mIdleCond成功,sp_thread_cond_wait函数返回,此时mTotal>=mMaxThreads,但mIndex一定大于0,表明已经有空闲的线程可以使用,for循环会退出

再看任务执行函数wrapperFunc:

sp_thread_result_t SP_THREAD_CALL SP_ThreadPool :: wrapperFunc( void * arg )
{
    SP_Thread_t * thread = ( SP_Thread_t * )arg;

    for( ; 0 == thread->mParent->mIsShutdown; ) {
        thread->mFunc( thread->mArg );//执行任务函数

        if( 0 != thread->mParent->mIsShutdown ) break;//如果系统关闭则退出

        sp_thread_mutex_lock( &thread->mMutex );
        if( 0 == thread->mParent->saveThread( thread ) ) {//任务执行完就回到线程池中
            sp_thread_cond_wait( &thread->mCond, &thread->mMutex );//释放掉mMutex等待mCond以备执行下一次任务,可能下一次任务thread->mFunc会被修改,但这不是线程池关心的
            sp_thread_mutex_unlock( &thread->mMutex );
        } else {//回到线程池中失败那么就销毁此线程,销毁后在dispatch中会检查线程数量并创建新的线程
            sp_thread_mutex_unlock( &thread->mMutex );
            sp_thread_cond_destroy( &thread->mCond );
            sp_thread_mutex_destroy( &thread->mMutex );

            free( thread );//释放掉此线程结构
            thread = NULL;
            break;
        }
    }

再回到dispatch函数

int SP_ThreadPool :: dispatch( DispatchFunc_t dispatchFunc, void *arg )
{
    int ret = 0;

    sp_thread_attr_t attr;
    SP_Thread_t * thread = NULL;

    sp_thread_mutex_lock( &mMainMutex );

    for( ; mIndex <= 0 && mTotal >= mMaxThreads; ) {
        sp_thread_cond_wait( &mIdleCond, &mMainMutex );
    }

    if( mIndex <= 0 ) {
        SP_Thread_t * thread = ( SP_Thread_t * )malloc( sizeof( SP_Thread_t ) );
        memset( &thread->mId, 0, sizeof( thread->mId ) );
        sp_thread_mutex_init( &thread->mMutex, NULL );
        sp_thread_cond_init( &thread->mCond, NULL );
        thread->mFunc = dispatchFunc;
        thread->mArg = arg;
        thread->mParent = this;

        sp_thread_attr_init( &attr );
        sp_thread_attr_setdetachstate( &attr, SP_THREAD_CREATE_DETACHED );

        if( 0 == sp_thread_create( &( thread->mId ), &attr, wrapperFunc, thread ) ) {
            mTotal++;
            sp_syslog( LOG_NOTICE, "[tp@%s] create thread#%ld\n", mTag, thread->mId );
        } else {
            ret = -1;
            sp_syslog( LOG_WARNING, "[tp@%s] cannot create thread\n", mTag );
            sp_thread_mutex_destroy( &thread->mMutex );
            sp_thread_cond_destroy( &thread->mCond );
            free( thread );
        }
        sp_thread_attr_destroy( &attr );
    } else {
        mIndex--;
        thread = mThreadList[ mIndex ];//从线程池中获取线程
        mThreadList[ mIndex ] = NULL;

        thread->mFunc = dispatchFunc;//更新任务函数
        thread->mArg = arg;//更新任务参数
        thread->mParent = this;

        sp_thread_mutex_lock( &thread->mMutex );
        sp_thread_cond_signal( &thread->mCond ) ;//唤醒线程
        sp_thread_mutex_unlock ( &thread->mMutex );
    }

    sp_thread_mutex_unlock( &mMainMutex );

    return ret;
}
mIndex现在大于0,执行else分支,从线程池中获取线程并调用sp_thread_cond_signal( &thread->mCond ) ;线程唤醒并执行dispatchFunc任务

 类似资料: