Linux进程通信IPC--消息队列MessageQueue

督阿苏
2023-12-01

前言: Linux 进程通信系列文章是对工作中使用的技术进行描述总结,使用了Boost 库IPC 实现,因此文章的代码部分选自Boost example 文档。

消息队列

消息队列(MessageQueue, 以下简称MQ)为同一台计算机进程间通信(IPC)提供了一种方式,提供了从一个进程向另外一个进程发送数据块的方法。

MQ被创建后,不属于任何进程,独立于进程存在,因此MQ必须具有唯一的名称,进程使用名称访问它,此外MQ是持久的存在的,即使创建的进程死亡后仍然留在内存中,需要通过显示调用remove 删除。MQ被创建后,每个进程都可以向消息队列中发送和从消息队列中获取消息,读取过程不可避免产生竞争,也必然需要同步,不过该同步过程由消息队列本身完成,用户不需要关心。

线程通过三种方式向MQ中发送/从MQ中获取消息:
1) 阻塞式:发送消息时,若MQ已满,则发送方将会阻塞等待直到MQ消息取出后有空间, 投递成功后返回;获取消息时,若MQ为空,则获取方阻塞等待直到MQ有新的消息,获取消息成功后返回。

2) Timed 阻塞式:发送方和接收方都自定义超时时间,在该超时时间内阻塞式等待, 若超时时间到了,则发送方无论是否发送成功,接收方无论是否获取成功都会返回,避免一直阻塞等待。

3) Try发送/获取式:发送方尝试发送消息至消息队列,若队列不满,则投递成功返回,否则投递失败但也立即返回;接收方尝试从消息队列中获取消息,若消息队列不为空则获取成功返回,否则获取失败立即返回。该方式特点是:无论MQ是否满或者空,都能立即返回。

消息队列实现

消息队列模型并不复杂,Boost IPC 编程提供了跨平台的消息队列实现,简单方便。

  • 创建/打开消息队列:
message_queue_t(create_only_t create_only,  //仅创建
                 const char *name,          //消息队列名称,进程通过名称访问该消息队列
                 size_type max_num_msg,     //max 消息数量
                 size_type max_msg_size,    //max 消息占用空间
                 const permissions &perm = permissions());

message_queue_t(open_only_t open_only,      //仅打开
                 const char *name);         //消息队列名称,进程通过名称访问该消息队列

message_queue_t(open_or_create_t open_or_create, //如果不存在则创建,存在则打开
                 const char *name,
                 size_type max_num_msg,
                 size_type max_msg_size,
                 const permissions &perm = permissions());
  • 发送/接收消息:
void send (const void *buffer,      //消息内容
                 size_type buffer_size,   //消息内容size
                 unsigned int priority);  //优先级

bool try_send(const void *buffer,     
                       size_type buffer_size,
                       unsigned int priority);

bool timed_send(const void *buffer,     
                           size_type buffer_size,
                           unsigned int priority,  
                           const boost::posix_time::ptime& abs_time);  //超时时间

 void receive(void *buffer,                 //用于存储获取消息的buffer
                      size_type buffer_size,   //用于存储获取消息buffer的size
                      size_type &recvd_size,   //实际获取的size
                      unsigned int &priority);

bool try_receive(void *buffer,           
                           size_type buffer_size,
                           size_type &recvd_size,
                           unsigned int &priority);

bool timed_receive(void *buffer,           
                                size_type buffer_size,
                                size_type &recvd_size,
                                unsigned int &priority,
                                const boost::posix_time::ptime &abs_time);  //超时时间
  • 删除消息队列:
static bool remove(const char *name);   //这是静态函数,消息队列不依赖于进程

消息队列举例

使用example,选自boost 官方文档:

  • 创建发送方:
using namespace boost::interprocess;

int main ()
{
    try{
        //Erase previous message queue
        message_queue::remove("message_queue");

        //Create a message_queue.
        message_queue mq(create_only,               //only create
             "message_queue",           //name
                            100,                       //max message number
                            sizeof(int));              //max message size

        //Send 100 numbers
        for(int i = 0; i < 100; ++i){
            mq.send(&i, sizeof(i), 0);
        }
    }
    catch(interprocess_exception &ex){
        std::cout << ex.what() << std::endl;
        return 1;
    }

    return 0;
}
  • 接收方:
using namespace boost::interprocess;

int main ()
{
    try{
        //Open a message queue.
        message_queue mq(open_only,         //only create
                        "message_queue");  //name

        unsigned int priority;
        message_queue::size_type recvd_size;

        //Receive 100 numbers
        for(int i = 0; i < 100; ++i){
          int number;
          mq.receive(&number, sizeof(number), recvd_size, priority);
          if(number != i || recvd_size != sizeof(number))
             return 1;
        }
    }
    catch(interprocess_exception &ex){
        message_queue::remove("message_queue");
        std::cout << ex.what() << std::endl;
        return 1;
    }

    message_queue::remove("message_queue");  //删除MQ
    return 0;
}

需要注意:
MQ只是简单的将要发送的数据在内存中进行拷贝,所以我们在发送复杂结构或对象时,我们需要将其序列化后再发送,接收端接收时要反序列化,也就是说我们要自己去定义区分一条消息(定义通迅协议)。

A message queue just copies raw bytes between processes and does not send objects. This means that if we want to send an object using a message queue the object must be binary serializable. For example, we can send integers between processes but not a std::string. You should use Boost.Serialization or use advanced Boost.Interprocess mechanisms to send complex data between processes.

 类似资料: