Posix message queue 消息队列

赫连骏
2023-12-01
Posix message queue

分类: LINUX

因为要用,学了点IPC,在网上找了个程序,改了一下,加入了pthread,代码如下:

/*
* mqueue.c
*
* Test posix message queue.
*/

#include <unistd.h>
#include <sys/types.h>
#include <stdio.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <time.h>
#include <sched.h>
#include <sys/mman.h>
#include <sys/fcntl.h>
#include <signal.h>
#include <mqueue.h>
#include <errno.h>
#include <stdlib.h>
#include <pthread.h>

/* Permission bits passed as the mode argument of mq_open() when the
 * queue is created: read/write for user, group and others.
 */
#define PMODE 0666

/* errno is provided by <errno.h>; it must not be redeclared by hand
 * (it may be a macro expanding to a per-thread lvalue).
 */

/* Thread entry points: one sends on the "/myipc" queue, the other
 * receives from it.
 */
void *mqueue_send(void *arg);
void *mqueue_recv(void *arg);

/*
 * Start the sender and the receiver thread and wait for both to finish.
 *
 * Returns 0 when both threads were created and joined, -1 otherwise.
 */
int main(void)
{
    pthread_t t1, t2;
    int rc = 0;

    if (0 != pthread_create(&t1, NULL, mqueue_send, NULL)) {
        fprintf(stderr, "Cannot create thread.\n");
        return -1;
    }
    if (0 != pthread_create(&t2, NULL, mqueue_recv, NULL)) {
        fprintf(stderr, "Cannot create thread t2.\n");
        /* Returning right away would terminate the process while the
         * sender is still running; let it finish first.
         */
        pthread_join(t1, NULL);
        /* The receiver normally unlinks the queue. It never ran, so
         * remove the queue here (best effort) to avoid leaving stale
         * messages behind for the next run.
         */
        mq_unlink("/myipc");
        return -1;
    }

    if (0 != pthread_join(t1, NULL)) {
        fprintf(stderr, "Cannot join thread t1.\n");
        rc = -1;
    }
    if (0 != pthread_join(t2, NULL)) {
        fprintf(stderr, "Cannot join thread t2.\n");
        rc = -1;
    }

    return rc;
}

/*
 * Sender thread entry point.
 *
 * Opens the "/myipc" queue write-only (creating it if it does not exist
 * yet) and sends the 10-byte message "PRIORITY1a" ten times at
 * priority 1, sleeping one second after every attempt.
 *
 * arg is unused.  Always returns NULL.  If the queue cannot be opened,
 * the whole process is terminated with a failure status.
 */
void *mqueue_send(void *arg)
{
    /* Test payload; it is sent without its terminating NUL. */
    static const char msg[] = "PRIORITY1a";
    const size_t num_bytes_to_send = sizeof msg - 1;
    const unsigned int priority_of_msg = 1;
    struct mq_attr attr = {0};
    mqd_t mqfd;
    int i;

    (void)arg;

    printf("START OF TEST_SEND \n");

    /* Attributes used only if this call ends up creating the queue.
     * NOTE(review): on Linux an unprivileged process may get EINVAL when
     * mq_maxmsg exceeds /proc/sys/fs/mqueue/msg_max - see mq_overview(7).
     */
    attr.mq_maxmsg = 20;
    attr.mq_msgsize = 1024;
    attr.mq_flags = 0;

    /* No O_NONBLOCK: mq_send() blocks while the queue is full.
     * O_CREAT: create the queue if the receiver has not done so already.
     * O_WRONLY: this thread only writes to the queue.
     */
    mqfd = mq_open("/myipc", O_WRONLY | O_CREAT, PMODE, &attr);
    if (mqfd == (mqd_t)-1) {
        perror("mq_open failure in mqueue_send");
        /* exit() ends the whole process, receiver thread included;
         * report the failure through the exit status.
         */
        exit(EXIT_FAILURE);
    }

    /* Perform the send 10 times */
    for (i = 0; i < 10; i++) {
        if (mq_send(mqfd, msg, num_bytes_to_send, priority_of_msg) == -1)
            perror("mq_send failure on mqfd");
        else
            printf("successful call to mq_send, i = %d\n", i);
        sleep(1);
    }

    /* Done with queue, so close it */
    if (mq_close(mqfd) == -1)
        perror("mq_close failure on mqfd");

    printf("About to exit the sending process after closing the queue \n");
    return NULL;
}

