当前位置: 首页 > 知识库问答 >
问题:

每个条件唤醒多个线程工作一次

赫连飞沉
2023-03-14
// g++ -Wall -o threadtest -pthread threadtest.cpp
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <chrono>

std::mutex condMutex;
std::condition_variable condVar;
bool dataReady = false;

void state_change_worker(int id)
{
    while (1)
    {
        {
            std::unique_lock<std::mutex> lck(condMutex);
            condVar.wait(lck, [] { return dataReady; });
            // Do work only once.
            std::cout << "thread " << id << " working\n";
        }
    }
}

int main()
{
    // Create some worker threads.
    std::thread threads[5];
    for (int i = 0; i < 5; ++i)
        threads[i] = std::thread(state_change_worker, i);

    while (1)
    {
        // Signal to the worker threads to work.
        {
            std::cout << "Notifying threads.\n";
            std::unique_lock<std::mutex> lck(condMutex);
            dataReady = true;
            condVar.notify_all();
        }
        // It would be really great if I could wait() on all of the 
        // worker threads being done with their work here, but it's 
        // not strictly necessary.
        std::cout << "Sleep for a bit.\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}
// g++ -Wall -o threadtest -pthread threadtest.cpp
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <chrono>

class SquadLock
{
public:
    void waitForLeader()
    {
        {
            // Increment count to show that we are waiting in queue.
            // Also, if we are the thread that reached the target, signal
            // to the leader that everything is ready.
            std::unique_lock<std::mutex> count_lock(count_mutex_);
            std::unique_lock<std::mutex> target_lock(target_mutex_);
            if (++count_ >= target_)
                count_cond_.notify_one();
        }
        // Wait for leader to signal done.
        std::unique_lock<std::mutex> lck(done_mutex_);
        done_cond_.wait(lck, [&] { return done_; });
        {
            // Decrement count to show that we are no longer waiting.
            // If we are the last thread set done to false.
            std::unique_lock<std::mutex> lck(count_mutex_);
            if (--count_ == 0)
            {
                done_ = false;
            }
        }
    }

    void waitForHerd()
    {
        std::unique_lock<std::mutex> lck(count_mutex_);
        count_cond_.wait(lck, [&] { return count_ >= target_; });
    }
    void leaderDone()
    {
        std::unique_lock<std::mutex> lck(done_mutex_);
        done_ = true;
        done_cond_.notify_all();
    }
    void incrementTarget()
    {
        std::unique_lock<std::mutex> lck(target_mutex_);
        ++target_;
    }
    void decrementTarget()
    {
        std::unique_lock<std::mutex> lck(target_mutex_);
        --target_;
    }
    void setTarget(int target)
    {
        std::unique_lock<std::mutex> lck(target_mutex_);
        target_ = target;
    }

private:
    // Condition variable to indicate that the leader is done.
    std::mutex done_mutex_;
    std::condition_variable done_cond_;
    bool done_ = false;

    // Count of currently waiting tasks.
    std::mutex count_mutex_;
    std::condition_variable count_cond_;
    int count_ = 0;

    // Target number of tasks ready for the leader.
    std::mutex target_mutex_;
    int target_ = 0;
};

SquadLock squad_lock;
std::mutex print_mutex;
void state_change_worker(int id)
{
    while (1)
    {
        // Wait for the leader to signal that we are ready to work.
        squad_lock.waitForLeader();
        {
            // Adding just a bit of sleep here makes it so that every thread wakes up, but that isn't the right way.
            // std::this_thread::sleep_for(std::chrono::milliseconds(100));
            std::unique_lock<std::mutex> lck(print_mutex);
            std::cout << "thread " << id << " working\n";
        }
    }
}

int main()
{

    // Create some worker threads and increment target for each one
    // since we want to wait until all threads are finished.
    std::thread threads[5];
    for (int i = 0; i < 5; ++i)
    {
        squad_lock.incrementTarget();
        threads[i] = std::thread(state_change_worker, i);
    }
    while (1)
    {
        // Signal to the worker threads to work.
        std::cout << "Starting threads.\n";
        squad_lock.leaderDone();
        // Wait for the worked threads to be done.
        squad_lock.waitForHerd();
        // Wait until next time, processing results.
        std::cout << "Tasks done, waiting for next time.\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}

共有1个答案

淳于俊迈
2023-03-14

下面是我创建的一个关于并发设计模式的博客的节选。模式是使用Ada语言表达的,但概念是可以翻译到C++的。

摘要

监视器

有几种创建和控制共享内存的理论方法。其中最灵活和最健壮的是监视器,如C.A.R.首先描述的。霍尔。监视器是一个具有三种不同操作的数据对象。

班锁

班组锁定允许特殊任务(班长)监视一群或一组工人任务的进度。当所有(或足够数量的)工作者任务都完成了他们工作的某个方面,并且领导者准备继续工作时,整个任务集就可以通过一个障碍,继续他们的下一个活动序列。其目的是允许任务异步执行,同时通过一组复杂的活动来协调它们的进度。

package Barriers is
   protected type Barrier(Trigger : Positive) is
      entry Wait_For_Leader; 
      entry Wait_For_Herd; 
      procedure Leader_Done; 
   private
      Done : Boolean := False;
   end Barrier;

   protected type Autobarrier(Trigger : Positive) is
      entry Wait_For_Leader; 
      entry Wait_For_Herd; 
   private
      Done : Boolean := False;
   end Autobarrier;
end Barriers;

这个包显示了两种班锁。屏障保护类型演示了一个基本的小队锁定。herd调用Wait_For_Leader,leader调用Wait_For_Herd,然后调用leader_done。Autobarrier演示了一个更简单的界面。herd调用Wait_For_Leader,leader调用wait_for_herd。在创建任何一种类型的屏障的实例时都使用触发器参数。它设置了在进行之前领导者必须等待的羊群任务的最小数量。

package body Barriers is
   protected body Barrier is
      entry Wait_For_Herd when Wait_For_Leader'Count >= Trigger is
      begin
         null;
      end Wait_For_Herd;

      entry Wait_For_Leader when Done is
      begin
         if Wait_For_Leader'Count = 0 then
            Done := False;
         end if;
      end Wait_For_Leader;

      procedure Leader_Done is
      begin
         Done := True;
      end Leader_Done;
   end Barrier;

   protected body Autobarrier is
      entry Wait_For_Herd when Wait_For_Leader'Count >= Trigger is
      begin
         Done := True;
      end Wait_For_Herd;

      entry Wait_For_Leader when Done is
      begin
         if Wait_For_Leader'Count = 0 then
            Done := False;
         end if;
      end Wait_For_Leader;
   end Autobarrier;
end Barriers;
 类似资料:
  • 问题内容: 想象一下,您在Java中有一个典型的生产者- 消费者模式。为了提高效率,您要使用而不是在将新元素添加到队列时使用。如果两个生产者线程调用notify,是否保证将唤醒两个不同的正在等待的使用者线程?还是彼此之间很快触发的两个s导致同一用户线程排队两次唤醒?我找不到该部分描述此工作原理的API。Java是否有一些原子内部操作可仅一次唤醒线程? 如果仅一个消费者正在等待,则第二个通知将丢失,

  • 假设10个进程使用sem_wait(). 等待一个信号量,第11个进程调用该信号量的sem_post。 10个进程中哪一个将进入临界块? 像随机吗?所有进程将被唤醒并努力实现锁。 ,CPU将为其中一个进程提供锁,其余的进程将返回等待状态

  • 我有2个工作线程和1个处理线程。 当处理线程正在尝试处理某些事情,而辅助线程正在执行它们的工作时,处理线程应该等待,并且在辅助线程中执行的所有作业完成时唤醒。 我怎样才能唤醒这根线?我将尝试演示我在这段伪代码中的意思 处理线程类似于 这样的事情可能发生吗?让线程等待到多个调用notifyAll()的源,而不是只等待一次。我希望我把这个问题弄清楚了。 多谢帮忙!

  • 问题内容: 我有以下方法: 从视图中调用它: 如您所见,_attempt方法期望actor为type ,但是该对象似乎为type 。为什么会这样呢?更重要的是,如何将(显然是User对象的一种包装器)转换为对象? 有关更多信息,请参见:https : //docs.djangoproject.com/en/dev/ref/request- response/#django.http.HttpReq

  • 我正在创建一个简单的web代理应用程序使用Java。基本上,main方法创建一个RequestReceiver对象,该对象具有侦听web浏览器http请求的ServerSocket。从ServerSocket.Accept()返回的套接字创建一个新的连接对象,并将其放入线程池中。

  • 我想做一个小练习来习惯等待/通知。我想做的是简单地启动一个线程,然后用等待让它进入睡眠状态,用通知唤醒它,多次。 我的代码是: 我希望这会是这样 相反,这样做: 所以。。。通知似乎没有唤醒打印机线程? 这不应该是一个死锁,因为通过等待,我释放了所有的锁,所以主服务器不应该有任何对打印机的锁,打印机应该能够唤醒并打印。 我做错了什么?