梳理caffe代码blocking_queue(十)

鲜于海
2023-12-01

这一个文件基本是我们最头疼的黑色地带,关于XXXX锁,XXXX解锁的问题,遇到的了学习学习,记不住多温习几次就可以。

首先抛开那些官方的条条框框,我们为了不让多个线程同时访问共享的资源是至关重要的。假如一个线程试图改变共享数据的值,而另外一个线程试图去读取该共享数据的值,结果将是未定义的。为了阻止这样的事情发生,需要用到一些非凡的原始数据类型和操作。其中最重的一个就是总所周知的mutex(“mutual exclusion”的缩写。译注:相互排斥的意思,经常被翻译为“互斥体”其实就是两个事件互斥,数学解释就是两个事件交集是空)。mutex在同一时间只能答应一个线程访问共享资源。当一个线程需要访问共享资源时,它必须先“锁住”mutex,假如任何其他线程已经锁住了mutex,那么本操作将会一直被阻塞,直到锁住了mutex的线程解锁,这就保证了共享资源,在同一时间,只有一个线程可以访问。

mutex的概念有几个变种。Boost.Threads支持两大类型的mutex:简单mutex和递归mutex。一个简单的mutex只能被锁住一次,假如同一线程试图两次锁定mutex,将会产生死锁。对于递归mutex,一个线程可以多次锁定一个mutex,但必须以同样的次数对mutex进行解锁,否则其他线程将无法锁定该mutex。

在上述两大类mutex的基础上,一个线程如何锁定一个mutex也有些不同变化。一个线程有3种可能方法来锁定mutex:

1. 等待并试图对mutex加锁,直到没有其他线程锁定mutex;

2. 试图对mutex加锁,并立即返回,假如其他线程锁定了mutex;

3. 等待并试图对mutex加锁,直到没有其他线程锁定mutex或者直到规定的时间已过。

看起来最好的mutex类型是递归的mutex了,因为上述3种加锁的方式它都支持。不过,不同的加锁方式有不同的消耗,因此对于特定的应用,Boost.Threads答应你挑选最有效率的mutex。为此,Boost.Threads提供了6中类型的mutex,效率由高到低排列:boost::mutex,boost::try_mutex,boost::timed_mutex,boost::recursive_mutex,

boost::recursive_try_mutex和boost::recursive_timed_mutex。(caffe中使用的就是效率最高的boost::mutex)。

假如一个线程锁定一个mutex后,而没有解锁,就会发生死锁,这也是最为常见的错误了,为此,Boost.Threads专门进行了设计,可不直接对mutex加锁或者解锁操作,以使这种错误不可能发生(或至少很难发生)。取而代之地,mutex类定义了内嵌的typedef来实现RAII(Resource Acquisition In Initialization,译注:在初始化时资源获得)[4]用以对一个mutex进行加锁或者解锁,这就是所谓的Scoped Lock模式。要构建一个这种类型的锁,需要传送一个mutex引用,构造函数将锁定mutex,析构函数将解锁mutex。C++语言规范确保了析构函数总是会被调用,所以即使有异常抛出,mutex也会被正确地解锁。这种模式确保了mutex的正确使用。不过必须清楚,尽管Scoped Lock模式保证了mutex被正确解锁,但它不能保证在有异常抛出的时候,所有共享资源任然处于有效的状态,所以,就像进行单线程编程一样,必须确保异常不会让程序处于不一致的状态。同时,锁对象不能传送给另外一个线程,因为他们所维护的状态不会受到此种用法的保护。

举boost::mutex类的一个简单的用法。其中两个线程被创建,每个循环10次,将id和当前循环计数输出到std::cout,main线程等待着两个线程结束。std::cout对象是一个共享资源,所以每个线程均使用全局mutex,以确保在同一时刻,只有一个线程输出到它。

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <iostream>
boost::mutex io_mutex;
struct count
{
count(int id) : id(id) { }
void operator()()
{
for (int i = 0; i < 10; ++i)
{
boost::mutex::scoped_lock lock(io_mutex);
std::cout << id << ": " << i << std::endl;
}
}
int id;
};
int main(int argc, char* argv[])
{
boost::thread thrd1(count(1));
boost::thread thrd2(count(2));
thrd1.join();
thrd2.join();
return 0;
}
每次运行产生的结果顺序都不同,查了这么久才发现好笨啊,既然叫多线程有顺序还叫多线程么?哈哈。
然后看看cpp文件中boost::condition_variable 设计c++ 生产者消费者队列:

boost::condition_variable 用法:

当线程间的共享数据发生变化的时候,可以通过condition_variable来通知其他的线程。消费者wait 直到生产者通知其状态发生改变,Condition_variable是使用方法如下:

