当前位置: 首页 > 教程 > 进程通信 >

消息队列

精华
小牛编辑
172浏览
2023-03-14

为什么已经拥有了共享内存时需要消息队列呢? 这将是多种原因,让我们将其分解为多个点来简化 -

  • 据了解,一旦消息被一个进程接收到,它将不再可用于任何其他进程。 而在共享内存中,数据可供多个进程访问。
  • 如果想使用小信息格式进行通信。
  • 当多个进程同时进行通信时,共享内存数据需要同步保护。
  • 使用共享内存的写入和读取频率很高,那么实现功能将会非常复杂。 在这种情况下不值得使用。
  • 如果所有的进程不需要访问共享内存,但是很少的进程只需要它,那么用消息队列实现会更好。
  • 如果想要与不同的数据包进行通信,比如进程A正在发送消息类型1给进程B,消息类型10给进程C,消息类型20给进程D。在这种情况下,用消息队列实现是比较简单的。 为了将给定的消息类型简化为1,10,20,它可以是0+ve-ve,如下所述。
  • 当然,消息队列的顺序是FIFO(先进先出)。 插入到队列中的第一条消息是第一条要检索的消息。

使用共享内存或消息队列取决于应用程序的需要以及如何有效地使用它。

使用消息队列的通信可以通过以下方式进行 -

  • 通过一个进程写入共享内存,并由另一个进程读取共享内存。 正如我们所知道的,读取也可以用多个进程来完成。
  • 通过一个进程用不同的数据包写入共享存储器,并通过多个进程,即按照消息类型读出。

    看到消息队列中的某些信息后,现在是检查支持消息队列的系统调用(System V)的时候了。

要使用消息队列执行通信,请执行以下步骤 -

第1步 - 创建一个消息队列或连接到一个已经存在的消息队列(msgget())
第2步 - 写入消息队列(msgsnd())
第3步 - 从消息队列中读取(msgrcv())
第4步 - 对消息队列(msgctl())执行控制操作

现在,让我们看看上述调用的语法和某些信息。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg)

这个系统调用创建或分配一个System V消息队列。需要传递以下参数 -

  • 第一个参数key用于识别消息队列。key可以是任意值,也可以是来自库函数ftok()的值。
  • 第二个参数shmflg指定所需的消息队列标志,例如IPC_CREAT(如果不存在则创建消息队列)或IPC_EXCL(与IPC_CREAT一起用于创建消息队列,如果消息队列已经存在,则调用失败)。 还需要传递权限。

注 - 有关权限的详细信息,请参阅前面几节。

这个调用会在成功时返回一个有效的消息队列标识符(用于进一步调用消息队列),在失败的情况下返回-1。 要知道失败的原因,请检查errno变量或perror()函数。

关于这个调用的各种错误是EACCESS(权限被拒绝),EEXIST(队列已经存在不能创建),ENOENT(队列不存在),ENOMEM(没有足够的内存来创建队列)等

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg)

此系统调用将消息发送/附加到消息队列(System V)中。 需要传递以下参数 -

  • 第一个参数msgid识别消息队列,即消息队列标识符。 msgget()成功时收到标识符的值
  • 第二个参数msgp是发送给调用者的消息的指针,定义在以下形式的结构中 -

    struct msgbuf {
     long mtype;
     char mtext[1];
    };
    

    变量mtype用于与不同的消息类型进行通信,在msgrcv()调用中详细解释。 变量mtext是一个数组或其他大小由msgsz(正值)指定的结构。 如果没有提到mtext字段,则将其视为0大小消息,这是允许的。

  • 第三个参数msgsz是消息的大小(消息应该以空字符结尾)

  • 第四个参数msgflg表示某些标志,例如IPC_NOWAIT(当在队列中找不到消息时立即返回)或MSG_NOERROR(截断消息文本,如果超过msgsz字节)

这个调用在成功时将返回0,在失败的情况下为-1。 要知道失败的原因,请检查errno变量或perror()函数。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgrcv(int msgid, const void *msgp, size_t msgsz, long msgtype, int msgflg)

该系统调用从消息队列(系统V)中检索消息。 需要传递以下参数 -

  • 第一个参数msgid识别消息队列,即消息队列标识符。 msgget()成功时收到标识符的值
  • 第二个参数msgp是从调用者接收的消息的指针。 它在以下形式的结构中被定义 -
    struct msgbuf {
     long mtype;
     char mtext[1];
    };
    
    变量mtype用于与不同的消息类型进行通信。 变量mtext是一个数组或其他大小由msgsz(正值)指定的结构。 如果没有提到mtext字段,则将其视为0大小消息,这是允许的。
  • 第三个参数msgsz是收到的消息的大小(消息应该以空字符结尾)
  • 第四个参数msgtype表示消息的类型 -
    • 如果msgtype0 - 读取队列中的第一个收到的消息。
    • 如果msgtype+ve - 读取类型为msgtype的队列中的第一条消息(如果msgtype10,则只读取类型10的第一条消息,即使其他类型可能位于队列中的开头)
    • 如果msgtype-ve - 读取小于或等于消息类型的绝对值的最小类型的第一个消息(例如,如果msgtype-5,则它读取类型小于5的第一个消息,即消息类型从15)
  • 第五个参数msgflg表示某些标志,例如IPC_NOWAIT(当队列中没有消息时立即返回,或MSG_NOERROR(如果超过了msgsz字节则截断消息文本)

这个调用将返回成功时在mtext数组中实际接收的字节数,在失败的情况下返回-1。 要知道失败的原因,请检查errno变量或perror()函数。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msgid, int cmd, struct msqid_ds *buf)

这个系统调用执行消息队列(系统V)的控制操作。 需要传递以下参数 -

  • 第一个参数msgid识别消息队列,即消息队列标识符。 msgget()成功时收到标识符的值
  • 第二个参数cmd是对消息队列执行所需控制操作的命令。 cmd的有效值是 -
    • IPC_STAT - 将struct msqid_ds的每个成员的当前值的信息复制到由buf指向的传递结构中。 该命令需要消息队列的读取权限。
    • IPC_SET - 设置结构buf指向的用户ID,所有者的组ID,权限等。
    • IPC_RMID - 立即删除消息队列。
    • IPC_INFO - 返回有关buf指向的结构中的消息队列限制和参数的信息,该结构的类型为struct msginfo
    • MSG_INFO - 返回一个msginfo结构,其中包含有关消息队列消耗的系统资源的信息。
  • 第三个参数buf是一个指向名为struct msqid_ds的消息队列结构的指针。 这个结构的值将被用于任一集或者按照cmd得到。

这个调用将根据传递的命令返回值。 IPC_INFO和MSG_INFO或MSG_STAT的成功返回消息队列的索引或标识符,其他操作返回0,失败时返回-1。 要知道失败的原因,请检查errno变量或perror()函数。

上面已经看到有关消息队列的基本信息和系统调用,现在是时候来看看程序代码了。

让我们看看这个程序实现的描述 -

第1步 - 创建两个进程,一个用于发送到消息队列(msgq_send.c),另一个用于从消息队列(msgq_recv.c)
第2步 - 使用ftok()函数创建键(Key)。 为此,最初创建文件msgq.txt以获取唯一的键。
第3步 - 发送过程执行以下操作。

  • 读取用户输入的字符串
  • 删除新行,如果存在
  • 发送到消息队列
  • 重复这个过程直到输入结束(CTRL + D)
  • 一旦收到输入结束,发送消息“end”来表示进程结束。

第4步 - 在接收过程中,执行以下操作。

  • 从队列中读取消息
  • 显示输出
  • 如果收到的消息是“end”,则结束该过程并退出。

为了简化,我们没有使用这个示例的消息类型。 另外,一个进程正在写入队列,另一个进程正在从队列中读取。 这可以根据需要进行扩展,即理想情况下一个进程将写入队列中,多个进程从队列中读取。

现在,让我们看看一下进程(消息发送到队列) - 文件:msgq_send.c

/* Filename: msgq_send.c */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int len;
   key_t key;
   system("touch msgq.txt");

   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }

   if ((msqid = msgget(key, PERMS | IPC_CREAT)) == -1) {
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to send messages.\n");
   printf("Enter lines of text, ^D to quit:\n");
   buf.mtype = 1; /* we don't really care in this case */

   while(fgets(buf.mtext, sizeof buf.mtext, stdin) != NULL) {
      len = strlen(buf.mtext);
      /* remove newline at end, if it exists */
      if (buf.mtext[len-1] == '\n') buf.mtext[len-1] = '\0';
      if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
      perror("msgsnd");
   }
   strcpy(buf.mtext, "end");
   len = strlen(buf.mtext);
   if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
   perror("msgsnd");

   if (msgctl(msqid, IPC_RMID, NULL) == -1) {
      perror("msgctl");
      exit(1);
   }
   printf("message queue: done sending messages.\n");
   return 0;
}

执行上面示例代码,得到以下输出结果 -

message queue: ready to send messages.
Enter lines of text, ^D to quit:
this is line 1
this is line 2
message queue: done sending messages.

以下是来自消息接收过程的代码(从队列中检索消息) - 文件:msgq_recv.c -

/* Filename: msgq_recv.c */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int toend;
   key_t key;

   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }

   if ((msqid = msgget(key, PERMS)) == -1) { /* connect to the queue */
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to receive messages.\n");

   for(;;) { /* normally receiving never ends but just to make conclusion 
             /* this program ends wuth string of end */
      if (msgrcv(msqid, &buf, sizeof(buf.mtext), 0, 0) == -1) {
         perror("msgrcv");
         exit(1);
      }
      printf("recvd: \"%s\"\n", buf.mtext);
      toend = strcmp(buf.mtext,"end");
      if (toend == 0)
      break;
   }
   printf("message queue: done receiving messages.\n");
   system("rm msgq.txt");
   return 0;
}

执行上面示例代码,得到以下输出结果 -

message queue: ready to receive messages.
recvd: "this is line 1"
recvd: "this is line 2"
recvd: "end"
message queue: done receiving messages.