Message Queue(后文简写成MQ或消息队列)是boost库中用来封装进程间通信的一种实现,同一台机器上的进程或线程可以通过消息队列来进行通迅。消息队列中的消息由优先级、消息长度、消息数据三部分组成。这里需要注意的事,MQ只是简单的将要发送的数据在内存中进行拷贝,所以我们在发送复杂结构或对象时,我们需要将其序列化后再发送,接收端接收时要反序列化,也就是说我们要自己去定义区分一条消息(就是自定义网络通迅协议)。在MQ中,我们可以使用三模式去发送和接收消息:
MQ使用命名的共享内存来实现进程间通信。共享内存换句话来说,就是用户可以指定一个名称来创建一块共享内存,然后像打一个文件一样去打开这块共享内存,同样别的进程也可以根据这个名称来打开这块共享内存,这样一个进程向共享内存中写,另一个进程就可以从共享内存中读。这里两个进程的读写就涉及到同步问题。另外,在创建一个MQ时,我们需要指定MQ的最大消息数量以及消息的最大size。
//!Sends a message stored in buffer "buffer" with size "buffer_size" in the
//!message queue with priority "priority". If the message queue is full
//!the sender is blocked. Throws interprocess_error on error.
void send (const void *buffer, size_type buffer_size,
unsigned int priority);
//!Sends a message stored in buffer "buffer" with size "buffer_size" through the
//!message queue with priority "priority". If the message queue is full
//!the sender is not blocked and returns false, otherwise returns true.
//!Throws interprocess_error on error.
bool try_send (const void *buffer, size_type buffer_size,
unsigned int priority);
//!Sends a message stored in buffer "buffer" with size "buffer_size" in the
//!message queue with priority "priority". If the message queue is full
//!the sender retries until time "abs_time" is reached. Returns true if
//!the message has been successfully sent. Returns false if timeout is reached.
//!Throws interprocess_error on error.
bool timed_send (const void *buffer, size_type buffer_size,
unsigned int priority, const boost::posix_time::ptime& abs_time);
//!Receives a message from the message queue. The message is stored in buffer
//!"buffer", which has size "buffer_size". The received message has size
//!"recvd_size" and priority "priority". If the message queue is empty
//!the receiver is blocked. Throws interprocess_error on error.
void receive (void *buffer, size_type buffer_size,
size_type &recvd_size,unsigned int &priority);
//!Receives a message from the message queue. The message is stored in buffer
//!"buffer", which has size "buffer_size". The received message has size
//!"recvd_size" and priority "priority". If the message queue is empty
//!the receiver is not blocked and returns false, otherwise returns true.
//!Throws interprocess_error on error.
bool try_receive (void *buffer, size_type buffer_size,
size_type &recvd_size,unsigned int &priority);
//!Receives a message from the message queue. The message is stored in buffer
//!"buffer", which has size "buffer_size". The received message has size
//!"recvd_size" and priority "priority". If the message queue is empty
//!the receiver retries until time "abs_time" is reached. Returns true if
//!the message has been successfully sent. Returns false if timeout is reached.
//!Throws interprocess_error on error.
bool timed_receive (void *buffer, size_type buffer_size,
size_type &recvd_size,unsigned int &priority,
const boost::posix_time::ptime &abs_time);
//Create a message_queue. If the queue
//exists throws an exception
message_queue mq
(create_only //only create
,"message_queue" //name
,100 //max message number
,100 //max message size
);
using boost::interprocess;
//Creates or opens a message_queue. If the queue
//does not exist creates it, otherwise opens it.
//Message number and size are ignored if the queue
//is opened
message_queue mq
(open_or_create //open or create
,"message_queue" //name
,100 //max message number
,100 //max message size
);
using boost::interprocess;
//Opens a message_queue. If the queue
//does not exist throws an exception.
message_queue mq
(open_only //only open
,"message_queue" //name
);
使用message_queue::remove("message_queue");来移除一个指定的消息队列。
接下来,我们看一个使用消息队列的生产者与消息者的例子。第一个进程做为生产者,第二个进程做为消费者。
生产者进程:p.cpp
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <vector>
using namespace boost::interprocess;
int main ()
{
try{
/*Erase previous message queue */
message_queue::remove("message_queue");
/*Create a message_queue. */
message_queue mq
(create_only //only create
,"message_queue" //name
,100 //max message number
,sizeof(int) //max message size
);
/*Send 100 numbers */
for(int i = 0; i < 100; ++i){
mq.send(&i, sizeof(i), 0);
}
}
catch(interprocess_exception &ex){
std::cout <<"exp: "<< ex.what() << std::endl;
return 1;
}
return 0;
}
消费者进程:c.cpp
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <vector>
using namespace boost::interprocess;
int main ()
{
try{
/*Open a message queue. */
message_queue mq
(open_only //only create
,"message_queue" //name
);
unsigned int priority;
message_queue::size_type recvd_size;
/*Receive 100 numbers */
for(int i = 0; i < 100; ++i){
int number;
mq.receive(&number, sizeof(number), recvd_size, priority);
if(number != i || recvd_size != sizeof(number))
return 1;
}
}
catch(interprocess_exception &ex){
message_queue::remove("message_queue");
std::cout <<"exp: " <<ex.what() << std::endl;
return 1;
}
message_queue::remove("message_queue");
return 0;
}
编译:
g++ -I. -I/home1/irteam/externals/boost/include -L/home1/irteam/externals/boost/lib -g -lboost_context -lboost_date_time -lboost_thread -lboost_system -lboost_program_options -lboost_
filesystem -lrt -lpthread -o c c.cpp
g++ -I. -I/home1/irteam/externals/boost/include -L/home1/irteam/externals/boost/lib -g -lboost_context -lboost_date_time -lboost_thread -lboost_system -lboost_program_options -lboost_
filesystem -lrt -lpthread -o p p.cpp