这里简单大致介绍线程池主要的功能 (下面描述如有错误,欢迎指正!!!)
功能分为(1)线程池的初始化 、(2)往线程池里投递任务(其实就是在线程池的链表中添加一个任务节点)、(3)循环执行任务、(4)线程池的销毁
这里仅仅介绍怎么初始化线程池中的线程池链表,关于其他的功能看下面代码注释
首先看这个结构
typedef struct {
thread_task_t *first;
thread_task_t **last;
} thread_pool_queue_t;
struct thread_task_s {
thread_task_t *next;
uint_t id;
void *ctx;
void (*handler)(void *data);
};
thread_tast_t 就是一个线程任务的一个结构体类型
初始化的时候 first->NULL; last->&first; 这是一个任务都没添加的时候 first也就是指向的第一个线程任务节点
这里巧妙地地方在于 last 存放着 first的地址 这就可以二级指针目的就是用来存放 下一节点地址
struct thread_tast_t{
//别的成员不描述 ;
thread_tast_t* next ; // last用来存放next指针地址
}
*last 代表当前节点next 这样一来,如果新添加一个节点 直接就可以把新加入的节点放在 *last之中 ,
再把last置为 这个节点next的地址 等待下一个节点的到来。
来一个节点 *last=new ; last=&(last->new);
#include "thread_pool.h"
static void thread_pool_exit_handler(void *data);
static void *thread_pool_cycle(void *data);
static int_t thread_pool_init_default(thread_pool_t *tpp, char *name);
/*头文件关于线程池的一些结构体
typedef thread_tast_s thread_tast_t;
struct thread_task_s {
thread_task_t *next;
uint_t id;
void *ctx;
void (*handler)(void *data);
};
typedef struct {
thread_task_t *first;
thread_task_t **last;
} thread_pool_queue_t;
#define thread_pool_queue_init(q) \
(q)->first = NULL; \
(q)->last = &(q)->first
struct thread_pool_s {
pthread_mutex_t mtx;
thread_pool_queue_t queue;
int_t waiting;
pthread_cond_t cond;
char *name;
uint_t threads;
int_t max_queue;
};
*/
static uint_t thread_pool_task_id;
static int debug = 0;
thread_pool_t* thread_pool_init() //
{ //
int err; //
pthread_t tid; //
uint_t n;
pthread_attr_t attr; //线程属性
thread_pool_t *tp=NULL; //线程结构体
tp = calloc(1,sizeof(thread_pool_t));
if(tp == NULL){
fprintf(stderr, "thread_pool_init: calloc failed!\n");
}
thread_pool_init_default(tp, NULL);
thread_pool_queue_init(&tp->queue);
if (thread_mutex_create(&tp->mtx) != OK) {
free(tp);
return NULL;
}
if (thread_cond_create(&tp->cond) != OK) {
(void) thread_mutex_destroy(&tp->mtx);
free(tp);
return NULL;
}
err = pthread_attr_init(&attr);
if (err) {
fprintf(stderr, "pthread_attr_init() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}
err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if (err) {
fprintf(stderr, "pthread_attr_setdetachstate() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}
for (n = 0; n < tp->threads; n++) {
err = pthread_create(&tid, &attr, thread_pool_cycle, tp);
if (err) {
fprintf(stderr, "pthread_create() failed, reason: %s\n",strerror(errno));
free(tp);
return NULL;
}
}
(void) pthread_attr_destroy(&attr);
return tp;
}
void thread_pool_destroy(thread_pool_t *tp)
{
uint_t n;
thread_task_t task;
volatile uint_t lock;
memset(&task,'\0', sizeof(thread_task_t));//创建临时任务变量 ,任务函数是消灭掉自己的线程 &lock是函数参数
task.handler = thread_pool_exit_handler;
task.ctx = (void *) &lock;
for (n = 0; n < tp->threads; n++) { //目前有多少个线程,调用多少次消灭线程函数
lock = 1;
if (thread_task_post(tp, &task) != OK) {
return;
}
while (lock) {
sched_yield();
}
//task.event.active = 0;
}
(void) thread_cond_destroy(&tp->cond);
(void) thread_mutex_destroy(&tp->mtx);
free(tp);
}
static void //结束线程函数
thread_pool_exit_handler(void *data)
{
uint_t *lock = data;
*lock = 0;
pthread_exit(0);
}
thread_task_t *
thread_task_alloc(size_t size)
{
thread_task_t *task;
task = calloc(1,sizeof(thread_task_t) + size);
if (task == NULL) {
return NULL;
}
task->ctx = task + 1;
return task;
}
/*往线程池里投递一个任务函数*/
int_t //传递任务的地址 这个函数会给这块任务地址 绑定一个id编号
thread_task_post(thread_pool_t *tp, thread_task_t *task) //并将这块地址挂接到线程池里面的队列链表里面
{ //发送消息给各个线程唤醒等待 循环任务函数不会立即执行
if (thread_mutex_lock(&tp->mtx) != OK) { //当抛出任务函数解锁后 其他线程上锁才开始执行
return ERROR;
}
if (tp->waiting >= tp->max_queue) {
(void) thread_mutex_unlock(&tp->mtx);
fprintf(stderr,"thread pool \"%s\" queue overflow: %ld tasks waiting\n",
tp->name, tp->waiting);
return ERROR;
}
//task->event.active = 1;
task->id = thread_pool_task_id++;
task->next = NULL;
if (thread_cond_signal(&tp->cond) != OK) {
(void) thread_mutex_unlock(&tp->mtx);
return ERROR;
}
*tp->queue.last = task;
tp->queue.last = &task->next;
tp->waiting++;
(void) thread_mutex_unlock(&tp->mtx); //解锁
if(debug)fprintf(stderr,"task #%lu added to thread pool \"%s\"\n",
task->id, tp->name);
return OK;
}
/*循环执行任务函数 */
static void *
thread_pool_cycle(void *data)
{
thread_pool_t *tp = data; //把线程池的地址传递进来
int err;
thread_task_t *task;
if(debug)fprintf(stderr,"thread in pool \"%s\" started\n", tp->name);
for ( ;; ) { //
if (thread_mutex_lock(&tp->mtx) != OK) { //
return NULL; //
}
tp->waiting--;
while (tp->queue.first == NULL) {
if (thread_cond_wait(&tp->cond, &tp->mtx)!= OK)
{
(void) thread_mutex_unlock(&tp->mtx);//
return NULL; //这块代码一般不执行,执行则发生错误!
}
}
//任务队列任务数量不为零 或者起初没任务 但是接收到信号了
task = tp->queue.first; //把该任务从队列头部拿出来指针指向下一个节点
tp->queue.first = task->next;
if (tp->queue.first == NULL) { //队列头部为空 last指向first地址
tp->queue.last = &tp->queue.first;
}
if (thread_mutex_unlock(&tp->mtx) != OK) { //解锁 让其他线程开始工作
return NULL; //拿出的这个节点 其他线程不会干扰到这个线程工作了
}
if(debug) fprintf(stderr,"run task #%lu in thread pool \"%s\"\n",
task->id, tp->name);
task->handler(task->ctx); //把参数传递给工作函数
if(debug) fprintf(stderr,"complete task #%lu in thread pool \"%s\"\n",task->id, tp->name);
task->next = NULL; //这个地方 这个线程就处理task这块内存的任务了
//这块内存不在队列链表里面了 而且这块内存是动态申请的 需要外部释放掉free()
//notify
}
}
static int_t
thread_pool_init_default(thread_pool_t *tpp, char *name) //默认会添加线程池的名字 线程数目 队列最大数目
{
if(tpp)
{
tpp->threads = DEFAULT_THREADS_NUM;
tpp->max_queue = DEFAULT_QUEUE_NUM;
tpp->name = strdup(name?name:"default");
if(debug)fprintf(stderr,
"thread_pool_init, name: %s ,threads: %lu max_queue: %ld\n",
tpp->name, tpp->threads, tpp->max_queue);
return OK;
}
return ERROR;
}