MPMCQueue 是一个用C ++ 11编写的有界多生产者多用户无锁队列。
MPMCQueue<int> q(10); auto t1 = std::thread([&] { int v; q.pop(v); std::cout << "t1 " << v << "\n"; }); auto t2 = std::thread([&] { int v; q.pop(v); std::cout << "t2 " << v << "\n"; }); q.push(1); q.push(2); t1.join(); t2.join();
MPMCQueue<T>(size_t capacity);
Constructs a new MPMCQueue
holding items of type T
with capacity capacity
.
void emplace(Args &&... args);
Enqueue an item using inplace construction. Blocks if queue is full.
bool try_emplace(Args &&... args);
Try to enqueue an item using inplace construction. Returns true
on success and false
if queue is full.
bool push(const T &v);
Enqueue an item using copy construction. Blocks if queue is full.
template <typename P> bool push(P &&v);
Enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true
. Blocks if queue is full.
bool try_push(const T &v);
Try to enqueue an item using copy construction. Returns true
on success and false
if queue is full.
template <typename P> bool try_push(P &&v);
Try to enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true
. Returns true
on success and false
if queue is full.
void pop(T &v);
Dequeue an item by copying or moving the item into v
. Blocks if queue is empty.
bool try_pop(T &v);
Try to dequeue an item by copying or moving the item into v
. Return true
on sucess and false
if the queue is empty.
所有操作都是线程安全的,除了构造和析构函数。
Enqeue:
Dequeue:
参考资料:
知乎原文:一个Wait-Free MPMC队列的实现 昨天在wait-free是指什么的评论中,我和朱元兄对wait-free的MPMC(多生产者多消费者)队列进行了一番探讨,也激发了我对已有的wait-free思路进行更深入的挖掘,结果发现MPMC队列有可能做到完全wait-free。于是,本着"shut up and show me the code"的原则,我尝试实现了一个MPMC队列,经过
目前我们有LinkedBlockingQueue和Con的LinkedQueue。 LinkedBlockingQueue可以有界,但它使用锁。 ConcurrentLinkedQueue不使用锁,但它不受限制。而这并不是阻碍投票的原因。 显然,我不能有一个既阻塞又无锁的队列(无等待或非阻塞或其他东西)。我不要求学术定义。 有人知道一个队列实现,它基本上是无锁的(不在热路径中使用锁),空时阻塞(不
问题内容: 在进行工作之前,我一直遵循一种检查通道中是否有东西的模式: 这是基于这部视频的。这是我的完整代码: 如果您尝试运行它,则在打印将要中断的消息之前,我们最终将陷入僵局。自上次以来,当chan中没有其他内容时,tbh才有意义,因此我们试图拉出该值,因此出现此错误。但是这样的模式是不可行的。我如何使此代码起作用以及为什么会出现此死锁错误(大概此模式应该起作用?)。 问题答案: 鉴于您在一个频
问题内容: 我有一个带有HornetQ的JBoss-6服务器和一个队列: 有一个不同的消费者(在不同的机器)连接到这个队列中,但只有一个 单一的 消费者是活动的时间。如果我关闭此使用者,则消息将立即由其他使用者之一处理。 由于我的消息需要一些耗时的处理,因此我希望多个使用者同时处理其唯一消息。 我记得在早期版本的JBoss中也有类似的情况,该设置可以正常工作。在Jboss-6中,消息传递系统运行良
我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该
我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。
问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别
问题内容: 因此,我已经看到了许多在Go中实现一个消费者和许多生产者的方法-Go 并发中的经典fanIn函数。 我想要的是fanOut功能。它以一个通道作为参数,它从中读取一个值,并返回一个通道片,该通道将这个值的副本写入其中。 有没有正确/推荐的方法来实现这一目标? 问题答案: 您几乎描述了执行此操作的最佳方法,但这是执行此操作的一小段代码示例。 去游乐场:https : //play.gola
我创建了两个apache camel(blueprint XML)kafka项目,一个是kafka-producer(接受请求并将其存储在kafka服务器中),另一个是kafka-consumer(从kafka服务器获取ups消息并处理它们)。 这个设置对单个主题和单个消费者都很有效。然而,我如何在同一个Kafka主题中创建单独的消费者组?如何在不同的消费者群体中路由同一主题中的多个消费者特定消息