·当持有锁之后,线程调用wait

·wait解开持有的互斥锁(mutex),阻塞本线程,并将自己加入到唤醒队列中

·当收到通知(notification),该线程从阻塞中恢复,并加入互斥锁队列(mutex queue)

 线程被唤醒之后继续持有锁运行。

然后再看一下一段经典的设计:

template<typename Data> 
class concurrent_queue 
{ 
private: 
    std::queue<Data> the_queue; 
    mutable boost::mutex the_mutex; 
    boost::condition_variable the_condition_variable; 
public: 
    void push(Data const& data) 
    { 
        boost::mutex::scoped_lock lock(the_mutex); 
        the_queue.push(data); 
        lock.unlock(); 
        the_condition_variable.notify_one(); 
    } 
    bool empty() const 
    { 
        boost::mutex::scoped_lock lock(the_mutex); 
        return the_queue.empty(); 
    } 
    bool try_pop(Data& popped_value) 
    { 
        boost::mutex::scoped_lock lock(the_mutex); 
        if(the_queue.empty()) 
        { 
            return false; 
        } 
           
        popped_value=the_queue.front(); 
        the_queue.pop(); 
        return true; 
    } 
    void wait_and_pop(Data& popped_value) 
    { 
        boost::mutex::scoped_lock lock(the_mutex); 
        while(the_queue.empty()) 
        { 
            the_condition_variable.wait(lock); 
        } 
           
        popped_value=the_queue.front(); 
        the_queue.pop(); 
    } 
};
瞬间发现caffe的blocking_queue定义完全模仿这个模式设计的,哈哈
前面已经阐述的差不多了,直接看caffe的blocking_queue实现:

#include <boost/thread.hpp>
#include <string>

#include "caffe/data_reader.hpp"
#include "caffe/layers/base_data_layer.hpp"
#include "caffe/parallel.hpp"
#include "caffe/util/blocking_queue.hpp"

namespace caffe {
/*
empty()	堆栈为空则返回真
pop()	移除栈顶元素
push()	在栈顶增加元素
size()	返回栈中元素数目
top()	返回栈顶元素
*/
//首先尝试锁住,然后将数据push到队列(queue_ 是std::queue<T> 类型的),然后unlock,条件变量通知。
template<typename T>
class BlockingQueue<T>::sync {
 public:
  mutable boost::mutex mutex_;
  boost::condition_variable condition_;
};

template<typename T>
BlockingQueue<T>::BlockingQueue()
    : sync_(new sync()) {
}

template<typename T>
void BlockingQueue<T>::push(const T& t) {
  boost::mutex::scoped_lock lock(sync_->mutex_);
  queue_.push(t);
  lock.unlock();
  sync_->condition_.notify_one();
}

template<typename T>
bool BlockingQueue<T>::try_pop(T* t) {
  boost::mutex::scoped_lock lock(sync_->mutex_);

  if (queue_.empty()) {
    return false;
  }

  *t = queue_.front();
  queue_.pop();
  return true;
}

template<typename T>
T BlockingQueue<T>::pop(const string& log_on_wait) {
  boost::mutex::scoped_lock lock(sync_->mutex_);

  while (queue_.empty()) {
    if (!log_on_wait.empty()) {
      LOG_EVERY_N(INFO, 1000)<< log_on_wait;
    }
    sync_->condition_.wait(lock);// 如果队列一直为空则一直在等待  
  }

  T t = queue_.front();// 否则取出  
  queue_.pop();
  return t;
}
//判断队列首部是不是有数据
template<typename T>
bool BlockingQueue<T>::try_peek(T* t) {
  boost::mutex::scoped_lock lock(sync_->mutex_);

  if (queue_.empty()) {
    return false;
  }

  *t = queue_.front();
  return true;
}
//peek函数取出队列首部的数据,同样也是使用的条件变量来实现同步
template<typename T>
T BlockingQueue<T>::peek() {
  boost::mutex::scoped_lock lock(sync_->mutex_);

  while (queue_.empty()) {
    sync_->condition_.wait(lock);
  }

  return queue_.front();
}

template<typename T>
size_t BlockingQueue<T>::size() const {
  boost::mutex::scoped_lock lock(sync_->mutex_);
  return queue_.size();
}

template class BlockingQueue<Batch<float>*>;
template class BlockingQueue<Batch<double>*>;
template class BlockingQueue<Datum*>;
template class BlockingQueue<shared_ptr<DataReader::QueuePair> >;
template class BlockingQueue<P2PSync<float>*>;
template class BlockingQueue<P2PSync<double>*>;

}  // namespace caffe

 类似资料: