信号量
首先想到的问题是,为什么我们需要信号量? 一个简单的答案,以保护多个进程共享的关键/共同区域。
假设多个进程正在使用相同的代码区域,如果所有人都想并行访问,那么结果是重叠的。 例如,多个用户仅使用一台打印机(通用/关键部分),例如3
个用户,同时给予3
个作业,如果所有作业并行启动,则一个用户输出与另一个用户输出重叠。 因此,我们需要使用信号量来保护这个信号,即当一个进程正在运行时锁定关键部分,并在完成时解锁。 这将为每个用户/进程重复,以便一个作业不与另一个作业重叠。
基本上信号量分为两类 -
- 二进制信号 - 只有两个状态
0
和1
,即锁定/解锁或可用/不可用,互斥实现。 - 计算信号量 - 允许任意资源计数的信号量称为计数信号量。
假设有5台打印机(要了解1
台打印机只接受1
一项工作),我们有3
个打印作业。 现在有三个打印机(每个打印机1
个)提供3
个工作。 这项工作还在进行中,共有4
项工作。 现在,在可用的两台打印机中,已经安排了两个作业,剩下两个作业,只有在其中一个资源/打印机可用时才能完成。 根据资源可用性的这种调度可以被看作计数信号量。
要使用信号量执行同步,请执行以下步骤 -
第1步 - 创建一个信号量或连接到一个已经存在的信号量(semget()
)
第2步 - 对信号量执行操作,即分配或释放或等待资源(semop()
)
第3步 - 在消息队列(semctl()
)上执行控制操作
现在,让我们查看一下系统调用。
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
int semget(key_t key, int nsems, int semflg)
这个系统调用创建或分配一个System V信号集。 需要传递以下参数 -
- 第一个参数
key
用于识别消息队列。key
可以是任意值,也可以是来自库函数ftok()
的值。 - 第二个参数
nsems
指定了信号的数量。 如果二进制那么它是1
,意味着需要1
个信号集,否则按照所需的信号量集计数。 - 第三个参数
semflg
指定所需的信号量标志,如IPC_CREAT(如果不存在则创建信号量)或IPC_EXCL(与IPC_CREAT一起用于创建信号量,如果信号量已经存在,则调用失败)。 还需要传递权限。注 - 有关权限的详细信息,请参阅前面几节。
这个调用会在成功时返回有效的信号量标识符(用于进一步调用信号量),在失败的情况下返回-1
。 要知道失败的原因,请检查errno变量或perror()函数。
关于这个调用的各种错误是EACCESS(权限被拒绝),EEXIST(队列已经存在不能创建),ENOENT(队列不存在),ENOMEM(没有足够的内存来创建队列),ENOSPC(最大限制 超过)等
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
int semop(int semid, struct sembuf *semops, size_t nsemops)
这个系统调用在System V信号集上执行操作,即分配资源,等待资源或释放资源。 以下参数需要传递 -
- 第一个参数
semid
指示由semget()
创建的信号集标识符。 - 第二个参数
semops
是指向要在信号集上执行的操作数组的指针。 结构如下 -
上述结构中的元素struct sembuf { unsigned short sem_num; /* Semaphore set num */ short sem_op; /* Semaphore operation */ short sem_flg; /* Operation flags, IPC_NOWAIT, SEM_UNDO */ };
sem_op
指示需要执行的操作 -- 如果
sem_op
是-ve
,则分配或获取资源。 阻塞调用进程,直到其他进程释放了足够的资源,以便此进程可以分配。 - 如果
sem_op
为0
,则调用进程等待或休眠,直到信号量值达到0
。 - 如果
sem_op
是+ve
,则释放资源。
- 如果
- 第三个参数
nsemops
是该数组中的操作数。
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
int semctl(int semid, int semnum, int cmd, …)
此系统调用执行System V信号量的控制操作。 以下参数需要传递 -
- 第一个参数
semid
是信号量的标识符。 这个id
是信号量标识符,它是semget()
系统调用的返回值。 - 第二个参数
semnum
是信号量的数量。 信号量从0
开始编号。 - 第三个参数
cmd
是在信号量上执行所需控制操作的命令。 - 第四个参数是
union semun
,取决于cmd
。 少数情况下,第四个参数是不适用的。
让我们来看看union semun
-
union semun {
int val; /* val for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT and IPC_SET */
unsigned short *array; /* Buffer for GETALL and SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO and SEM_INFO*/
};
在sys/sem.h
中定义的semid_ds
数据结构如下所示 -
struct semid_ds {
struct ipc_perm sem_perm; /* Permissions */
time_t sem_otime; /* Last semop time */
time_t sem_ctime; /* Last change time */
unsigned long sem_nsems; /* Number of semaphores in the set */
};
注 - 请参阅手册页以获取其他数据结构。
union semun arg
的有效值是 -
IPC_STAT
- 将struct semid_ds
的每个成员的当前值的信息复制到arg.buf
指向的传递结构。 该命令需要信号量的读取权限。IPC_SET
- 设置结构semid_ds
指向的用户ID,所有者的组ID,权限等。IPC_RMID
- 删除信号集。IPC_INFO
- 返回有关arg.__ buf
指向的semid_ds
结构中的信号限制和参数的信息。SEM_INFO
- 返回一个包含有关信号量消耗的系统资源信息的seminfo
结构。
这个调用将根据传递的命令返回值(非负值)。 一旦成功,IPC_INFO和SEM_INFO或SEM_STAT返回根据Semaphore的最高使用条目的索引或标识符,或GETPID的semncnt值或GETPID的sempid值或GETVAL 0的semval值, 1在失败的情况下。 要知道失败的原因,请检查errno变量或perror()函数。
在看代码之前,让我们了解它的实现 -
- 创建两个进程 - 子进程和父进程。
- 创建共享内存主要需要存储计数器和其他标志,以指示读/写过程结束到共享内存中。
- 计数器由父进程和子进程的计数递增。 计数可以作为命令行参数传递,也可以作为默认值(如果不作为命令行参数传递,或者值小于10000)。 被调用一定的睡眠时间,以确保父母和孩子同时访问共享内存,即并行访问。
- 由于父进程和子进程的计数器都是以1为单位递增,所以最终的数值应该是计数器的两倍。 因为父,子进程同时执行这些操作,所以计数器不会按需要递增。 因此,我们需要确保一个进程完成之后的其他过程的完整性。
- 以上所有的实现都在
shm_write_cntr.c
文件中执行 - 检查计数器值是否在文件
shm_read_cntr.c
中实现 - 为了确保完成,信号量程序在文件
shm_write_cntr_with_sem.c
中实现。 在完成整个过程(从其他程序完成读取之后)中删除信号量 - 由于有单独的文件来读取共享内存中的计数器的值,并且没有任何影响,读取程序保持不变(
shm_read_cntr.c
) - 在一个终端上执行写入程序并从另一个终端上读取程序总是比较好的。 因为程序只有在写入和读取过程完成之后才能完成执行,那么在完全执行写入程序之后运行程序就可以了。 写程序将一直等到读程序运行,并且只有在完成后才能完成。
没有信号量的程序 -
/* Filename: shm_write_cntr.c */
#include<stdio.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<sys/types.h>
#include<string.h>
#include<errno.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#define SHM_KEY 0x12345
struct shmseg {
int cntr;
int write_complete;
int read_complete;
};
void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count);
int main(int argc, char *argv[]) {
int shmid;
struct shmseg *shmp;
char *bufptr;
int total_count;
int sleep_time;
pid_t pid;
if (argc != 2)
total_count = 10000;
else {
total_count = atoi(argv[1]);
if (total_count < 10000)
total_count = 10000;
}
printf("Total Count is %d\n", total_count);
shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT);
if (shmid == -1) {
perror("Shared memory");
return 1;
}
// Attach to the segment to get a pointer to it.
shmp = shmat(shmid, NULL, 0);
if (shmp == (void *) -1) {
perror("Shared memory attach");
return 1;
}
shmp->cntr = 0;
pid = fork();
/* Parent Process - Writing Once */
if (pid > 0) {
shared_memory_cntr_increment(pid, shmp, total_count);
} else if (pid == 0) {
shared_memory_cntr_increment(pid, shmp, total_count);
return 0;
} else {
perror("Fork Failure\n");
return 1;
}
while (shmp->read_complete != 1)
sleep(1);
if (shmdt(shmp) == -1) {
perror("shmdt");
return 1;
}
if (shmctl(shmid, IPC_RMID, 0) == -1) {
perror("shmctl");
return 1;
}
printf("Writing Process: Complete\n");
return 0;
}
/* Increment the counter of shared memory by total_count in steps of 1 */
void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count) {
int cntr;
int numtimes;
int sleep_time;
cntr = shmp->cntr;
shmp->write_complete = 0;
if (pid == 0)
printf("SHM_WRITE: CHILD: Now writing\n");
else if (pid > 0)
printf("SHM_WRITE: PARENT: Now writing\n");
//printf("SHM_CNTR is %d\n", shmp->cntr);
/* Increment the counter in shared memory by total_count in steps of 1 */
for (numtimes = 0; numtimes < total_count; numtimes++) {
cntr += 1;
shmp->cntr = cntr;
/* Sleeping for a second for every thousand */
sleep_time = cntr % 1000;
if (sleep_time == 0)
sleep(1);
}
shmp->write_complete = 1;
if (pid == 0)
printf("SHM_WRITE: CHILD: Writing Done\n");
else if (pid > 0)
printf("SHM_WRITE: PARENT: Writing Done\n");
return;
}
执行上面程序代码,得到以下结果 -
Total Count is 10000
SHM_WRITE: PARENT: Now writing
SHM_WRITE: CHILD: Now writing
SHM_WRITE: PARENT: Writing Done
SHM_WRITE: CHILD: Writing Done
Writing Process: Complete
现在让我们来看看共享内存读取程序的实现 -
/* Filename: shm_read_cntr.c */
#include<stdio.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<sys/types.h>
#include<string.h>
#include<errno.h>
#include<stdlib.h>
#include<unistd.h>
#define SHM_KEY 0x12345
struct shmseg {
int cntr;
int write_complete;
int read_complete;
};
int main(int argc, char *argv[]) {
int shmid, numtimes;
struct shmseg *shmp;
int total_count;
int cntr;
int sleep_time;
if (argc != 2)
total_count = 10000;
else {
total_count = atoi(argv[1]);
if (total_count < 10000)
total_count = 10000;
}
shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT);
if (shmid == -1) {
perror("Shared memory");
return 1;
}
// Attach to the segment to get a pointer to it.
shmp = shmat(shmid, NULL, 0);
if (shmp == (void *) -1) {
perror("Shared memory attach");
return 1;
}
/* Read the shared memory cntr and print it on standard output */
while (shmp->write_complete != 1) {
if (shmp->cntr == -1) {
perror("read");
return 1;
}
sleep(3);
}
printf("Reading Process: Shared Memory: Counter is %d\n", shmp->cntr);
printf("Reading Process: Reading Done, Detaching Shared Memory\n");
shmp->read_complete = 1;
if (shmdt(shmp) == -1) {
perror("shmdt");
return 1;
}
printf("Reading Process: Complete\n");
return 0;
}
执行上面程序代码,得到以下结果 -
Reading Process: Shared Memory: Counter is 11000
Reading Process: Reading Done, Detaching Shared Memory
Reading Process: Complete
如果观察到上面的输出,计数器应该是20000,但是,因为在一个进程任务完成之前其他进程也是并行处理的,所以计数器值不是预期的。 每个系统的输出会有所不同,而且每次执行都会有所不同。 为了确保两个进程在完成一个任务后执行任务,应该使用同步机制来实现。
现在,让我们使用信号机制,看看下面相同的应用程序。
注 - 读取程序的实现保持不变。
/* Filename: shm_write_cntr_with_sem.c */
#include<stdio.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<sys/sem.h>
#include<string.h>
#include<errno.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#define SHM_KEY 0x12345
#define SEM_KEY 0x54321
#define MAX_TRIES 20
struct shmseg {
int cntr;
int write_complete;
int read_complete;
};
void shared_memory_cntr_increment(int, struct shmseg*, int);
void remove_semaphore();
int main(int argc, char *argv[]) {
int shmid;
struct shmseg *shmp;
char *bufptr;
int total_count;
int sleep_time;
pid_t pid;
if (argc != 2)
total_count = 10000;
else {
total_count = atoi(argv[1]);
if (total_count < 10000)
total_count = 10000;
}
printf("Total Count is %d\n", total_count);
shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT);
if (shmid == -1) {
perror("Shared memory");
return 1;
}
// Attach to the segment to get a pointer to it.
shmp = shmat(shmid, NULL, 0);
if (shmp == (void *) -1) {
perror("Shared memory attach: ");
return 1;
}
shmp->cntr = 0;
pid = fork();
/* Parent Process - Writing Once */
if (pid > 0) {
shared_memory_cntr_increment(pid, shmp, total_count);
} else if (pid == 0) {
shared_memory_cntr_increment(pid, shmp, total_count);
return 0;
} else {
perror("Fork Failure\n");
return 1;
}
while (shmp->read_complete != 1)
sleep(1);
if (shmdt(shmp) == -1) {
perror("shmdt");
return 1;
}
if (shmctl(shmid, IPC_RMID, 0) == -1) {
perror("shmctl");
return 1;
}
printf("Writing Process: Complete\n");
remove_semaphore();
return 0;
}
/* Increment the counter of shared memory by total_count in steps of 1 */
void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count) {
int cntr;
int numtimes;
int sleep_time;
int semid;
struct sembuf sem_buf;
struct semid_ds buf;
int tries;
int retval;
semid = semget(SEM_KEY, 1, IPC_CREAT | IPC_EXCL | 0666);
//printf("errno is %d and semid is %d\n", errno, semid);
/* Got the semaphore */
if (semid >= 0) {
printf("First Process\n");
sem_buf.sem_op = 1;
sem_buf.sem_flg = 0;
sem_buf.sem_num = 0;
retval = semop(semid, &sem_buf, 1);
if (retval == -1) {
perror("Semaphore Operation: ");
return;
}
} else if (errno == EEXIST) { // Already other process got it
int ready = 0;
printf("Second Process\n");
semid = semget(SEM_KEY, 1, 0);
if (semid < 0) {
perror("Semaphore GET: ");
return;
}
/* Waiting for the resource */
sem_buf.sem_num = 0;
sem_buf.sem_op = 0;
sem_buf.sem_flg = SEM_UNDO;
retval = semop(semid, &sem_buf, 1);
if (retval == -1) {
perror("Semaphore Locked: ");
return;
}
}
sem_buf.sem_num = 0;
sem_buf.sem_op = -1; /* Allocating the resources */
sem_buf.sem_flg = SEM_UNDO;
retval = semop(semid, &sem_buf, 1);
if (retval == -1) {
perror("Semaphore Locked: ");
return;
}
cntr = shmp->cntr;
shmp->write_complete = 0;
if (pid == 0)
printf("SHM_WRITE: CHILD: Now writing\n");
else if (pid > 0)
printf("SHM_WRITE: PARENT: Now writing\n");
//printf("SHM_CNTR is %d\n", shmp->cntr);
/* Increment the counter in shared memory by total_count in steps of 1 */
for (numtimes = 0; numtimes < total_count; numtimes++) {
cntr += 1;
shmp->cntr = cntr;
/* Sleeping for a second for every thousand */
sleep_time = cntr % 1000;
if (sleep_time == 0)
sleep(1);
}
shmp->write_complete = 1;
sem_buf.sem_op = 1; /* Releasing the resource */
retval = semop(semid, &sem_buf, 1);
if (retval == -1) {
perror("Semaphore Locked\n");
return;
}
if (pid == 0)
printf("SHM_WRITE: CHILD: Writing Done\n");
else if (pid > 0)
printf("SHM_WRITE: PARENT: Writing Done\n");
return;
}
void remove_semaphore() {
int semid;
int retval;
semid = semget(SEM_KEY, 1, 0);
if (semid < 0) {
perror("Remove Semaphore: Semaphore GET: ");
return;
}
retval = semctl(semid, 0, IPC_RMID);
if (retval == -1) {
perror("Remove Semaphore: Semaphore CTL: ");
return;
}
return;
}
执行上面示例代码,得到以下结果 -
Total Count is 10000
First Process
SHM_WRITE: PARENT: Now writing
Second Process
SHM_WRITE: PARENT: Writing Done
SHM_WRITE: CHILD: Now writing
SHM_WRITE: CHILD: Writing Done
Writing Process: Complete
现在,我们将通过读取进程来检查计数器值。执行结果如下 -
Reading Process: Shared Memory: Counter is 20000
Reading Process: Reading Done, Detaching Shared Memory
Reading Process: Complete