当前位置: 首页 > 工具软件 > Open-MQ > 使用案例 >

进程间通信之POSIX消息队列-mq_open,mq_send, mq_receive

锺离锦
2023-12-01

POSIX IPC

这里是POSIX IPC,注意与System V相关的IPC不一样,主要区别在于,所有POSIX IPC都是线程安全的,而大多数SysV IPC不是。.POSIX是为了使基于UNIX的系统的接口标准化而创建的。

POSIX消息队列相关函数

(1)mq_open函数

功能:用来创建和访问一个消息队列
原型:

mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

参数:

name 某个消息队列的名字
oflag:与open函数类似,可以是O_RDONLY,O_WRONLY, O_RDWR, 还可以按位或上O_CREAT, O_EXCL, O_NONBLOCK等。
mode:如果oflag指定了O_CREAT,需要设置mode

返回值:

成功返回消息队列文件描述符;失败返回-1

mq_open函数

1)封装一个创建消息队列的函数

#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
 
 mqd_t mq_open_safe(char *name, int oflag)
 {
        mqd_t mqid;
        mqid = mq_open(name, oflag/*O_CREAT | O_RDWR*/, 0666, NULL);
        if(mqid == (mqd_t)-1)
        {
              perror("mq_open error");
              exit(EXIT_FAILURE);
        }
        return mqid;
 }

2)创建完之后,并不能像System V消息队列那样使用ipcs命令查看。那么该怎么查看呢?查看MAN手册:man 7 mq_overview

3)也就是说,创建完之后,消息队列其实已经存在了,不过是存在一个虚拟的文件系统中,但是这个文件系统需要挂载到某一个目录下面才可以,man手册中提示,可以创建挂载点到/dev/目录下:
$ mkdir /dev/mqueue
$ mount -t mqueue none /dev/mqeue

挂载之后,进入mqueue目录,即可看到创建的消息队列,并且可以使用cat命令查看该消息队列当前的状态:

4)POSIX IPC名字限定
必须以/开头,并且后续不能有其他/,形如/abc;长度不能超过NAME_MAX

mq_close函数

功能:关闭消息队列
原型:mqd_t mq_close(mqd_t mqdes);
参数:
mqdes:消息队列描述符
返回值:成功返回0;失败返回-1
例如,上一个示例,在程序返回前,调用mq_close(mqid);

mq_unlink函数

功能:删除消息队列
原型:mqd_t mq_unlink(const char *name);
参数:
name:消息队列名字,而不是消息队列描述符mqdes
返回值:成功返回0;失败返回-1

1)更准确的含义是删除一个连接数,直到连接数减为0的时候,才真正将文件删除。使用如下命令可以查看文件的连接数:

可以通过ls -l 对应文件名 查看文件的连接数

mq_getattr/mq_setattr两个函数

功能:获取/设置消息队列属性
原型:
mqd_t mq_getattr(mqd_t mqdes,struct mq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
返回值:成功返回0;失败返回-1

其中结构体mq_attr如下:

struct mq_attr {
               long mq_flags;       /* Flags: 0 or O_NONBLOCK */
               long mq_maxmsg;      /* Max. # of messages on queue */
               long mq_msgsize;     /* Max. message size (bytes) */
               long mq_curmsgs;     /* # of messages currently in queue */
           };

1)封装获取消息队列属性:

mqd_t mq_getattr_safe(mqd_t mqdes,struct mq_attr *attr)
{
     mqd_t mq_getattr;
     mq_getattr  = mq_getattr(mqid, &attr);
     if(mq_getattr == -1)
      {
              perror("mq_getattr error");
              exit(EXIT_FAILURE);
      }
      return mq_getattr ;
}

mq_send函数

功能:发送消息
原型:
mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
参数:
mqdes:消息队列描述符
msg_ptr:指向消息的指针
msg_len:消息长度
msg_prio:消息优先级 它是一个小于MQ_PRIO_MAX的数,数值越大,优先级越高。POSIX消息队列在调用mq_receive时总是返回队列中最高优先级的最早消息。如果消息不需要设定优先级,那么可以在mq_send是置msg_prio为0,mq_receive的msg_prio置为NULL。
返回值:成功返回0;失败返回-1

封装该函数,并尝试向消息队列中发送一条消息:

typedef struct stu
{
        char name[32];
        int age;
}STU;
 
mqd_t mq_send_safe(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned msg_prio)
{
     mqd_t check;
     check = mq_send(mqdes, (const char*)msg_ptr, msg_len, msg_prio);
      if(check = 1)
      {
            perror("mq_getattr error");
             exit(EXIT_FAILURE);
      }
      return check;
}

int main(int argc, char *argv[])
{
        if (argc != 2){
                fprintf(stderr, "Usage: %s <prior>\n", argv[0]);
        }
 
        mqd_t mqid;
        mqid = mq_open("/abc", O_WRONLY);
        if (mqid == (mqd_t)-1)
                ERR_EXIT("mq_open");
 
        STU stu;
        strcpy(stu.name, "Leo");
        stu.age = 25;
 
        unsigned prio = atoi(argv[1]);
        mq_send_safe(mqid, (const char*)&stu, sizeof(stu), prio);
        mq_close(mqid);
 
        return 0;
}
mq_receive函数

功能:接收消息
原型:
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
参数:
mqdes:消息队列描述符
msg_ptr:可接收的消息
msg_len:消息长度
msg_prio:返回接受到的消息优先级
返回值:成功返回接收到的消息字节数;失败返回-1
注意:返回指定消息队列中最高优先级的最早消息

使用该函数,实现对消息的接收:

typedef struct stu
{
        char name[32];
        int age;
}STU;
 
int main(int argc, char *argv[])
{
        mqd_t mqid;
        mqid = mq_open("/abc", O_RDONLY);
        if (mqid == (mqd_t)-1)
                ERR_EXIT("mq_open");
 
        struct mq_attr attr;
        mq_getattr(mqid, &attr);
        size_t size = attr.mq_msgsize;
        STU stu;
 
        unsigned prio;
        if (mq_receive(mqid, (char*)&stu, size, &prio) == (mqd_t)-1)
                ERR_EXIT("mq_receive");
 
        printf("name=%s age=%d prio=%u\n", stu.name, stu.age, prio);
        mq_close(mqid);
        return 0;
 }
mq_notify函数

功能:建立或删除消息到达通知时间
原型
mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);
参数:
mqdes:消息队列描述符
notification:
非空表示当消息到达且消息队列先前为空,那么将得到通知
NULL表示撤销已注册的通知
返回值:成功返回0;失败返回-1
通知方式:
产生一个信号
创建一个线程执行一个指定的函数

1)其中结构体如下:

 union sigval {          /* Data passed with notification */
           int     sival_int;         /* Integer value */
           void   *sival_ptr;         /* Pointer value */
       };
 
       struct sigevent {
           int          sigev_notify; /* Notification method */
           int          sigev_signo;  /* Notification signal */
           union sigval sigev_value;  /* Data passed with
                                         notification */
           void       (*sigev_notify_function) (union sigval);
                            /* Function used for thread
                               notification (SIGEV_THREAD) */
           void        *sigev_notify_attributes;
                            /* Attributes for notification thread
                               (SIGEV_THREAD) */
           pid_t        sigev_notify_thread_id;
                            /* ID of thread to signal (SIGEV_THREAD_ID) */
       };

sigev_notify有三个取值:
SIGEV_NONE:表示不会又通知;
SIGEV_SIGNAL:表示以信号的方式来通知;需要指定sigev_signo和sigev_value
SIGEV_THREAD:表示以线程的方式来通知。需要指定结构体中最后两个参数

2)使用该函数,以信号的方式来通知
a)为简单起见,使用SIGUSR1信号
signal(SIGUSR1, handle_sigusr1);
b)注册一个消息到达的通知:
struct sigevent sigev;
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqid, &sigev);

3)实现如下

#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
 
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <string.h>
#include <signal.h>
 
#define ERR_EXIT(m)\
        do \
        { \
                perror(m);\
                exit(EXIT_FAILURE);\
        }while(0)
 
typedef struct stu
{
        char name[32];
        int age;
}STU;
size_t size;
 
mqd_t mqid;
 
struct sigevent sigev;
 
void handle_sigusr1(int sig)
{
        mq_notify(mqid, &sigev);
        STU stu;
        unsigned int prio;
        if (mq_receive(mqid, (char*)&stu, size, &prio) == (mqd_t)-1)
                ERR_EXIT("mq_receive");
 
        printf("name=%s age=%d prio=%u\n", stu.name, stu.age, prio);
 
}
 
int main(int argc, char *argv[])
{
        mqid = mq_open("/abc", O_RDONLY);
        if (mqid == (mqd_t)-1)
                ERR_EXIT("mq_open");
        struct mq_attr attr;
        mq_getattr(mqid, &attr);
        size = attr.mq_msgsize;
 
        signal(SIGUSR1, handle_sigusr1);
 
        sigev.sigev_notify = SIGEV_SIGNAL;
        sigev.sigev_signo = SIGUSR1;
 
        mq_notify(mqid, &sigev);
 
        for(;;)
                pause();
 
        mq_close(mqid);
        return 0;
}

在消息队列中没有消息的情况下,运行该程序,将阻塞。而当另一个程序通过向消息队列中发送消息之后,notify程序将会得到通知,并将结果打印输出:

4)mq_notify注意

任何时刻只能有一个进程可以被注册为接收某个给定队列的通知
当有一个消息到达某个先前为空的队列,而且已有一个进程被注册为接收该队列的通知时,只有没有任何线程阻塞在该队列的mq_receive调用的前提下,通知才会发出。
当通知被发送给它 的注册进程时,其注册被撤销。进程必须再次调用mq_notify以重新注册(如果需要的话),重新注册要放在从消息队列读出消息之前而不是之后。

 类似资料: