C 如何用百行代码实现线程安全的并发队列 | concurrent queue or blocking queue implemented in cpp

廖夜洛
2023-12-01

本文首发于个人博客https://kezunlin.me/post/cabccf5c/,欢迎阅读最新内容!

concurrent queue or blocking queue implemented in cpp

Guide

introduction

Where produce-consumer pattern is present it is often the case that one is faster that the other:

  • a parsing producer reads records faster than a processing consumer;
  • a disk reading producer is faster than network sending consumer.

Producer and consumer often communicate by queues: the producer will put items on a queue while the consumer will pop items off a queue. What happens when the queue becomes full, or empty?

One approach of the producer is to try to put an item on a queue and if it’s full yield the thread and repeat. Similarly the consumer can try to pop an item off a queue and if it’s empty, ditto. This approach of try-fail-yield can unnecessarily burn CPU cycles in tight loops that constantly try to put or pop items off a queue.

Another approach is to temporarily grow the queue, but that doesn’t scale well. When do we stop growing? And once we stop we have to fall back onto the try-fail-yield method.

What if we could implement a blocking queue:

  • a queue who’s put operation blocks when the queue if full, and unblocks only when another thread pops an item off the queue
  • Similarly a queue who’s pop operation blocks when the queue is empty, and unblocks only when another thread puts an item on the queue.

Quote from here

An example of using such a queue would look like this (notice a fast producer and slow consumer in the code below):

blocking queue v1

//std
#include <queue>

//boost
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>

namespace my { 
namespace algorithm {

template<typename Data>
class SHARED_EXPORT blocking_queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;

public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
        lock.unlock();
        the_condition_variable.notify_one();
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    size_t size() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.size();
    }

    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if (the_queue.empty())
        {
            return false;
        }

        popped_value = the_queue.front();
        the_queue.pop();
        return true;
    }

    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while (the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }

        popped_value = the_queue.front();
        the_queue.pop();
    }

    void signal_exit()
    {
        Data data;
        push(data);
    }

};

}
}// end namespace

<script async src="https://pagead2.googlesyndication.com/pagead/js/adsbygoogle.js"></script> <script>(adsbygoogle = window.adsbygoogle || []).push({});</script>

blocking queue v2


#pragma once
#include <iostream>
#include <assert.h>    

#include <queue>
#include <mutex>
#include <condition_variable>

#define MAX_CAPACITY 20

namespace my {
namespace algorithm {

template<typename T>
class SHARED_EXPORT BlockingQueue
{
public:
    BlockingQueue() 
    :mtx(), full_(), empty_(), capacity_(MAX_CAPACITY) { }


    void Push(const T& data){
        std::unique_lock<std::mutex> lock(mtx);
        while(queue_.size() == capacity_){
            full_.wait(lock );
        }

        assert(queue_.size() < capacity_);
        queue_.push(data);
        empty_.notify_all(); 
    }

    T Pop(){
        std::unique_lock<std::mutex> lock(mtx);
        while(queue_.empty()){
            empty_.wait(lock );
        }

        assert(!queue_.empty());
        T front(queue_.front());
        queue_.pop();
        full_.notify_all();
        return front;
    }

    T Front(){
        std::unique_lock<std::mutex> lock(mtx);
        while(queue_.empty()){
            empty_.wait(lock );
        }

        assert(!queue_.empty());
        T front(queue_.front());
        return front;
    }

    T Back(){
        std::unique_lock<std::mutex> lock(mtx);
        while(queue_.empty()){
            empty_.wait(lock );
        }

        assert(!queue_.empty());
        T back(queue_.back());
        return back;
    }

    size_t Size(){
        std::lock_guard<std::mutex> lock(mtx);
        return queue_.size();
    }

    bool Empty(){
        std::unique_lock<std::mutex> lock(mtx);
        return queue_.empty();
    }

    void SetCapacity(const size_t capacity){
        capacity_ = (capacity > 0 ? capacity : MAX_CAPACITY);
    }

private:
    //DISABLE_COPY_AND_ASSIGN(BlockingQueue);
    BlockingQueue(const BlockingQueue& rhs);
    BlockingQueue& operator= (const BlockingQueue& rhs);

private:
    mutable std::mutex mtx;
    std::condition_variable full_;
    std::condition_variable empty_;
    std::queue<T> queue_;
    size_t capacity_; 
};


}
}// end namespace

Reference

History

  • 20191012: created.

Copyright

 类似资料: