当前位置: 首页 > 工具软件 > ngx-formly > 使用案例 >

ngx_pool

澹台镜
2023-12-01

这里简单大致介绍线程池主要的功能 (下面描述如有错误,欢迎指正!!!)

功能分为(1)线程池的初始化 、(2)往线程池里投递任务(其实就是在线程池的链表中添加一个任务节点)、(3)循环执行任务、(4)线程池的销毁

这里仅仅介绍怎么初始化线程池中的线程池链表,关于其他的功能看下面代码注释

首先看这个结构

typedef struct {

thread_task_t *first;

thread_task_t **last;

} thread_pool_queue_t;

struct thread_task_s {

thread_task_t *next;

uint_t id;

void *ctx;

void (*handler)(void *data);

};

thread_tast_t 就是一个线程任务的一个结构体类型

初始化的时候 first->NULL; last->&first; 这是一个任务都没添加的时候 first也就是指向的第一个线程任务节点

这里巧妙地地方在于 last 存放着 first的地址 这就可以二级指针目的就是用来存放 下一节点地址

struct thread_tast_t{

//别的成员不描述 ;

thread_tast_t* next ; // last用来存放next指针地址

}

*last 代表当前节点next 这样一来,如果新添加一个节点 直接就可以把新加入的节点放在 *last之中 ,

再把last置为 这个节点next的地址 等待下一个节点的到来。

来一个节点 *last=new ; last=&(last->new);

#include "thread_pool.h"


static void thread_pool_exit_handler(void *data);
static void *thread_pool_cycle(void *data);
static int_t thread_pool_init_default(thread_pool_t *tpp, char *name);

        /*头文件关于线程池的一些结构体
typedef thread_tast_s thread_tast_t;
struct thread_task_s {
    thread_task_t       *next;
    uint_t               id;
    void                *ctx;
    void               (*handler)(void *data);
};

typedef struct {
    thread_task_t        *first;
    thread_task_t        **last;
} thread_pool_queue_t;

#define thread_pool_queue_init(q)                                         \
    (q)->first = NULL;                                                    \
    (q)->last = &(q)->first


struct thread_pool_s {
    pthread_mutex_t        mtx;
    thread_pool_queue_t   queue;
    int_t                 waiting;
    pthread_cond_t         cond;

    char                  *name;
    uint_t                threads;
    int_t                 max_queue;
};
       */

static uint_t       thread_pool_task_id;

static int debug = 0;

thread_pool_t* thread_pool_init()                                 //
{                                                                 //
    int             err;                                          //
    pthread_t       tid;                                          //
    uint_t          n;
    pthread_attr_t  attr;                                         //线程属性
	thread_pool_t   *tp=NULL;                                     //线程结构体

	tp = calloc(1,sizeof(thread_pool_t));

	if(tp == NULL){
	    fprintf(stderr, "thread_pool_init: calloc failed!\n");
	}

	thread_pool_init_default(tp, NULL);

    thread_pool_queue_init(&tp->queue);

    if (thread_mutex_create(&tp->mtx) != OK) {
		free(tp);
        return NULL;
    }

    if (thread_cond_create(&tp->cond) != OK) {
        (void) thread_mutex_destroy(&tp->mtx);
		free(tp);
        return NULL;
    }

    err = pthread_attr_init(&attr);
    if (err) {
        fprintf(stderr, "pthread_attr_init() failed, reason: %s\n",strerror(errno));
		free(tp);
        return NULL;
    }

    err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (err) {
        fprintf(stderr, "pthread_attr_setdetachstate() failed, reason: %s\n",strerror(errno));
		free(tp);
        return NULL;
    }


    for (n = 0; n < tp->threads; n++) {
        err = pthread_create(&tid, &attr, thread_pool_cycle, tp);
        if (err) {
            fprintf(stderr, "pthread_create() failed, reason: %s\n",strerror(errno));
			free(tp);
            return NULL;
        }
    }

    (void) pthread_attr_destroy(&attr);

    return tp;
}


void thread_pool_destroy(thread_pool_t *tp)
{
    uint_t           n;
    thread_task_t    task;
    volatile uint_t  lock;

    memset(&task,'\0', sizeof(thread_task_t));//创建临时任务变量 ,任务函数是消灭掉自己的线程 &lock是函数参数 

    task.handler = thread_pool_exit_handler;  
    task.ctx = (void *) &lock;

    for (n = 0; n < tp->threads; n++) {      //目前有多少个线程,调用多少次消灭线程函数
        lock = 1;

        if (thread_task_post(tp, &task) != OK) {
            return;
        }

        while (lock) {
            sched_yield();  
        }

        //task.event.active = 0;
    }

    (void) thread_cond_destroy(&tp->cond);
    (void) thread_mutex_destroy(&tp->mtx);

	free(tp);
}


