线程安全队列 java_C++ 如何用百行代码实现线程安全的并发队列 | concurrent queue or blocking queue implemented in cpp...

曹建明
2023-12-01

concurrent queue or blocking queue implemented in cpp

Guide

introduction

Where produce-consumer pattern is present it is often the case that one is faster that the other:

a parsing producer reads records faster than a processing consumer;

a disk reading producer is faster than network sending consumer.

Producer and consumer often communicate by queues: the producer will put items on a queue while the consumer will pop items off a queue. What happens when the queue becomes full, or empty?

One approach of the producer is to try to put an item on a queue and if it’s full yield the thread and repeat. Similarly the consumer can try to pop an item off a queue and if it’s empty, ditto. This approach of try-fail-yield can unnecessarily burn CPU cycles in tight loops that constantly try to put or pop items off a queue.

Another approach is to temporarily grow the queue, but that doesn’t scale well. When do we stop growing? And once we stop we have to fall back onto the try-fail-yield method.

What if we could implement a blocking queue:

a queue who’s put operation blocks when the queue if full, and unblocks only when another thread pops an item off the queue

Similarly a queue who’s pop operation blocks when the queue is empty, and unblocks only when another thread puts an item on the queue.

Quote from here

An example of using such a queue would look like this (notice a fast producer and slow consumer in the code below):

blocking queue v1

//std

#include

//boost

#include

#include

#include

namespace my {

namespace algorithm {

template

class SHARED_EXPORT blocking_queue

{

private:

std::queue the_queue;

mutable boost::mutex the_mutex;

boost::condition_variable the_condition_variable;

public:

void push(Data const& data)

{

boost::mutex::scoped_lock lock(the_mutex);

the_queue.push(data);

lock.unlock();

the_condition_variable.notify_one();

}

bool empty() const

{

boost::mutex::scoped_lock lock(the_mutex);

return the_queue.empty();

}

size_t size() const

{

boost::mutex::scoped_lock lock(the_mutex);

return the_queue.size();

}

bool try_pop(Data& popped_value)

{

boost::mutex::scoped_lock lock(the_mutex);

if (the_queue.empty())

{

return false;

}

popped_value = the_queue.front();

the_queue.pop();

return true;

}

void wait_and_pop(Data& popped_value)

{

boost::mutex::scoped_lock lock(the_mutex);

while (the_queue.empty())

{

the_condition_variable.wait(lock);

}

popped_value = the_queue.front();

the_queue.pop();

}

void signal_exit()

{

Data data;

push(data);

}

};

}

}// end namespace

blocking queue v2

#pragma once

#include

#include

#include

#include

#include

#define MAX_CAPACITY 20

namespace my {

namespace algorithm {

template

class SHARED_EXPORT BlockingQueue

{

public:

BlockingQueue()

:mtx(), full_(), empty_(), capacity_(MAX_CAPACITY) { }

void Push(const T& data){

std::unique_lock<:mutex> lock(mtx);

while(queue_.size() == capacity_){

full_.wait(lock );

}

assert(queue_.size() < capacity_);

queue_.push(data);

empty_.notify_all();

}

T Pop(){

std::unique_lock<:mutex> lock(mtx);

while(queue_.empty()){

empty_.wait(lock );

}

assert(!queue_.empty());

T front(queue_.front());

queue_.pop();

full_.notify_all();

return front;

}

T Front(){

std::unique_lock<:mutex> lock(mtx);

while(queue_.empty()){

empty_.wait(lock );

}

assert(!queue_.empty());

T front(queue_.front());

return front;

}

T Back(){

std::unique_lock<:mutex> lock(mtx);

while(queue_.empty()){

empty_.wait(lock );

}

assert(!queue_.empty());

T back(queue_.back());

return back;

}

size_t Size(){

std::lock_guard<:mutex> lock(mtx);

return queue_.size();

}

bool Empty(){

std::unique_lock<:mutex> lock(mtx);

return queue_.empty();

}

void SetCapacity(const size_t capacity){

capacity_ = (capacity > 0 ? capacity : MAX_CAPACITY);

}

private:

//DISABLE_COPY_AND_ASSIGN(BlockingQueue);

BlockingQueue(const BlockingQueue& rhs);

BlockingQueue& operator= (const BlockingQueue& rhs);

private:

mutable std::mutex mtx;

std::condition_variable full_;

std::condition_variable empty_;

std::queue queue_;

size_t capacity_;

};

}

}// end namespace

Reference

History

20191012: created.

Copyright

Post author: kezunlin

Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 3.0 unless stating additionally.

 类似资料: