当前位置: 首页 > 知识库问答 >
问题:

C中的共享队列

颛孙兴旺
2023-03-14

我只是简单地从网络获取数据包,并在一个线程中将它们排队,然后在另一个线程中使用此数据包(取消排队)。

所以我决定使用boost库来创建一个基于https://www.quantnet.com/cplusplus-multithreading-boost/

template <typename T>
class SynchronisedQueue
{
private:
    std::queue<T> m_queue;  // Use STL queue to store data
    boost::mutex m_mutex;   // The mutex to synchronise on
    boost::condition_variable m_cond;// The condition to wait for

public:

    // Add data to the queue and notify others
    void Enqueue(const T& data)
    {
        // Acquire lock on the queue
        boost::unique_lock<boost::mutex> lock(m_mutex);

        // Add the data to the queue
        m_queue.push(data);

        // Notify others that data is ready
        m_cond.notify_one();

    } // Lock is automatically released here

    // Get data from the queue. Wait for data if not available
    T Dequeue()
    {

        // Acquire lock on the queue
        boost::unique_lock<boost::mutex> lock(m_mutex);

        // When there is no data, wait till someone fills it.
        // Lock is automatically released in the wait and obtained 
        // again after the wait
        while (m_queue.size()==0) m_cond.wait(lock);

        // Retrieve the data from the queue
        T result=m_queue.front(); m_queue.pop();
        return result;

    } // Lock is automatically released here
};

问题是,虽然没有得到任何数据,但Dequeue()方法会阻塞我的使用者线程,当我想终止使用者线程时,有时我无法结束或停止它。

结束Dequeue()阻塞的建议方法是什么,以便我可以html" target="_blank">安全地终止使用数据包的线程?有什么想法建议吗?

PS:该网站 https://www.quantnet.com/cplusplus-multithreading-boost/ 使用“boost::this_thread::interruption_point();”来停止消费线程...由于我的遗留代码结构,这对我来说是不可能的......

根据答案I更新共享队列,如下所示:

#include <queue>
 #include <boost/thread.hpp>  

template <typename T>
class SynchronisedQueue
{
public:

    SynchronisedQueue()
    {
        RequestToEnd = false;  
        EnqueueData = true;
    }
    void Enqueue(const T& data)
    {
        boost::unique_lock<boost::mutex> lock(m_mutex);

        if(EnqueueData)
        {
            m_queue.push(data);
            m_cond.notify_one();
        }

    } 


    bool TryDequeue(T& result)
    {
        boost::unique_lock<boost::mutex> lock(m_mutex);

        while (m_queue.empty() && (! RequestToEnd)) 
        { 
            m_cond.wait(lock);
        }

        if( RequestToEnd )
        {
             DoEndActions();
             return false;
        }

        result= m_queue.front(); m_queue.pop();

        return true;
    }

    void StopQueue()
    {
        RequestToEnd =  true;
        Enqueue(NULL);        
    }

    int Size()
    {
        boost::unique_lock<boost::mutex> lock(m_mutex);
        return m_queue.size();

    }

private:

    void DoEndActions()
    {
        EnqueueData = false;

        while (!m_queue.empty())  
        {
            m_queue.pop();
        }
    }



    std::queue<T> m_queue;              // Use STL queue to store data
    boost::mutex m_mutex;               // The mutex to synchronise on
    boost::condition_variable m_cond;            // The condition to wait for

    bool RequestToEnd;
    bool EnqueueData;
};

这是我的试驾:

#include <iostream>
#include <string>

#include "SynchronisedQueue.h"

using namespace std;

SynchronisedQueue<int> MyQueue;

void InsertToQueue()
{
    int i= 0;

    while(true)
    {
        MyQueue.Enqueue(++i);
    }

}

void ConsumeFromQueue()
{
    while(true)
    {
        int number;

        cout << "Now try to dequeue" << endl;

        bool success = MyQueue.TryDequeue(number);

        if(success)
        {

            cout << "value is " << number << endl;

        }

        else
        {
            cout << " queue is stopped" << endl;
            break;

        }
    }


    cout << "Que size is : " << MyQueue.Size() <<  endl;
}



int main()
{

    cout << "Test Started" << endl;

    boost::thread startInsertIntoQueue = boost::thread(InsertToQueue);
    boost::thread consumeFromQueue = boost::thread(ConsumeFromQueue);

    boost::this_thread::sleep(boost::posix_time::seconds(5)); //After 5 seconds

    MyQueue.StopQueue();

    int endMain;

    cin >> endMain;


    return 0;
}

目前看来这行得通…基于新的建议:

我将停止方法更改为:

void StopQueue()
    {
        boost::unique_lock<boost::mutex> lock(m_mutex);
        RequestToEnd =  true;
        m_cond.notify_one();          
    }

共有3个答案

仉刚洁
2023-03-14

另一个应该考虑的选择是不要在线程中无限阻塞。换句话说,为您的阻塞调用添加一个超时,如下所示:

    bool TryDequeue(T& result, boost::chrono::milliseconds timeout)
    {
        boost::unique_lock<boost::mutex> lock(m_mutex);
        boost::chrono::system_clock::time_point timeLimit = 
            boost::chrono::system_clock::now() + timeout;

        while (m_queue.empty())
        { 
            if (m_cond.wait_until(lock, timeLimit) == 
                boost::condition_variable::cv_status::timeout)
            {
                return false;
            }
        }

        result = m_queue.front(); m_queue.pop();

        return true;
    }

然后在您的线程中,只需有一个变量来指示线程是否仍在运行(我冒昧地将您的使用者设置为一个类):

class Consumer
{
public:
    boost::shared_ptr<Consumer> createConsumer()
    {
        boost::shared_ptr<Consumer> ret(new Consumer());
        ret->_consumeFromQueue = boost::thread(&Consumer::ConsumeFromQueue, ret.get());
        return ret;
    }
protected:
    Consumer()
    : _threadRunning(true)
    {
    }

    ~Consumer()
    {
        _threadRunning = false;
        _consumeFromQueue.join();
    }

    void ConsumeFromQueue()
    {
        while(_threadRunning == true)
        {
            int number;

            cout << "Now try to dequeue" << endl;

            bool success = MyQueue.TryDequeue(number);

            if(success)
            {

                cout << "value is " << number << endl;

            }

            else
            {
                cout << " queue is stopped" << endl;
                break;

            }
        }


        cout << "Que size is : " << MyQueue.Size() <<  endl;
    }

    bool _threadRunning;
    boost::thread _consumeFromQueue;
}

没有必要破解你的队列类,只是为了它可以在线程中使用,给它一个有超时的正常接口,然后根据用例以正确的方式使用它。

我在这里提供了有关为什么这是线程遵循的良好模式的更多详细信息:

http://blog.chrisd.info/how-to-run-threads/

邹星火
2023-03-14

首先,你真的需要终止线程吗?如果没有,请不要。

如果你真的需要,那就把它列为自杀药丸。我通常向T发送一个NULL转换。线程检查T,如果为NULL,则进行清理,返回,然后终止。

此外,您可能需要首先通过移除和删除()所有项来清除队列。

宋臻
2023-03-14

让线结束的两个简单方法:

>

  • 在队列上发送结束消息
  • 将另一个条件添加到条件变量以命令结束

    while(queue.empty() && (! RequestToEnd)) m_cond.wait(lock);
    if (RequestToEnd) { doEndActions(); }
    else { T result=m_queue.front(); m_queue.pop(); return result; }
    

  •  类似资料:
    • 哪里使用无锁数据结构更好,哪里使用基于'mutex'和'condition_variables'的简单实现更好?

    • 问题内容: 我知道如何将其用于创建共享对象,尤其是可以在工作人员之间共享的队列。有这个问题,这个问题,[这个问题](http://codingdict.com/questions/1299甚至是我自己的一个问题。 但是,我需要定义很多队列,每个队列都链接一对特定的进程。假设每对进程及其链接队列均由变量标识。 当我需要放置和获取数据时,我想使用字典来访问我的队列。我无法完成这项工作。我已经尝试了很多

    • 问题内容: 我正在从python脚本中调用一个so文件。据我了解,我真的不需要释放使用ctypes在python中打开的共享库。但是,在我的so文件代码中,它dlopen另一个so文件并且不执行dlclose()。在这种情况下,从python端使用安全吗?我不必释放在ctypes内部加载的共享库soe文件吗? 问题答案: 始终遵循 “自己清洁后清理 ”的规则(尽管现代技术会为您提供清洁方面的帮助)

    • 我在“C编程语言,第4版”第17.5.1.3章中找到了下面的代码 我不明白最后一条评论,确实y.p应该在reset()调用后指向一个新的内存地址,所以 应该让y.p不变,不是吗? 谢谢

    • 问题内容: 这是使用g ++ 进行动态共享库编译的后续版本。 我正在尝试在Linux上的C++中创建一个共享的类库。当我尝试使用库中定义的类时,我的问题开始了。我链接到的第二篇教程展示了如何加载用于创建库中定义的类的对象的符号,但是没有_使用_ 这些对象来完成任何工作。 有谁知道用于创建共享C ++类库的更完整的教程,该教程还显示了如何在单独的可执行文件中 使用 这些类?一个非常简单的教程,显示了

    • 问题内容: 是否可以使用Go创建共享库(.so)? 更新 :为此创建了一个“ 问题 ”。 问题答案: 现在可以使用标志 您需要做的是首先运行以下命令: (以上代码使所有通用软件包都可共享!)然后 最后,在编译代码时,您需要运行: 上面这些就是什么,而不是静态链接所有内容而仅动态链接它们,您最终将获得更小的编译文件。为了让您了解我的带有静态链接的“ hello.go”文件为2.3MB,而使用动态链接