这里是POSIX IPC,注意与System V相关的IPC不一样,主要区别在于,所有POSIX IPC都是线程安全的,而大多数SysV IPC不是。.POSIX是为了使基于UNIX的系统的接口标准化而创建的。
(1)mq_open函数
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
name 某个消息队列的名字
oflag:与open函数类似,可以是O_RDONLY,O_WRONLY, O_RDWR, 还可以按位或上O_CREAT, O_EXCL, O_NONBLOCK等。
mode:如果oflag指定了O_CREAT,需要设置mode
成功返回消息队列文件描述符;失败返回-1
1)封装一个创建消息队列的函数
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
mqd_t mq_open_safe(char *name, int oflag)
{
mqd_t mqid;
mqid = mq_open(name, oflag/*O_CREAT | O_RDWR*/, 0666, NULL);
if(mqid == (mqd_t)-1)
{
perror("mq_open error");
exit(EXIT_FAILURE);
}
return mqid;
}
2)创建完之后,并不能像System V消息队列那样使用ipcs命令查看。那么该怎么查看呢?查看MAN手册:man 7 mq_overview
3)也就是说,创建完之后,消息队列其实已经存在了,不过是存在一个虚拟的文件系统中,但是这个文件系统需要挂载到某一个目录下面才可以,man手册中提示,可以创建挂载点到/dev/目录下:
$ mkdir /dev/mqueue
$ mount -t mqueue none /dev/mqeue
挂载之后,进入mqueue目录,即可看到创建的消息队列,并且可以使用cat命令查看该消息队列当前的状态:
4)POSIX IPC名字限定
必须以/开头,并且后续不能有其他/,形如/abc;长度不能超过NAME_MAX
功能:关闭消息队列
原型:mqd_t mq_close(mqd_t mqdes);
参数:
mqdes:消息队列描述符
返回值:成功返回0;失败返回-1
例如,上一个示例,在程序返回前,调用mq_close(mqid);
功能:删除消息队列
原型:mqd_t mq_unlink(const char *name);
参数:
name:消息队列名字,而不是消息队列描述符mqdes
返回值:成功返回0;失败返回-1
1)更准确的含义是删除一个连接数,直到连接数减为0的时候,才真正将文件删除。使用如下命令可以查看文件的连接数:
可以通过ls -l 对应文件名 查看文件的连接数
功能:获取/设置消息队列属性
原型:
mqd_t mq_getattr(mqd_t mqdes,struct mq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
返回值:成功返回0;失败返回-1
其中结构体mq_attr如下:
struct mq_attr {
long mq_flags; /* Flags: 0 or O_NONBLOCK */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue */
};
1)封装获取消息队列属性:
mqd_t mq_getattr_safe(mqd_t mqdes,struct mq_attr *attr)
{
mqd_t mq_getattr;
mq_getattr = mq_getattr(mqid, &attr);
if(mq_getattr == -1)
{
perror("mq_getattr error");
exit(EXIT_FAILURE);
}
return mq_getattr ;
}
功能:发送消息
原型:
mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
参数:
mqdes:消息队列描述符
msg_ptr:指向消息的指针
msg_len:消息长度
msg_prio:消息优先级 它是一个小于MQ_PRIO_MAX的数,数值越大,优先级越高。POSIX消息队列在调用mq_receive时总是返回队列中最高优先级的最早消息。如果消息不需要设定优先级,那么可以在mq_send是置msg_prio为0,mq_receive的msg_prio置为NULL。
返回值:成功返回0;失败返回-1
封装该函数,并尝试向消息队列中发送一条消息:
typedef struct stu
{
char name[32];
int age;
}STU;
mqd_t mq_send_safe(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned msg_prio)
{
mqd_t check;
check = mq_send(mqdes, (const char*)msg_ptr, msg_len, msg_prio);
if(check = 1)
{
perror("mq_getattr error");
exit(EXIT_FAILURE);
}
return check;
}
int main(int argc, char *argv[])
{
if (argc != 2){
fprintf(stderr, "Usage: %s <prior>\n", argv[0]);
}
mqd_t mqid;
mqid = mq_open("/abc", O_WRONLY);
if (mqid == (mqd_t)-1)
ERR_EXIT("mq_open");
STU stu;
strcpy(stu.name, "Leo");
stu.age = 25;
unsigned prio = atoi(argv[1]);
mq_send_safe(mqid, (const char*)&stu, sizeof(stu), prio);
mq_close(mqid);
return 0;
}
功能:接收消息
原型:
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
参数:
mqdes:消息队列描述符
msg_ptr:可接收的消息
msg_len:消息长度
msg_prio:返回接受到的消息优先级
返回值:成功返回接收到的消息字节数;失败返回-1
注意:返回指定消息队列中最高优先级的最早消息
使用该函数,实现对消息的接收:
typedef struct stu
{
char name[32];
int age;
}STU;
int main(int argc, char *argv[])
{
mqd_t mqid;
mqid = mq_open("/abc", O_RDONLY);
if (mqid == (mqd_t)-1)
ERR_EXIT("mq_open");
struct mq_attr attr;
mq_getattr(mqid, &attr);
size_t size = attr.mq_msgsize;
STU stu;
unsigned prio;
if (mq_receive(mqid, (char*)&stu, size, &prio) == (mqd_t)-1)
ERR_EXIT("mq_receive");
printf("name=%s age=%d prio=%u\n", stu.name, stu.age, prio);
mq_close(mqid);
return 0;
}
功能:建立或删除消息到达通知时间
原型
mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);
参数:
mqdes:消息队列描述符
notification:
非空表示当消息到达且消息队列先前为空,那么将得到通知
NULL表示撤销已注册的通知
返回值:成功返回0;失败返回-1
通知方式:
产生一个信号
创建一个线程执行一个指定的函数
1)其中结构体如下:
union sigval { /* Data passed with notification */
int sival_int; /* Integer value */
void *sival_ptr; /* Pointer value */
};
struct sigevent {
int sigev_notify; /* Notification method */
int sigev_signo; /* Notification signal */
union sigval sigev_value; /* Data passed with
notification */
void (*sigev_notify_function) (union sigval);
/* Function used for thread
notification (SIGEV_THREAD) */
void *sigev_notify_attributes;
/* Attributes for notification thread
(SIGEV_THREAD) */
pid_t sigev_notify_thread_id;
/* ID of thread to signal (SIGEV_THREAD_ID) */
};
sigev_notify有三个取值:
SIGEV_NONE:表示不会又通知;
SIGEV_SIGNAL:表示以信号的方式来通知;需要指定sigev_signo和sigev_value
SIGEV_THREAD:表示以线程的方式来通知。需要指定结构体中最后两个参数
2)使用该函数,以信号的方式来通知
a)为简单起见,使用SIGUSR1信号
signal(SIGUSR1, handle_sigusr1);
b)注册一个消息到达的通知:
struct sigevent sigev;
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqid, &sigev);
3)实现如下
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
#include <string.h>
#include <signal.h>
#define ERR_EXIT(m)\
do \
{ \
perror(m);\
exit(EXIT_FAILURE);\
}while(0)
typedef struct stu
{
char name[32];
int age;
}STU;
size_t size;
mqd_t mqid;
struct sigevent sigev;
void handle_sigusr1(int sig)
{
mq_notify(mqid, &sigev);
STU stu;
unsigned int prio;
if (mq_receive(mqid, (char*)&stu, size, &prio) == (mqd_t)-1)
ERR_EXIT("mq_receive");
printf("name=%s age=%d prio=%u\n", stu.name, stu.age, prio);
}
int main(int argc, char *argv[])
{
mqid = mq_open("/abc", O_RDONLY);
if (mqid == (mqd_t)-1)
ERR_EXIT("mq_open");
struct mq_attr attr;
mq_getattr(mqid, &attr);
size = attr.mq_msgsize;
signal(SIGUSR1, handle_sigusr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqid, &sigev);
for(;;)
pause();
mq_close(mqid);
return 0;
}
在消息队列中没有消息的情况下,运行该程序,将阻塞。而当另一个程序通过向消息队列中发送消息之后,notify程序将会得到通知,并将结果打印输出:
4)mq_notify注意
任何时刻只能有一个进程可以被注册为接收某个给定队列的通知
当有一个消息到达某个先前为空的队列,而且已有一个进程被注册为接收该队列的通知时,只有没有任何线程阻塞在该队列的mq_receive调用的前提下,通知才会发出。
当通知被发送给它 的注册进程时,其注册被撤销。进程必须再次调用mq_notify以重新注册(如果需要的话),重新注册要放在从消息队列读出消息之前而不是之后。