一、前言
我的理解是在CAFFE中,数据的产生即从存储系统中读取出来和数据的消耗即被读入数据层进行计算必然是异步的,也就是生产者和消费者模型。所以才有了双阻塞队列来实现数据在数据层与存储系统之间的同步。
为什么使用队列?
因为图像数据只需要按照固定的顺序一步一步输入即可,不存在随机访问,不存在随机写入,同时queue底层是由链式表构成的,其访问速度不会随着元素的增加而增加。
如何实现多线程下的阻塞?
使用互斥锁和条件变量。
双阻塞队列将在预取层中梳理。
二、源码分析
1、成员变量
说明该类是对STL的queue的封装。
std::queue<T> queue_;
shared_ptr<sync> sync_;
2、互斥锁和条件变量
template<typename T>
class BlockingQueue<T>::sync {
public:
mutable boost::mutex mutex_;#互斥锁
boost::condition_variable condition_; #条件变量
};
互斥锁的使用:
boost::mutex::scoped_lock提供局部锁定的功能,类似与智能指针,在离开作用域时立即解锁。
template<typename T>
bool BlockingQueue<T>::try_pop(T* t) {
boost::mutex::scoped_lock lock(sync_->mutex_);
if (queue_.empty()) {
return false;
}
*t = queue_.front();
queue_.pop();
return true;
}
条件变量的使用:
当队列已经空了之后:
sync_->condition_.wait(lock)表示使用当前mutex标记,交出了CPU的控制权
template<typename T>
T BlockingQueue<T>::pop(const string& log_on_wait) {
boost::mutex::scoped_lock lock(sync_->mutex_);
while (queue_.empty()) {
if (!log_on_wait.empty()) {
LOG_EVERY_N(INFO, 1000)<< log_on_wait;
}
sync_->condition_.wait(lock);
}
T t = queue_.front();
queue_.pop();
return t;
}
push操作将t压入队列,sync_->condition_.notify_one()激活了上面的线程的CPU控制权,继续pop操作。
template<typename T>
void BlockingQueue<T>::push(const T& t) {
boost::mutex::scoped_lock lock(sync_->mutex_);
queue_.push(t);
lock.unlock();
sync_->condition_.notify_one();
}
三、总结
在STL的queue的基础上,加上互斥锁和条件变量实现了多线程下的阻塞队列。