当前位置: 首页 > 知识库问答 >
问题:

通过同步延长线程的寿命(C 11)

陆和泰
2023-03-14

我有一个程序,它的函数将指针作为arg和main。main正在创建n个线程,每个线程根据传递的arg在不同的内存区域上运行该函数。然后加入线程,main在区域之间执行一些数据混合并创建n个新线程,这些线程执行与旧线程相同的操作。

为了改进程序,我想让线程保持活动状态,消除创建它们所需的长时间。线程应该在主线程工作时Hibernate,并在它们必须再次出现时通知。同样,当线程工作时,主线程应该等待,就像连接时一样。

我最终不能强力实施这一点,总是陷入僵局。

简单的基线代码,任何关于如何修改它的提示都将不胜感激

#include <thread>
#include <climits>

...

void myfunc(void * p) {
  do_something(p);
}

int main(){
  void * myp[n_threads] {a_location, another_location,...};
  std::thread mythread[n_threads];
  for (unsigned long int j=0; j < ULONG_MAX; j++) {
    for (unsigned int i=0; i < n_threads; i++) {
      mythread[i] = std::thread(myfunc, myp[i]);
    }
    for (unsigned int i=0; i < n_threads; i++) {
      mythread[i].join();
    }
    mix_data(myp); 
  }
  return 0;
}

共有3个答案

钮兴安
2023-03-14

它可以很容易地通过一个屏障(只是一个条件变量和计数器上的方便包装器)实现。它基本上会阻塞,直到所有N个线程都达到“屏障”。然后它再次“回收”。Boost提供了一个实现。

void myfunc(void * p, boost::barrier& start_barrier, boost::barrier& end_barrier) {
  while (!stop_condition) // You'll need to tell them to stop somehow
  {
      start_barrier.wait ();
      do_something(p);
      end_barrier.wait ();
  }
}

int main(){
  void * myp[n_threads] {a_location, another_location,...};

  boost::barrier start_barrier (n_threads + 1); // child threads + main thread
  boost::barrier end_barrier (n_threads + 1); // child threads + main thread

  std::thread mythread[n_threads];

    for (unsigned int i=0; i < n_threads; i++) {
      mythread[i] = std::thread(myfunc, myp[i], start_barrier, end_barrier);
    }

  start_barrier.wait (); // first unblock the threads

  for (unsigned long int j=0; j < ULONG_MAX; j++) {
    end_barrier.wait (); // mix_data must not execute before the threads are done
    mix_data(myp); 
    start_barrier.wait (); // threads must not start new iteration before mix_data is done
  }
  return 0;
}
丁兴德
2023-03-14

您需要的概念是线程池。这个SO问题涉及现有的实现。

其想法是为多个线程实例提供一个容器。每个实例都与一个函数相关联,该函数轮询任务队列,当任务可用时,将其拉入并运行。一旦任务结束(如果它终止,但这是另一个问题),线程就会直接循环到任务队列。

因此,您需要一个同步队列、一个在队列上实现循环的线程类、一个任务对象的接口,也许还有一个驱动整个事物的类(池类)。

或者,您可以为它必须执行的任务创建一个非常专用的线程类(例如,仅使用内存区域作为参数)。这需要线程的通知机制来指示它们已在当前迭代中完成。

线程主函数将是该特定任务的一个循环,在一次迭代结束时,线程发出结束信号,并等待条件变量启动下一个循环。本质上,您将把任务代码内联到线程中,完全不需要队列。

 using namespace std;

 // semaphore class based on C++11 features
 class semaphore {
     private:
         mutex mMutex;
         condition_variable v;
         int mV;
     public:
         semaphore(int v): mV(v){}
         void signal(int count=1){
             unique_lock lock(mMutex);
             mV+=count;
             if (mV > 0) mCond.notify_all();
         }
         void wait(int count = 1){
             unique_lock lock(mMutex);
             mV-= count;
             while (mV < 0)
                 mCond.wait(lock);
         }
 };

template <typename Task>
class TaskThread {
     thread mThread;
     Task *mTask;
     semaphore *mSemStarting, *mSemFinished;
     volatile bool mRunning;
    public:
    TaskThread(Task *task, semaphore *start, semaphore *finish): 
         mTask(task), mRunning(true), 
         mSemStart(start), mSemFinished(finish),
        mThread(&TaskThread<Task>::psrun){}
    ~TaskThread(){ mThread.join(); }

    void run(){
        do {
             (*mTask)();
             mSemFinished->signal();
             mSemStart->wait();
        } while (mRunning);
    }

   void finish() { // end the thread after the current loop
         mRunning = false;
   }
private:
    static void psrun(TaskThread<Task> *self){ self->run();}
 };

 classcMyTask {
     public:
     MyTask(){}
    void operator()(){
        // some code here
     }
 };

int main(){
    MyTask task1;
    MyTask task2;
    semaphore start(2), finished(0);
    TaskThread<MyTask> t1(&task1, &start, &finished);
    TaskThread<MyTask> t2(&task2, &start, &finished);
    for (int i = 0; i < 10; i++){
         finished.wait(2);
         start.signal(2);
    }
    t1.finish();
    t2.finish();
}

上面提出的(粗略)实现依赖于任务类型,它必须提供运算符()(即仿函数类)。我之前说过您可以将任务代码直接合并到线程函数体中,但由于我不知道它,我尽可能将其抽象化。线程的开始有一个条件变量,结束有一个条件变量,都封装在信号量实例中。

看到另一个答案建议使用boost::barrier,我只能支持这个想法:如果可能的话,确保用那个类替换我的信号量类,原因是,对于相同的功能集,最好依赖经过良好测试和维护的外部代码,而不是自行实现的解决方案。

总而言之,这两种方法都是有效的,但前者放弃了一点点性能,转而支持灵活性。如果要执行的任务需要足够长的时间,那么管理和队列同步成本将变得微不足道。

更新:代码已修复并测试。用信号量替换了一个简单的条件变量。

戚星腾
2023-03-14

这里有一种可能的方法,只使用C 11标准库中的类。基本上,您创建的每个线程都有一个相关的命令队列(封装在std::packaged_task中)

而通过使用std::mutexstd::unique_lock可以避免数据争用

下面是一个遵循这种设计的简单程序。评论应足以解释其作用:

#include <thread>
#include <iostream>
#include <sstream>
#include <future>
#include <queue>
#include <condition_variable>
#include <mutex>

// Convenience type definition
using job = std::packaged_task<void()>;

// Some data associated to each thread.
struct thread_data
{
    int id; // Could use thread::id, but this is filled before the thread is started
    std::thread t; // The thread object
    std::queue<job> jobs; // The job queue
    std::condition_variable cv; // The condition variable to wait for threads
    std::mutex m; // Mutex used for avoiding data races
    bool stop = false; // When set, this flag tells the thread that it should exit
};

// The thread function executed by each thread
void thread_func(thread_data* pData)
{
    std::unique_lock<std::mutex> l(pData->m, std::defer_lock);
    while (true)
    {
        l.lock();

        // Wait until the queue won't be empty or stop is signaled
        pData->cv.wait(l, [pData] () {
            return (pData->stop || !pData->jobs.empty()); 
            });

        // Stop was signaled, let's exit the thread
        if (pData->stop) { return; }

        // Pop one task from the queue...
        job j = std::move(pData->jobs.front());
        pData->jobs.pop();

        l.unlock();

        // Execute the task!
        j();
    }
}

// Function that creates a simple task
job create_task(int id, int jobNumber)
{
    job j([id, jobNumber] ()
    {
        std::stringstream s;
        s << "Hello " << id << "." << jobNumber << std::endl;
        std::cout << s.str();
    });

    return j;
}

int main()
{
    const int numThreads = 4;
    const int numJobsPerThread = 10;
    std::vector<std::future<void>> futures;

    // Create all the threads (will be waiting for jobs)
    thread_data threads[numThreads];
    int tdi = 0;
    for (auto& td : threads)
    {
        td.id = tdi++;
        td.t = std::thread(thread_func, &td);
    }

    //=================================================
    // Start assigning jobs to each thread...

    for (auto& td : threads)
    {
        for (int i = 0; i < numJobsPerThread; i++)
        {
            job j = create_task(td.id, i);
            futures.push_back(j.get_future());

            std::unique_lock<std::mutex> l(td.m);
            td.jobs.push(std::move(j));
        }

        // Notify the thread that there is work do to...
        td.cv.notify_one();
    }

    // Wait for all the tasks to be completed...
    for (auto& f : futures) { f.wait(); }
    futures.clear();


    //=================================================
    // Here the main thread does something...

    std::cin.get();

    // ...done!
    //=================================================


    //=================================================
    // Posts some new tasks...

    for (auto& td : threads)
    {
        for (int i = 0; i < numJobsPerThread; i++)
        {
            job j = create_task(td.id, i);
            futures.push_back(j.get_future());

            std::unique_lock<std::mutex> l(td.m);
            td.jobs.push(std::move(j));
        }

        // Notify the thread that there is work do to...
        td.cv.notify_one();
    }

    // Wait for all the tasks to be completed...
    for (auto& f : futures) { f.wait(); }
    futures.clear();

    // Send stop signal to all threads and join them...
    for (auto& td : threads)
    {
        std::unique_lock<std::mutex> l(td.m);
        td.stop = true;
        td.cv.notify_one();
    }

    // Join all the threads
    for (auto& td : threads) { td.t.join(); }
}

 类似资料:
  • 问题内容: 我在自学Java线程时,发现有些令我困惑的地方。我做了一个叫做实现的课程。run方法仅打印“ Hello World”,休眠一秒钟,然后重复。 在我的主要方法中,我有: 如我所料,我看到了“ Hello World”和“ Done”。快速打印,这意味着main方法已到达末尾,但是我没想到的是,即使到达main末尾后,我开始运行的线程仍保持运行。 为什么即使退出主程序后程序仍继续执行?我

  • 问题内容: 我目前正在尝试诊断应用程序中的缓慢内存泄漏。到目前为止,我掌握的事实如下。 我有4天运行该应用程序的堆转储。 该堆转储包含约800个WeakReference对象,这些对象指向保留40mb内存的对象(所有对象都是同一类型,出于这个问题的目的,我将其称为Foo)。 Eclipse内存分析工具显示,这些WeakReferences引用的每个Foo对象均未被其他任何对象引用。我的期望是,这应

  • 方法有一个有趣的属性,它将允许其他线程在被阻止时进入其同步块。例如(假设线程1首先运行): 线程1: 线程2: 线程 2 能够唤醒线程 1 的事实意味着线程 2 进入了同步块,即使其他某个线程位于同一对象的同步块中也是如此。这对我来说很好,但我想知道这是否只发生在或所有会使线程“等待”的方法()上。在我的情况下,我关心,因为如果行为与相同,它会破坏我的代码: 那么,多个线程是否可能因为join调用

  • 我已经通过Firebase使用他们的API成功创建了一个动态链接,如下所示:https://firebase.google.com/docs/dynamic-links/rest#creating-a-short-dynamic-link.我想知道这些“动态生成”动态链接的寿命是多少?

  • 下面的代码创建了一个新的custom um < code > Thread ,并等待线程结束,直到主线程再次激活。 > < li >我不太明白它是如何工作的。为什么< code > myth read . wait();立即接到电话? < li> 为什么不改用< code>Thread.join()? 公共静态void main(String[] args) {