static void                             //结束线程函数
thread_pool_exit_handler(void *data)
{
    uint_t *lock = data;

    *lock = 0;

    pthread_exit(0);
}


thread_task_t *
thread_task_alloc(size_t size)      
{
    thread_task_t  *task;

    task = calloc(1,sizeof(thread_task_t) + size);
    if (task == NULL) {
        return NULL;
    }

    task->ctx = task + 1;

    return task;
}

                              /*往线程池里投递一个任务函数*/
int_t                                                     //传递任务的地址 这个函数会给这块任务地址 绑定一个id编号
thread_task_post(thread_pool_t *tp, thread_task_t *task)  //并将这块地址挂接到线程池里面的队列链表里面
{                                                         //发送消息给各个线程唤醒等待 循环任务函数不会立即执行
    if (thread_mutex_lock(&tp->mtx) != OK) {              //当抛出任务函数解锁后 其他线程上锁才开始执行                   
        return ERROR;
    }

    if (tp->waiting >= tp->max_queue) {
        (void) thread_mutex_unlock(&tp->mtx);

        fprintf(stderr,"thread pool \"%s\" queue overflow: %ld tasks waiting\n",
                      tp->name, tp->waiting);
        return ERROR;
    }

    //task->event.active = 1;

    task->id = thread_pool_task_id++;
    task->next = NULL;

    if (thread_cond_signal(&tp->cond) != OK) {
        (void) thread_mutex_unlock(&tp->mtx);
        return ERROR;
    }

    *tp->queue.last = task;                  
    tp->queue.last = &task->next;

    tp->waiting++;

    (void) thread_mutex_unlock(&tp->mtx); //解锁

    if(debug)fprintf(stderr,"task #%lu added to thread pool \"%s\"\n",
                   task->id, tp->name);

    return OK;
}
    
                      /*循环执行任务函数 */
static void *
thread_pool_cycle(void *data)
{
    thread_pool_t *tp = data;                        //把线程池的地址传递进来

    int                 err;
    thread_task_t       *task;

    if(debug)fprintf(stderr,"thread in pool \"%s\" started\n", tp->name);

    for ( ;; ) {                                    //                   
        if (thread_mutex_lock(&tp->mtx) != OK) {    //                  
            return NULL;                            //                                    
        }                                                           
                                                    
        
        tp->waiting--;

        while (tp->queue.first == NULL) {                   
            if (thread_cond_wait(&tp->cond, &tp->mtx)!= OK) 
            {
                (void) thread_mutex_unlock(&tp->mtx);// 
                return NULL;                         //这块代码一般不执行,执行则发生错误!
            }
        }
                                                     //任务队列任务数量不为零 或者起初没任务 但是接收到信号了
        task = tp->queue.first;                      //把该任务从队列头部拿出来指针指向下一个节点
        tp->queue.first = task->next;                

        if (tp->queue.first == NULL) {               //队列头部为空 last指向first地址
            tp->queue.last = &tp->queue.first;
        }
		
        if (thread_mutex_unlock(&tp->mtx) != OK) {   //解锁 让其他线程开始工作   
            return NULL;                             //拿出的这个节点 其他线程不会干扰到这个线程工作了
        }

        if(debug) fprintf(stderr,"run task #%lu in thread pool \"%s\"\n",
                       task->id, tp->name);
					   
        task->handler(task->ctx);                    //把参数传递给工作函数

        if(debug) fprintf(stderr,"complete task #%lu in thread pool \"%s\"\n",task->id, tp->name);

        task->next = NULL;                           //这个地方  这个线程就处理task这块内存的任务了
                                                     //这块内存不在队列链表里面了 而且这块内存是动态申请的 需要外部释放掉free()
        //notify 
    }
}




static int_t
thread_pool_init_default(thread_pool_t *tpp, char *name)   //默认会添加线程池的名字 线程数目 队列最大数目
{
	if(tpp)
    {
        tpp->threads = DEFAULT_THREADS_NUM;
        tpp->max_queue = DEFAULT_QUEUE_NUM;
 		tpp->name = strdup(name?name:"default");
		
        if(debug)fprintf(stderr,
                      "thread_pool_init, name: %s ,threads: %lu max_queue: %ld\n",
                      tpp->name, tpp->threads, tpp->max_queue);

        return OK;
    }

    return ERROR;
}




 类似资料:

相关阅读

相关文章

相关问答