当前位置: 首页 > 工具软件 > MPMCQueue > 使用案例 >

一个Wait-Free MPMC队列的实现

云捷
2023-12-01

知乎原文:一个Wait-Free MPMC队列的实现

昨天在wait-free是指什么的评论中,我和朱元兄对wait-free的MPMC(多生产者多消费者)队列进行了一番探讨,也激发了我对已有的wait-free思路进行更深入的挖掘,结果发现MPMC队列有可能做到完全wait-free。于是,本着"shut up and show me the code"的原则,我尝试实现了一个MPMC队列,经过简单的测试发现一切符合预期,然后就开源了。

项目链接是WFMPMC(Wait-Free MPMC),是只有一个头文件的C++11模板类,用户可以通过模板参数指定元素类型和队列大小,因此是一个bounded队列。除了wait-free的特性外,WFMPMC还是zero-copy的,甚至可以放入共享内存来实现Linux中的IPC。

下面简单介绍下WFMPMC的用法:一次入队或出队操作需要依次调用3个接口函数,当然这些函数都是wait-free的。拿writer来说(reader类似),用户需要调用getWriteIdx(), getWritable(idx), commitWrite(idx)这三个函数,详见项目链接。
一个具体的代码示例:

WFMPMC<int, 8> q;

// write integer 123 into queue
auto idx = q.getWriteIdx();
int* data;
while((data = q.getWritable(idx)) == nullptr)
  ;
*data = 123;
q.commitWrite(idx);

...

// read an integer from queue
auto idx = q.getReadIdx();
int* data;
while((data = q.getReadable(idx)) == nullptr)
  ;
std::cout<< *data << std::endl;
q.commitRead(idx);

用户类型T不需要默认构造函数,但是getWritable(idx)返回的指针会指向一个未构造
的对象!用户需要自己构造它,方法详见项目链接。
上面的接口调用略显冗长,这是为了实现wait-free和zero-copy这两个特性付出的代价(一个特性多一个接口)。
当然,有些时候我们不太需要wait-free和zero-copy,而只想简单的进行入队和出队操作。幸运的是,WFMPMC也提供了懒人版本接口:emplace()pop()

WFMPMC<int, 8> q;

// write integer 123 into queue
q.emplace(123)

...

// read an integer from queue
std::cout << q.pop() << std::endl;

经评论反馈,发现大家还需要简单易用并且wait-free的Try版本API,于是我增加了tryEmplace()tryPop()接口:

WFMPMC<int, 8> q;

// write integer 123 into queue
while(!q.tryEmplace(123))
  ;

...

// read an integer from queue
int data;
while(!q.tryPop(data))
  ;
std::cout << data << std::endl;

tryEmplace()tryPop()不是zero-copy的,但却是wait-free或者wait-forever的,这取决于第三个模板参数THR_SIZE是否够大。 因为Try API使用了对象级别的thread local变量保存index,其中使用了open addressing hash table查询thread id,而THR_SIZE是hash table的大小。

关于性能,我做了下测试发现还不错(详见Performance):
单线程测试:一对入队出队操作耗时53 cycle
共享内存IPC:进程间通信延时在200 cycle左右。

最后,虽然WFMPMC可基于共享内存,但它不是崩溃安全的(什么是崩溃安全?)。对此,有一个解决方案可以减轻这种风险。

 类似资料: