我已经实现了一个线程池。现在,它的基本操作如下:
空初始化(布尔detached_threads);
bool调度(ulux(*dispatch_fn)(ulux), ulux arg, boolfree_arg);
void shut\u down();
静态无效*execute_task(无效*arg);
现在我想添加等待()操作,它将由主线程调用,并等待线程池中的所有线程完成它们正在执行的任务。我不想使用pthread_join,因为这会杀死所有线程,我不想再次创建线程池。我已经实现了下面提供的代码中的等待操作,但似乎不正确。
请给我建议什么是错的。谢谢!!!
#include "../inc/ThreadPool.hpp"
#include <cstdio>
#include <cstdlib>
#include <iostream>
using namespace std;
ThreadPool::ThreadPool( unsigned int n )
:num_threads(n)
{
if(num_threads<=0)
{
num_threads = DEFAULT_THREAD_POOL_SIZE;
}
barrier_count = 0;
threads = (pthread_t*) malloc(sizeof(pthread_t)*num_threads);
shutdown = false;
dont_accept = false;
pthread_mutex_init(&barrier_lock,NULL);
pthread_cond_init(&barrier_reached,NULL);
pthread_mutex_init(&q_lock,NULL);
pthread_cond_init(&q_not_empty,NULL);
pthread_cond_init(&q_empty,NULL);
}
ThreadPool::~ThreadPool()
{
//cout << "~ThreadPool()" << endl;
}
void ThreadPool::initialise( bool detached_threads )
{
//pthread_attr_t attr;
//if(detached_threads)
//{
//pthread_attr_init(&attr);
//pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
//}
for ( int i = 0 ; i<num_threads ; i++ )
{
pthread_create( &threads[i] , NULL , execute_task , this );
}
}
bool ThreadPool::dispatch( void *(*routine)(void*) , void * arg , bool free_arg )
{
task_t * new_task = (task_t*) malloc(sizeof(task_t));
new_task->routine = routine;
new_task->arg = arg;
new_task->free_arg = free_arg;
pthread_mutex_lock(&q_lock);
if(dont_accept)
{
free(new_task);
return false;
}
bool was_empty = tasks.empty();
tasks.push(new_task);
if(was_empty)
{
pthread_cond_signal(&q_not_empty);
}
pthread_mutex_unlock(&q_lock);
return true;
}
void ThreadPool::shut_down()
{
void * return_val;
pthread_mutex_lock(&q_lock);
dont_accept = true;
while(!(tasks.empty()))
{
pthread_cond_wait(&q_empty,&q_lock);
}
shutdown = true;
pthread_cond_broadcast(&q_not_empty);
pthread_mutex_unlock(&q_lock);
for(int i=0 ; i<num_threads ; i++)
{
//pthread_join(threads[i],NULL);
pthread_join(threads[i],&return_val);
}
free(threads);
pthread_mutex_destroy(&barrier_lock);
pthread_cond_destroy(&barrier_reached);
pthread_mutex_destroy(&q_lock);
pthread_cond_destroy(&q_empty);
pthread_cond_destroy(&q_not_empty);
}
void ThreadPool::init_barrier()
{
pthread_mutex_lock(&barrier_lock);
barrier_count = 0;
pthread_mutex_unlock(&barrier_lock);
}
void ThreadPool::barrier( int ns )
{
pthread_mutex_lock(&barrier_lock);
barrier_count++;
if(barrier_count==ns)
{
for( int i=0 ; i<ns ; i++ )
{
pthread_cond_signal(&barrier_reached);
}
}else
{
while( barrier_count<ns )
{
pthread_cond_wait(&barrier_reached,&barrier_lock);
}
}
pthread_mutex_unlock(&barrier_lock);
}
void ThreadPool::wait()
{
pthread_mutex_lock(&q_lock);
while(!(tasks.empty()))
{
pthread_cond_wait(&q_empty,&q_lock);
}
pthread_mutex_unlock(&q_lock);
}
void * ThreadPool::execute_task( void * arg )
{
ThreadPool * thread_pool = (ThreadPool*) arg;
task_t * cur_task;
while(true)
{
pthread_mutex_lock(&(thread_pool->q_lock));
while((thread_pool->tasks).empty())
{
if(thread_pool->shutdown)
{
pthread_mutex_unlock(&(thread_pool->q_lock));
pthread_exit(NULL);
}
//cout << "I'm going to sleep!!!" << endl;
pthread_cond_wait(&(thread_pool->q_not_empty),&(thread_pool->q_lock));
//cout << "I've woken up!!!" << endl;
if(thread_pool->shutdown)
{
pthread_mutex_unlock(&(thread_pool->q_lock));
pthread_exit(NULL);
}
}
cur_task = thread_pool->tasks.front();
thread_pool->tasks.pop();
if(thread_pool->tasks.empty() && !thread_pool->shutdown )
{
pthread_cond_signal(&(thread_pool->q_empty));
}
pthread_mutex_unlock(&(thread_pool->q_lock));
//cout << "I'm executing a task!!!" << endl;
(cur_task->routine)(cur_task->arg);
if(cur_task->free_arg)
{
free(cur_task->arg);
}
free(cur_task);
//cout << "I'm done with the task!!!" << endl;
}
}
我通常会从线程池中请求一个“TasksetWait”(TW)对象,通过一个“dispatch”方法发出任务,然后,对于同步通知,调用一个“AwaitCompletion()方法”。TW提供了一个已为请求线程锁定的专用互斥锁(确保它现在具有独占访问)、一个任务计数器int、一个供请求者等待的“已完成”条件变量/事件以及对其池的引用。TW调度将任务转发到池中,方法是向每个任务中加载一个对自身的引用,将任务推到其池中,并通过递增任务计数器计算出它们。
然后,请求线程调用TW-
与此同时,池线程正在执行任务run()方法。run()返回后,任务调用TW的“OnComplection()”方法,该方法锁定互斥锁并减少计数。如果计数仍然非零,它会解锁互斥锁并退出。如果计数为零,它会解锁互斥锁,发出事件信号并退出。
当请求程序再次运行时,它可以将TW返回到池中(池中可能会保留TW的缓存),也可以将其销毁。
一种变体是请求者向TW提供一个“OnComplection”方法,以便完成最后一个任务的池线程可以调用它,从而提供异步通知(可能需要将消息发布到GUI输入队列)。
这样的机制允许线程池被多个请求者线程使用,或者(通过异步通知)请求者发出多个任务块,但如果请求者线程本身是池中运行任务的池线程,则可能会有点混淆(如果您想对流程的实际操作有任何了解,最好避免这样做:)。
该程序创建一个线程,用dir()读取目录,并将文件放在通道上$N个工作线程读取该通道并“处理”(打印)文件。 然而,我得到了这个“等待的第一个操作:”错误。 关于这个错误,我已经在陷阱页面上读了好几遍了,但还是没有弄明白。能解释一下这里发生了什么吗? 目录内容: 运行程序: 程序travers-dir. p6:
问题内容: 我正在为我的ubuntu服务器(针对我的多客户端匿名聊天程序)实现一种简单的线程池机制,并且需要使我的工作线程进入睡眠状态,直到需要执行一项工作(以函数指针和参数的形式) 。 我当前的系统即将关闭。我(工人线程正在)问经理是否有工作可用,以及是否有5毫秒没有睡眠。如果存在,请将作业添加到工作队列中并运行该函数。糟糕的循环浪费。 什么我 喜欢 做的是做一个简单的事件性的系统。我正在考虑有
问题内容: 如何更改以下代码,以触发两个异步操作并有机会同时运行? 我需要做这样的事情吗? 问题答案: TL; DR 不要在获得承诺的问题中使用模式,而是分别等待它们;而是使用(至少现在): 虽然您的解决方案 确实 并行运行这两个操作,但如果两个诺言都被拒绝,它就无法正确处理拒绝。 细节: 您的解决方案并行运行它们,但始终等待第一个完成,然后再等待第二个。 如果您只想启动它们,并行运行它们,并获得
问题内容: 我如何更改以下代码,以触发两个异步操作并有机会同时运行? 我需要做这样的事情吗? 问题答案: TL; DR 不要在获得承诺的问题中使用模式,而是分别等待它们;而是使用(至少现在): 虽然您的解决方案确实并行运行这两个操作,但是如果两个诺言都被拒绝,它就无法正确处理拒绝。 细节: 您的解决方案并行运行它们,但始终等待第一个完成,然后再等待第二个。如果您只想启动它们,并行运行它们,并获得两
本文向大家介绍C#线程池操作方法,包括了C#线程池操作方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#线程池操作方法。分享给大家供大家参考。具体如下: 希望本文所述对大家的C#程序设计有所帮助。
问题内容: 我有以下情况: 为了运行算法,我必须运行多个线程,并且每个线程都会在死之前设置一个实例变量x。问题是这些线程不会立即返回: 我应该使用等待通知吗?还是我应该嵌入一个while循环并检查是否终止? 感谢大家! 问题答案: 创建一些共享存储来保存每个线程的值,或者如果足够的话,只存储总和。使用a 等待线程终止。每个线程完成后都会调用,您的方法将使用该方法来等待它们。 编辑: 这是我建议的方