/*
 * Receiver thread entry point.
 *
 * Opens the "/myipc" queue read-only (creating it if it does not exist
 * yet), receives ten messages and prints each one, then closes and
 * unlinks the queue.
 *
 * arg is unused.  Always returns NULL.  If the queue cannot be opened,
 * the whole process is terminated with a failure status.
 */
void *mqueue_recv(void *arg)
{
    /* Buffer to receive msg into */
    char msg_buffer[1024];
    struct mq_attr attr = {0};
    mqd_t mqfd;
    int i;

    (void)arg;

    printf("START OF TEST_RECEIVE \n");

    /* Attributes used only if this call ends up creating the queue.
     * NOTE(review): on Linux an unprivileged process may get EINVAL when
     * mq_maxmsg exceeds /proc/sys/fs/mqueue/msg_max - see mq_overview(7).
     */
    attr.mq_maxmsg = 20;
    attr.mq_msgsize = sizeof msg_buffer;
    attr.mq_flags = 0;

    /* No O_NONBLOCK: mq_receive() blocks while the queue is empty.
     * O_CREAT: create the queue if the sender has not done so already.
     * O_RDONLY: this thread only reads from the queue.
     */
    mqfd = mq_open("/myipc", O_RDONLY | O_CREAT, PMODE, &attr);
    if (mqfd == (mqd_t)-1) {
        perror("mq_open failure in mqueue_recv");
        /* exit() ends the whole process, sender thread included;
         * report the failure through the exit status.
         */
        exit(EXIT_FAILURE);
    }

    /* Perform the receive 10 times */
    for (i = 0; i < 10; i++) {
        ssize_t num_bytes_received =
            mq_receive(mqfd, msg_buffer, sizeof msg_buffer, NULL);

        if (num_bytes_received == -1) {
            perror("mq_receive failure on mqfd");
        } else {
            /* Messages carry no NUL terminator, so print exactly the
             * number of bytes that were received.
             */
            printf("data read for iteration %d = %.*s \n",
                   i, (int)num_bytes_received, msg_buffer);
        }
    }

    /* Done with queue, so close it */
    if (mq_close(mqfd) == -1)
        perror("mq_close failure on mqfd");

    /* Done with test, so unlink the queue, which destroys it.
     * Only one call to unlink is needed.
     */
    if (mq_unlink("/myipc") == -1)
        perror("mq_unlink failure in test_ipc");

    printf("Exiting receiving process after closing and unlinking queue \n");
    return NULL;
}

/* Local Variables: */
/* compile-command: "gcc -o mqueue mqueue.c -lrt -pthread" */
/* End: */


运行结果:
START OF TEST_SEND
successful call to mq_send, i = 0
START OF TEST_RECEIVE
data read for iteration 0 = PRIORITY1a
successful call to mq_send, i = 1
data read for iteration 1 = PRIORITY1a
successful call to mq_send, i = 2
data read for iteration 2 = PRIORITY1a
successful call to mq_send, i = 3
data read for iteration 3 = PRIORITY1a
successful call to mq_send, i = 4
data read for iteration 4 = PRIORITY1a
successful call to mq_send, i = 5
data read for iteration 5 = PRIORITY1a
successful call to mq_send, i = 6
data read for iteration 6 = PRIORITY1a
successful call to mq_send, i = 7
data read for iteration 7 = PRIORITY1a
successful call to mq_send, i = 8
data read for iteration 8 = PRIORITY1a
successful call to mq_send, i = 9
data read for iteration 9 = PRIORITY1a
Exiting receiving process after closing and unlinking queue
About to exit the sending process after closing the queue
 类似资料: