昨天在wait-free是指什么的评论中,我和朱元兄对wait-free的MPMC(多生产者多消费者)队列进行了一番探讨,也激发了我对已有的wait-free思路进行更深入的挖掘,结果发现MPMC队列有可能做到完全wait-free。于是,本着"shut up and show me the code"的原则,我尝试实现了一个MPMC队列,经过简单的测试发现一切符合预期,然后就开源了。
项目链接是WFMPMC(Wait-Free MPMC),是只有一个头文件的C++11模板类,用户可以通过模板参数指定元素类型和队列大小,因此是一个bounded队列。除了wait-free的特性外,WFMPMC还是zero-copy的,甚至可以放入共享内存来实现Linux中的IPC。
下面简单介绍下WFMPMC的用法:一次入队或出队操作需要依次调用3个接口函数,当然这些函数都是wait-free的。拿writer来说(reader类似),用户需要调用getWriteIdx()
, getWritable(idx)
, commitWrite(idx)
这三个函数,详见项目链接。
一个具体的代码示例:
WFMPMC<int, 8> q;
// write integer 123 into queue
auto idx = q.getWriteIdx();
int* data;
while((data = q.getWritable(idx)) == nullptr)
;
*data = 123;
q.commitWrite(idx);
...
// read an integer from queue
auto idx = q.getReadIdx();
int* data;
while((data = q.getReadable(idx)) == nullptr)
;
std::cout<< *data << std::endl;
q.commitRead(idx);
用户类型T
不需要默认构造函数,但是getWritable(idx)
返回的指针会指向一个未构造
的对象!用户需要自己构造它,方法详见项目链接。
上面的接口调用略显冗长,这是为了实现wait-free和zero-copy这两个特性付出的代价(一个特性多一个接口)。
当然,有些时候我们不太需要wait-free和zero-copy,而只想简单的进行入队和出队操作。幸运的是,WFMPMC也提供了懒人版本接口:emplace()
和pop()
:
WFMPMC<int, 8> q;
// write integer 123 into queue
q.emplace(123)
...
// read an integer from queue
std::cout << q.pop() << std::endl;
经评论反馈,发现大家还需要简单易用并且wait-free的Try版本API,于是我增加了tryEmplace()
和tryPop()
接口:
WFMPMC<int, 8> q;
// write integer 123 into queue
while(!q.tryEmplace(123))
;
...
// read an integer from queue
int data;
while(!q.tryPop(data))
;
std::cout << data << std::endl;
tryEmplace()
和tryPop()
不是zero-copy的,但却是wait-free或者wait-forever的,这取决于第三个模板参数THR_SIZE
是否够大。 因为Try API使用了对象级别的thread local变量保存index,其中使用了open addressing hash table查询thread id,而THR_SIZE
是hash table的大小。
关于性能,我做了下测试发现还不错(详见Performance):
单线程测试:一对入队出队操作耗时53 cycle。
共享内存IPC:进程间通信延时在200 cycle左右。