当前位置: 首页 > 知识库问答 >
问题:

尝试实现无锁队列时堆栈溢出

鲁鹏
2023-03-14


我基于Maged M.Michael和Michael L.Scott的《简单、快速、实用的非阻塞和阻塞并发队列算法》中指定的算法实现了一个无锁队列(有关算法,请跳到第4页)

我在shared_ptr上使用了原子操作,如std::atomic_load_explicit等。

当只在一个线程中使用队列时,一切都很好,但是当从不同的线程中使用它时,我会得到堆栈溢出异常。

不幸的是,我无法追查问题的根源。似乎当一个<code>shared_ptr</code>超出范围时,它会减少下一个<code>ConcurrentQueueNode</code>上的引用数,并导致无限递归,但我不明白为什么。。

代码:

队列节点:

template<class T>
struct ConcurrentQueueNode {
    T m_Data;
    std::shared_ptr<ConcurrentQueueNode> m_Next;

    template<class ... Args>
    ConcurrentQueueNode(Args&& ... args) :
        m_Data(std::forward<Args>(args)...) {}

    std::shared_ptr<ConcurrentQueueNode>& getNext() {
        return m_Next;
    }

    T getValue() {
        return std::move(m_Data);
    }

};

并发队列(注意:不适合胆小的人):

template<class T>
class ConcurrentQueue {
    std::shared_ptr<ConcurrentQueueNode<T>> m_Head, m_Tail;

public:

ConcurrentQueue(){
    m_Head = m_Tail = std::make_shared<ConcurrentQueueNode<T>>();
}

template<class ... Args>
void push(Args&& ... args) {
    auto node = std::make_shared<ConcurrentQueueNode<T>>(std::forward<Args>(args)...);
    std::shared_ptr<ConcurrentQueueNode<T>> tail;

    for (;;) {
        tail = std::atomic_load_explicit(&m_Tail, std::memory_order_acquire);
        std::shared_ptr<ConcurrentQueueNode<T>> next = 
            std::atomic_load_explicit(&tail->getNext(),std::memory_order_acquire);

        if (tail == std::atomic_load_explicit(&m_Tail, std::memory_order_acquire)) {
            if (next.get() == nullptr) {
                auto currentNext = std::atomic_load_explicit(&m_Tail, std::memory_order_acquire)->getNext();
                auto res = std::atomic_compare_exchange_weak(&tail->getNext(), &next, node);
                if (res) {
                    break;
                }
            }
            else {
                std::atomic_compare_exchange_weak(&m_Tail, &tail, next);
            }
        }
    }

    std::atomic_compare_exchange_strong(&m_Tail, &tail, node);
}

bool tryPop(T& dest) {
    std::shared_ptr<ConcurrentQueueNode<T>> head;
    for (;;) {
        head = std::atomic_load_explicit(&m_Head, std::memory_order_acquire);
        auto tail = std::atomic_load_explicit(&m_Tail,std::memory_order_acquire);
        auto next = std::atomic_load_explicit(&head->getNext(), std::memory_order_acquire);

        if (head == std::atomic_load_explicit(&m_Head, std::memory_order_acquire)) {
            if (head.get() == tail.get()) {
                if (next.get() == nullptr) {
                    return false;
                }
                std::atomic_compare_exchange_weak(&m_Tail, &tail, next);
            }
            else {
                dest = next->getValue();
                auto res = std::atomic_compare_exchange_weak(&m_Head, &head, next);
                if (res) {
                    break;
                }
            }
        }
    }

    return true;
}
};

重现问题的用法示例:

int main(){
    ConcurrentQueue<int> queue;
    std::thread threads[4];

for (auto& thread : threads) {
    thread = std::thread([&queue] {

        for (auto i = 0; i < 100'000; i++) {
            queue.push(i);
            int y;
            queue.tryPop(y);
        }
    });
}

for (auto& thread : threads) {
    thread.join();
}
return 0;
}

共有1个答案

尹承泽
2023-03-14

问题是竞争条件会导致队列中的每个节点都等待被一次性释放——这是递归的,会破坏堆栈。

如果您将测试更改为仅使用一个线程但不弹出,则每次都会遇到相同的堆栈溢出错误。

for (auto i = 1; i < 100000; i++) {
  queue.push(i);
  //int y;
  //queue.tryPop(y);
}

您需要取消递归删除节点链:

__forceinline ~ConcurrentQueueNode() {
    if (!m_Next || m_Next.use_count() > 1)
        return;
    KillChainOfDeath();
}
void KillChainOfDeath() {
    auto pThis = this;
    std::shared_ptr<ConcurrentQueueNode> Next, Prev;
    while (1) {
        if (pThis->m_Next.use_count() > 1)
          break;
        Next.swap(pThis->m_Next); // unwire node
        Prev = NULL; // free previous node that we unwired in previous loop
        if (!(pThis = Next.get())) // move to next node
            break;
        Prev.swap(Next); // else Next.swap will free before unwire.
    }
}

我以前从未使用过shared_ptr,所以我不知道是否有更快的方法可以做到这一点。此外,由于我以前从未使用过shared_ptr,我不知道您的算法是否会遇到ABA问题。除非shared\u ptr实现中有什么特殊的东西来防止ABA,否则我担心以前释放的节点可能会被重用,从而欺骗CAS。我运行你的代码时似乎从未遇到过这个问题。

 类似资料:
  • 我读了一些关于如何优化正则表达式的文章,但是没有一个答案(更少的组,使用{X,Y}而不是*)似乎阻止了我的正则表达式获得堆栈溢出错误。 我正在尝试通过文件进行动态搜索。假设我正在一个相当大(2-4MB)的文件中搜索“我打赌你找不到我”。我的正则表达式生成器将生成正则表达式: 这个正则表达式的想法是,无论单词之间有什么字符或空格,它都能找到准确的短语。然而,当我尝试使用: 我遇到堆栈溢出错误。我知道

  • 问题内容: 您可以尝试/捕获Java中的堆栈溢出异常吗?它似乎在向任一方向投掷自己。当程序溢出时,我想“惩罚”该值。 问题答案: 似乎可以工作:

  • 数字键盘字母组合问题[M]

  • 本文向大家介绍JS实现队列与堆栈的方法,包括了JS实现队列与堆栈的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了JS实现队列与堆栈的方法。分享给大家供大家参考,具体如下: 在面向对象的程序设计里,一般都提供了实现队列(queue)和堆栈(stack)的方法,而对于JS来说,我们可以实现数组的相关操作,来实现队列和堆栈的功能,看下面的相关介绍. 一、看一下它们的性质,这种性质决定了它们

  • 本文向大家介绍python实现堆栈与队列的方法,包括了python实现堆栈与队列的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了python实现堆栈与队列的方法。分享给大家供大家参考。具体分析如下: 1、python实现堆栈,可先将Stack类写入文件stack.py,在其它程序文件中使用from stack import Stack,然后就可以使用堆栈了。 stack.py的程序:

  • 问题内容: 在JavaScript中实现堆栈和队列的最佳方法是什么? 我正在寻找shunting-yard算法,并且我将需要这些数据结构。 问题答案: var stack = []; stack.push(2); // stack is now [2] stack.push(5); // stack is now [2, 5] var i = stack.pop(); // stack is no