前言: Linux 进程通信系列文章是对工作中使用的技术进行描述总结,使用了Boost 库IPC 实现,因此文章的代码部分选自Boost example 文档。
消息队列(MessageQueue, 以下简称MQ)为同一台计算机进程间通信(IPC)提供了一种方式,提供了从一个进程向另外一个进程发送数据块的方法。
MQ被创建后,不属于任何进程,独立于进程存在,因此MQ必须具有唯一的名称,进程使用名称访问它,此外MQ是持久的存在的,即使创建的进程死亡后仍然留在内存中,需要通过显示调用remove 删除。MQ被创建后,每个进程都可以向消息队列中发送和从消息队列中获取消息,读取过程不可避免产生竞争,也必然需要同步,不过该同步过程由消息队列本身完成,用户不需要关心。
线程通过三种方式向MQ中发送/从MQ中获取消息:
1) 阻塞式:发送消息时,若MQ已满,则发送方将会阻塞等待直到MQ消息取出后有空间, 投递成功后返回;获取消息时,若MQ为空,则获取方阻塞等待直到MQ有新的消息,获取消息成功后返回。
2) Timed 阻塞式:发送方和接收方都自定义超时时间,在该超时时间内阻塞式等待, 若超时时间到了,则发送方无论是否发送成功,接收方无论是否获取成功都会返回,避免一直阻塞等待。
3) Try发送/获取式:发送方尝试发送消息至消息队列,若队列不满,则投递成功返回,否则投递失败但也立即返回;接收方尝试从消息队列中获取消息,若消息队列不为空则获取成功返回,否则获取失败立即返回。该方式特点是:无论MQ是否满或者空,都能立即返回。
消息队列模型并不复杂,Boost IPC 编程提供了跨平台的消息队列实现,简单方便。
message_queue_t(create_only_t create_only, //仅创建
const char *name, //消息队列名称,进程通过名称访问该消息队列
size_type max_num_msg, //max 消息数量
size_type max_msg_size, //max 消息占用空间
const permissions &perm = permissions());
message_queue_t(open_only_t open_only, //仅打开
const char *name); //消息队列名称,进程通过名称访问该消息队列
message_queue_t(open_or_create_t open_or_create, //如果不存在则创建,存在则打开
const char *name,
size_type max_num_msg,
size_type max_msg_size,
const permissions &perm = permissions());
void send (const void *buffer, //消息内容
size_type buffer_size, //消息内容size
unsigned int priority); //优先级
bool try_send(const void *buffer,
size_type buffer_size,
unsigned int priority);
bool timed_send(const void *buffer,
size_type buffer_size,
unsigned int priority,
const boost::posix_time::ptime& abs_time); //超时时间
void receive(void *buffer, //用于存储获取消息的buffer
size_type buffer_size, //用于存储获取消息buffer的size
size_type &recvd_size, //实际获取的size
unsigned int &priority);
bool try_receive(void *buffer,
size_type buffer_size,
size_type &recvd_size,
unsigned int &priority);
bool timed_receive(void *buffer,
size_type buffer_size,
size_type &recvd_size,
unsigned int &priority,
const boost::posix_time::ptime &abs_time); //超时时间
static bool remove(const char *name); //这是静态函数,消息队列不依赖于进程
使用example,选自boost 官方文档:
using namespace boost::interprocess;
int main ()
{
try{
//Erase previous message queue
message_queue::remove("message_queue");
//Create a message_queue.
message_queue mq(create_only, //only create
"message_queue", //name
100, //max message number
sizeof(int)); //max message size
//Send 100 numbers
for(int i = 0; i < 100; ++i){
mq.send(&i, sizeof(i), 0);
}
}
catch(interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
using namespace boost::interprocess;
int main ()
{
try{
//Open a message queue.
message_queue mq(open_only, //only create
"message_queue"); //name
unsigned int priority;
message_queue::size_type recvd_size;
//Receive 100 numbers
for(int i = 0; i < 100; ++i){
int number;
mq.receive(&number, sizeof(number), recvd_size, priority);
if(number != i || recvd_size != sizeof(number))
return 1;
}
}
catch(interprocess_exception &ex){
message_queue::remove("message_queue");
std::cout << ex.what() << std::endl;
return 1;
}
message_queue::remove("message_queue"); //删除MQ
return 0;
}
需要注意:
MQ只是简单的将要发送的数据在内存中进行拷贝,所以我们在发送复杂结构或对象时,我们需要将其序列化后再发送,接收端接收时要反序列化,也就是说我们要自己去定义区分一条消息(定义通迅协议)。
A message queue just copies raw bytes between processes and does not send objects. This means that if we want to send an object using a message queue the object must be binary serializable. For example, we can send integers between processes but not a std::string. You should use Boost.Serialization or use advanced Boost.Interprocess mechanisms to send complex data between processes.