我有一个程序,它的函数将指针作为arg和main。main正在创建n个线程,每个线程根据传递的arg
在不同的内存区域上运行该函数。然后加入线程,main在区域之间执行一些数据混合并创建n个新线程,这些线程执行与旧线程相同的操作。
为了改进程序,我想让线程保持活动状态,消除创建它们所需的长时间。线程应该在主线程工作时Hibernate,并在它们必须再次出现时通知。同样,当线程工作时,主线程应该等待,就像连接时一样。
我最终不能强力实施这一点,总是陷入僵局。
简单的基线代码,任何关于如何修改它的提示都将不胜感激
#include <thread>
#include <climits>
...
void myfunc(void * p) {
do_something(p);
}
int main(){
void * myp[n_threads] {a_location, another_location,...};
std::thread mythread[n_threads];
for (unsigned long int j=0; j < ULONG_MAX; j++) {
for (unsigned int i=0; i < n_threads; i++) {
mythread[i] = std::thread(myfunc, myp[i]);
}
for (unsigned int i=0; i < n_threads; i++) {
mythread[i].join();
}
mix_data(myp);
}
return 0;
}
它可以很容易地通过一个屏障(只是一个条件变量和计数器上的方便包装器)实现。它基本上会阻塞,直到所有N个线程都达到“屏障”。然后它再次“回收”。Boost提供了一个实现。
void myfunc(void * p, boost::barrier& start_barrier, boost::barrier& end_barrier) {
while (!stop_condition) // You'll need to tell them to stop somehow
{
start_barrier.wait ();
do_something(p);
end_barrier.wait ();
}
}
int main(){
void * myp[n_threads] {a_location, another_location,...};
boost::barrier start_barrier (n_threads + 1); // child threads + main thread
boost::barrier end_barrier (n_threads + 1); // child threads + main thread
std::thread mythread[n_threads];
for (unsigned int i=0; i < n_threads; i++) {
mythread[i] = std::thread(myfunc, myp[i], start_barrier, end_barrier);
}
start_barrier.wait (); // first unblock the threads
for (unsigned long int j=0; j < ULONG_MAX; j++) {
end_barrier.wait (); // mix_data must not execute before the threads are done
mix_data(myp);
start_barrier.wait (); // threads must not start new iteration before mix_data is done
}
return 0;
}
您需要的概念是线程池。这个SO问题涉及现有的实现。
其想法是为多个线程实例提供一个容器。每个实例都与一个函数相关联,该函数轮询任务队列,当任务可用时,将其拉入并运行。一旦任务结束(如果它终止,但这是另一个问题),线程就会直接循环到任务队列。
因此,您需要一个同步队列、一个在队列上实现循环的线程类、一个任务对象的接口,也许还有一个驱动整个事物的类(池类)。
或者,您可以为它必须执行的任务创建一个非常专用的线程类(例如,仅使用内存区域作为参数)。这需要线程的通知机制来指示它们已在当前迭代中完成。
线程主函数将是该特定任务的一个循环,在一次迭代结束时,线程发出结束信号,并等待条件变量启动下一个循环。本质上,您将把任务代码内联到线程中,完全不需要队列。
using namespace std;
// semaphore class based on C++11 features
class semaphore {
private:
mutex mMutex;
condition_variable v;
int mV;
public:
semaphore(int v): mV(v){}
void signal(int count=1){
unique_lock lock(mMutex);
mV+=count;
if (mV > 0) mCond.notify_all();
}
void wait(int count = 1){
unique_lock lock(mMutex);
mV-= count;
while (mV < 0)
mCond.wait(lock);
}
};
template <typename Task>
class TaskThread {
thread mThread;
Task *mTask;
semaphore *mSemStarting, *mSemFinished;
volatile bool mRunning;
public:
TaskThread(Task *task, semaphore *start, semaphore *finish):
mTask(task), mRunning(true),
mSemStart(start), mSemFinished(finish),
mThread(&TaskThread<Task>::psrun){}
~TaskThread(){ mThread.join(); }
void run(){
do {
(*mTask)();
mSemFinished->signal();
mSemStart->wait();
} while (mRunning);
}
void finish() { // end the thread after the current loop
mRunning = false;
}
private:
static void psrun(TaskThread<Task> *self){ self->run();}
};
classcMyTask {
public:
MyTask(){}
void operator()(){
// some code here
}
};
int main(){
MyTask task1;
MyTask task2;
semaphore start(2), finished(0);
TaskThread<MyTask> t1(&task1, &start, &finished);
TaskThread<MyTask> t2(&task2, &start, &finished);
for (int i = 0; i < 10; i++){
finished.wait(2);
start.signal(2);
}
t1.finish();
t2.finish();
}
上面提出的(粗略)实现依赖于任务
类型,它必须提供运算符()
(即仿函数类)。我之前说过您可以将任务代码直接合并到线程函数体中,但由于我不知道它,我尽可能将其抽象化。线程的开始有一个条件变量,结束有一个条件变量,都封装在信号量实例中。
看到另一个答案建议使用boost::barrier
,我只能支持这个想法:如果可能的话,确保用那个类替换我的信号量类,原因是,对于相同的功能集,最好依赖经过良好测试和维护的外部代码,而不是自行实现的解决方案。
总而言之,这两种方法都是有效的,但前者放弃了一点点性能,转而支持灵活性。如果要执行的任务需要足够长的时间,那么管理和队列同步成本将变得微不足道。
更新:代码已修复并测试。用信号量替换了一个简单的条件变量。
这里有一种可能的方法,只使用C 11标准库中的类。基本上,您创建的每个线程都有一个相关的命令队列(封装在std::packaged_task中)
而通过使用
std::mutex
和std::unique_lock可以避免数据争用
下面是一个遵循这种设计的简单程序。评论应足以解释其作用:
#include <thread>
#include <iostream>
#include <sstream>
#include <future>
#include <queue>
#include <condition_variable>
#include <mutex>
// Convenience type definition
using job = std::packaged_task<void()>;
// Some data associated to each thread.
struct thread_data
{
int id; // Could use thread::id, but this is filled before the thread is started
std::thread t; // The thread object
std::queue<job> jobs; // The job queue
std::condition_variable cv; // The condition variable to wait for threads
std::mutex m; // Mutex used for avoiding data races
bool stop = false; // When set, this flag tells the thread that it should exit
};
// The thread function executed by each thread
void thread_func(thread_data* pData)
{
std::unique_lock<std::mutex> l(pData->m, std::defer_lock);
while (true)
{
l.lock();
// Wait until the queue won't be empty or stop is signaled
pData->cv.wait(l, [pData] () {
return (pData->stop || !pData->jobs.empty());
});
// Stop was signaled, let's exit the thread
if (pData->stop) { return; }
// Pop one task from the queue...
job j = std::move(pData->jobs.front());
pData->jobs.pop();
l.unlock();
// Execute the task!
j();
}
}
// Function that creates a simple task
job create_task(int id, int jobNumber)
{
job j([id, jobNumber] ()
{
std::stringstream s;
s << "Hello " << id << "." << jobNumber << std::endl;
std::cout << s.str();
});
return j;
}
int main()
{
const int numThreads = 4;
const int numJobsPerThread = 10;
std::vector<std::future<void>> futures;
// Create all the threads (will be waiting for jobs)
thread_data threads[numThreads];
int tdi = 0;
for (auto& td : threads)
{
td.id = tdi++;
td.t = std::thread(thread_func, &td);
}
//=================================================
// Start assigning jobs to each thread...
for (auto& td : threads)
{
for (int i = 0; i < numJobsPerThread; i++)
{
job j = create_task(td.id, i);
futures.push_back(j.get_future());
std::unique_lock<std::mutex> l(td.m);
td.jobs.push(std::move(j));
}
// Notify the thread that there is work do to...
td.cv.notify_one();
}
// Wait for all the tasks to be completed...
for (auto& f : futures) { f.wait(); }
futures.clear();
//=================================================
// Here the main thread does something...
std::cin.get();
// ...done!
//=================================================
//=================================================
// Posts some new tasks...
for (auto& td : threads)
{
for (int i = 0; i < numJobsPerThread; i++)
{
job j = create_task(td.id, i);
futures.push_back(j.get_future());
std::unique_lock<std::mutex> l(td.m);
td.jobs.push(std::move(j));
}
// Notify the thread that there is work do to...
td.cv.notify_one();
}
// Wait for all the tasks to be completed...
for (auto& f : futures) { f.wait(); }
futures.clear();
// Send stop signal to all threads and join them...
for (auto& td : threads)
{
std::unique_lock<std::mutex> l(td.m);
td.stop = true;
td.cv.notify_one();
}
// Join all the threads
for (auto& td : threads) { td.t.join(); }
}
问题内容: 我在自学Java线程时,发现有些令我困惑的地方。我做了一个叫做实现的课程。run方法仅打印“ Hello World”,休眠一秒钟,然后重复。 在我的主要方法中,我有: 如我所料,我看到了“ Hello World”和“ Done”。快速打印,这意味着main方法已到达末尾,但是我没想到的是,即使到达main末尾后,我开始运行的线程仍保持运行。 为什么即使退出主程序后程序仍继续执行?我
问题内容: 我目前正在尝试诊断应用程序中的缓慢内存泄漏。到目前为止,我掌握的事实如下。 我有4天运行该应用程序的堆转储。 该堆转储包含约800个WeakReference对象,这些对象指向保留40mb内存的对象(所有对象都是同一类型,出于这个问题的目的,我将其称为Foo)。 Eclipse内存分析工具显示,这些WeakReferences引用的每个Foo对象均未被其他任何对象引用。我的期望是,这应
方法有一个有趣的属性,它将允许其他线程在被阻止时进入其同步块。例如(假设线程1首先运行): 线程1: 线程2: 线程 2 能够唤醒线程 1 的事实意味着线程 2 进入了同步块,即使其他某个线程位于同一对象的同步块中也是如此。这对我来说很好,但我想知道这是否只发生在或所有会使线程“等待”的方法()上。在我的情况下,我关心,因为如果行为与相同,它会破坏我的代码: 那么,多个线程是否可能因为join调用
我已经通过Firebase使用他们的API成功创建了一个动态链接,如下所示:https://firebase.google.com/docs/dynamic-links/rest#creating-a-short-dynamic-link.我想知道这些“动态生成”动态链接的寿命是多少?
下面的代码创建了一个新的custom um < code > Thread ,并等待线程结束,直到主线程再次激活。 > < li >我不太明白它是如何工作的。为什么< code > myth read . wait();立即接到电话? < li> 为什么不改用< code>Thread.join()? 公共静态void main(String[] args) {