当前位置: 首页 > 软件库 > 程序开发 > 常用工具包 >

MPMCQueue

有界多生产者多用户无锁队列
授权协议 MIT
开发语言 C/C++
所属分类 程序开发、 常用工具包
软件类型 开源软件
地区 不详
投 递 者 慕逸仙
操作系统 跨平台
开源组织
适用人群 未知
 软件概览

MPMCQueue 是一个用C ++ 11编写的有界多生产者多用户无锁队列。

示例代码

MPMCQueue<int> q(10);
auto t1 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t1 " << v << "\n";
});
auto t2 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t2 " << v << "\n";
});
q.push(1);
q.push(2);
t1.join();
t2.join();

使用

  • MPMCQueue<T>(size_t capacity);

    Constructs a new MPMCQueue holding items of type T with capacity capacity.

  • void emplace(Args &&... args);

    Enqueue an item using inplace construction. Blocks if queue is full.

  • bool try_emplace(Args &&... args);

    Try to enqueue an item using inplace construction. Returns true on success and false if queue is full.

  • bool push(const T &v);

    Enqueue an item using copy construction. Blocks if queue is full.

  • template <typename P> bool push(P &&v);

    Enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Blocks if queue is full.

  • bool try_push(const T &v);

    Try to enqueue an item using copy construction. Returns true on success and false if queue is full.

  • template <typename P> bool try_push(P &&v);

    Try to enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Returns true on success and false if queue is full.

  • void pop(T &v);

    Dequeue an item by copying or moving the item into v. Blocks if queue is empty.

  • bool try_pop(T &v);

    Try to dequeue an item by copying or moving the item into v. Return true on sucess and false if the queue is empty.

所有操作都是线程安全的,除了构造和析构函数。

实际原理

Memory layout

Enqeue:

  1. Acquire next write ticket from head.
  2. Wait for our turn (2 * (ticket / capacity)) to write slot (ticket % capacity).
  3. Set turn = turn + 1 to inform the readers we are done writing.

Dequeue:

  1. Acquire next read ticket from tail.
  2. Wait for our turn (2 * (ticket / capacity) + 1) to read slot (ticket % capacity).
  3. Set turn = turn + 1 to inform the writers we are done reading.

参考资料:

  • 知乎原文:一个Wait-Free MPMC队列的实现 昨天在wait-free是指什么的评论中,我和朱元兄对wait-free的MPMC(多生产者多消费者)队列进行了一番探讨,也激发了我对已有的wait-free思路进行更深入的挖掘,结果发现MPMC队列有可能做到完全wait-free。于是,本着"shut up and show me the code"的原则,我尝试实现了一个MPMC队列,经过

 相关资料
  • 目前我们有LinkedBlockingQueue和Con的LinkedQueue。 LinkedBlockingQueue可以有界,但它使用锁。 ConcurrentLinkedQueue不使用锁,但它不受限制。而这并不是阻碍投票的原因。 显然,我不能有一个既阻塞又无锁的队列(无等待或非阻塞或其他东西)。我不要求学术定义。 有人知道一个队列实现,它基本上是无锁的(不在热路径中使用锁),空时阻塞(不

  • 问题内容: 在进行工作之前,我一直遵循一种检查通道中是否有东西的模式: 这是基于这部视频的。这是我的完整代码: 如果您尝试运行它,则在打印将要中断的消息之前,我们最终将陷入僵局。自上次以来,当chan中没有其他内容时,tbh才有意义,因此我们试图拉出该值,因此出现此错误。但是这样的模式是不可行的。我如何使此代码起作用以及为什么会出现此死锁错误(大概此模式应该起作用?)。 问题答案: 鉴于您在一个频

  • 问题内容: 我有一个带有HornetQ的JBoss-6服务器和一个队列: 有一个不同的消费者(在不同的机器)连接到这个队列中,但只有一个 单一的 消费者是活动的时间。如果我关闭此使用者,则消息将立即由其他使用者之一处理。 由于我的消息需要一些耗时的处理,因此我希望多个使用者同时处理其唯一消息。 我记得在早期版本的JBoss中也有类似的情况,该设置可以正常工作。在Jboss-6中,消息传递系统运行良

  • 我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该

  • 我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。

  • 问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别

  • 问题内容: 因此,我已经看到了许多在Go中实现一个消费者和许多生产者的方法-Go 并发中的经典fanIn函数。 我想要的是fanOut功能。它以一个通道作为参数,它从中读取一个值,并返回一个通道片,该通道将这个值的副本写入其中。 有没有正确/推荐的方法来实现这一目标? 问题答案: 您几乎描述了执行此操作的最佳方法,但这是执行此操作的一小段代码示例。 去游乐场:https : //play.gola

  • 我创建了两个apache camel(blueprint XML)kafka项目,一个是kafka-producer(接受请求并将其存储在kafka服务器中),另一个是kafka-consumer(从kafka服务器获取ups消息并处理它们)。 这个设置对单个主题和单个消费者都很有效。然而,我如何在同一个Kafka主题中创建单独的消费者组?如何在不同的消费者群体中路由同一主题中的多个消费者特定消息