当前位置: 首页 > 知识库问答 >
问题:

pthread_cond_wait和pthread_mutex_lock优先级?

咸晨
2023-03-14

我有多个读线程和一个写线程。如果我锁定其中一个读线程上的互斥体并从中发送广播,是否可以保证互斥体将由等待pthread_cond_wait()的写线程锁定,或者另一个正在等待pthread_mutex_lock()的读线程将锁定互斥体?主要问题是pthread_cond_wait()是否比pthread_mutex_lock()具有优先级?

如果不是,如何实现互斥锁始终由pthread_cond_broadcast()上的write thread锁定?

实例

读线程:

pthread_mutex_lock(mutex);
pthread_cond_broadcast(cond);
pthread_mutex_unlock(mutex);

写线程:

pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);

共有1个答案

微生毅
2023-03-14

让我们假设读写两个线程同时到达pthread\u mutex\u lock。因此,要么写入线程在调用pthread\u mutex\u lock时获取互斥,要么读取线程。

如果是写线程,则读线程将等待pthread_mutex_lock。写,通过调用pthread_cond_wait释放互斥锁并阻止cond。它是原子完成的。因此,当读取线程是Grantex的互斥锁时,我们可以确保读取线程等待cond。所以,在cond上的广播到达写线程,它不再等待cond,但是-仍然在pthread_cond_wait的范围内-尝试在互斥锁上获得锁(保持被读取线程)。在广播cond后,读取线程释放互斥锁,并进入写入线程。因此写线程最终退出pthread_cond_wait,并锁定了互斥锁。以后记得解锁。

如果是读线程,写线程将等待pthread\u mutex\u lock,读线程将在cond上广播一个信号,然后释放mutex。然后,写入线程在pthread\u mutex\u lock上获取mutex,并立即在其中释放pthread\u cond\u waitwaitingcond(请注意,之前的cond广播对当前pthread\u cond\u wait没有影响)。在读取线程的下一次迭代中,它获得了互斥锁,在条件下发送广播并解锁互斥锁。这意味着写入线程在cond上向前移动,并在互斥锁上获得锁。

它回答了你关于优先权的问题吗?

评论后更新。

让我们假设有一个线程(我们将其命名为A,以备将来参考)持有互斥锁,而其他几个线程试图获取相同的锁。一旦第一个线程释放了锁,就无法预测哪个线程将获得锁。此外,如果A线程有一个循环,并试图重新获取互斥锁上的锁,则它有可能被授予此锁,其他线程将继续等待。添加pthread\u cond\u wait不会更改授予锁的范围内的任何内容。

让我引用POSIX规范的片段(参见https://stackoverflow.com/a/9625267/2989411(供参考):

这些函数以原子方式释放互斥,并导致调用线程阻塞条件变量cond;原子性在这里意味着“原子性地涉及到另一个线程对互斥体的访问,然后是对条件变量的访问”。也就是说,如果另一个线程能够在即将阻止的线程释放互斥对象后获取互斥对象,则随后对该线程中pthread_cond_broadcast()或pthread_cond_signal()的调用应表现为它是在即将阻止的线程被阻止后发出的。

这只是标准对操作顺序的保证。向其他线程授予锁的顺序是相当不可预测的,它会根据一些非常微妙的时间波动而变化。

对于互斥锁相关代码,请使用以下代码:

#define _GNU_SOURCE
#include <pthread.h>

#include <stdio.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *th(void *arg) {
    int i;
    char *s = arg;
    for (i = 0; i < 10; ++i) {
        pthread_mutex_lock(&mutex);
        printf("%s %d\n", s, i);
        //sleep(1);
        pthread_mutex_unlock(&mutex);
#if 0
        pthread_yield();
#endif
    }
    return NULL;
}

int main() {
    int i;
    for (i = 0; i < 10; ++i) {
        pthread_t t1, t2, t3;
        printf("================================\n");
        pthread_create(&t1, NULL, th, "t1");
        pthread_create(&t2, NULL, th, "     t2");
        pthread_create(&t3, NULL, th, "            t3");
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        pthread_join(t3, NULL);
    }
    return 0;
}

在一台机器(单个CPU)上,它总是显示从t3,然后是t2,最后是t1的整个循环。在另一个(2个核心)线程的顺序更随机,但几乎总是在授予其他线程互斥锁之前显示每个线程的整个循环。很少有这样的情况:

t1 8
t1 9
            t3 0
     t2 0
     t2 1
     [removed other t2 output]
     t2 8
     t2 9
            t3 1
            t3 2

启用pthread_yield,将#if 0替换为#if 1,并观察结果并检查输出。对我来说,它的工作方式是两个线程交错显示它们的输出,然后第三个线程终于有机会工作了。添加另一个或多个线程。玩睡眠等。它证实了随机行为。

如果您想做一点实验,请编译并运行以下代码。这是一个单一生产者-多消费者模式的例子。它可以使用两个参数运行:第一个参数是使用者线程的数量,第二个参数是生成的数据系列的长度。如果没有给出任何参数,则有一个消费者线程和120个要处理的项目。我还建议在标有/*play here*/的位置使用sleep/usleep:更改参数的值,完全删除睡眠,在适当的时候将其移动到临界部分,或者替换为pthread_yield,并观察行为的变化。

#define _GNU_SOURCE
#include <assert.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

struct data_t {
    int seq;
    int payload;
    struct data_t *next;
};

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
struct data_t *first = NULL, *last = NULL;
int in_progress = 1;
int num_data = 120;

void push(int seq, int payload) {
    struct data_t *e;
    e = malloc(sizeof(struct data_t));
    e->seq = seq;
    e->payload = payload;
    e->next = NULL;
    if (last == NULL) {
        assert(first == NULL);
        first = last = e;
    } else {
        last->next = e;
        last = e;
    }
}

struct data_t pop() {
    struct data_t res = {0};
    if (first == NULL) {
        res.seq = -1;
    } else {
        res.seq = first->seq;
        res.payload = first->payload;
        first = first->next;
        if (first == NULL) {
            last = NULL;
        }
    }
    return res;
}

void *producer(void *arg __attribute__((unused))) {
    int i;
    printf("producer created\n");
    for (i = 0; i < num_data; ++i) {
        int val;
        sleep(1); /* play here */
        pthread_mutex_lock(&mutex);
        val = rand() / (INT_MAX / 1000);
        push(i, val);
        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond);
        printf("prod %3d %3d signaled\n", i, val);
    }
    in_progress = 0;
    printf("prod end\n");
    pthread_cond_broadcast(&cond);
    printf("prod end signaled\n");
    return NULL;
}

void *consumer(void *arg) {
    char c_id[1024];
    int t_id = *(int *)arg;
    sprintf(c_id, "%*s c %02d", t_id % 10, "", t_id);
    printf("%s created\n", c_id);
    while (1) {
        struct data_t item;
        pthread_mutex_lock(&mutex);
        item = pop();
        while (item.seq == -1 && in_progress) {
            printf("%s waits for data\n", c_id);
            pthread_cond_wait(&cond, &mutex);
            printf("%s got signal\n", c_id);
            item = pop();
        }
        if (!in_progress && item.seq == -1) {
            printf("%s detected end of data.\n", c_id);
            pthread_mutex_unlock(&mutex);
            break;
        }
        pthread_mutex_unlock(&mutex);
        printf("%s processing %3d %3d\n", c_id, item.seq, item.payload);
        sleep(item.payload % 10); /* play here */
        printf("%s processed  %3d %3d\n", c_id, item.seq, item.payload);
    }
    printf("%s end\n", c_id);
    return NULL;
}

int main(int argc, char *argv[]) {
    int num_cons = 1;
    pthread_t t_prod;
    pthread_t *t_cons;
    int i;
    int *nums;
    if (argc > 1) {
        num_cons = atoi(argv[1]);
        if (num_cons == 0) {
            num_cons = 1;
        }
        if (num_cons > 99) {
            num_cons = 99;
        }
    }
    if (argc > 2) {
        num_data = atoi(argv[2]);
        if (num_data < 10) {
            num_data = 10;
        }
        if (num_data > 600) {
            num_data = 600;
        }
    }

    printf("Spawning %d consumer%s for %d items.\n", num_cons, num_cons == 1 ? "" : "s", num_data);
    t_cons = malloc(sizeof(pthread_t) * num_cons);
    nums = malloc(sizeof(int) * num_cons);
    if (!t_cons || !nums) {
        printf("Out of memory!\n");
        exit(1);
    }
    srand(time(NULL));
    pthread_create(&t_prod, NULL, producer, NULL);

    for (i = 0; i < num_cons; ++i) {
        nums[i] = i + 1;
        usleep(100000); /* play here */
        pthread_create(t_cons + i, NULL, consumer, nums + i);
    }

    pthread_join(t_prod, NULL);

    for (i = 0; i < num_cons; ++i) {
        pthread_join(t_cons[i], NULL);
    }
    free(nums);
    free(t_cons);

    return 0;
}

我希望我已经消除了您的疑虑,并为您提供了一些代码进行实验,并获得了一些关于pthread行为的信心。

 类似资料:
  • 假设一个线程在条件变量上阻塞: 互斥锁被解锁,尝试锁定互斥锁的其他线程被解锁: 同时还有另一个线程正在等待获取关键部分的所有权: 现在的问题是:调用pthread_cond_signal()时,是否保证pthread_cond_wait()[1]将在pthread_mutex_lock()[2]之前解除阻塞? POSIX规范似乎没有说明这种情况。

  • 问题内容: 我想知道如何或/和如何工作? 例如,如果我想获取display = 1的所有行 我可以做 如果我想要显示= 1或2的所有行 我可以做 但是,如果我想获取display = 1或2的所有行,并且其中 任何 内容,标签或标题包含 逻辑将如何发挥作用? 是我的猜测。但是我可以通过几种方式阅读。 它的读数是否为: 或作为 等等 问题答案: MySQL文档有一个很好的页面,其中包含有关哪些运算符

  • 一般来说,和的调用如下: 步骤是 > 线程2锁定互斥锁并调用,从而解锁互斥锁 在线程1中,调用,并再次锁定互斥锁 现在在线程2中,在调用之后,

  • 在我的python应用程序中,我使用芹菜作为任务生产者和消费者,使用RabbitMQ作为代理。现在,我正在实施优先级排序。起初,它看起来根本不起作用,因为根据文档,我刚刚在队列中添加了参数。我更深入地研究了一下,发现了另一种优先级——消费者优先级和任务优先级。所以,现在,看起来有三种不同的优先顺序,我完全困惑了。你能给我解释一下区别吗? 队列最大优先级:即https://www.rabbitmq.

  • RabbitMQ有消息优先级的概念吗?我有一个问题,一些更重要的消息由于队列中不太重要的消息而被拖慢。我希望高优先级的优先,并移动到队列的前面。 我知道我可以用两个队列来近似计算,一个是“快”队列,另一个是“慢”队列,但这看起来像是一个黑客。 有人知道使用RabbitMQ的更好的解决方案吗?

  • 对于特定的路由,我有一个带有doTry()-doCatch()对的路由,而通常是onException()。 附言。删除onException()会调用doCatch()。然而,我有理由保留这两个。Camel版本为:org.apache.Camel:camel-cxf:2.21.0.000033-fuse-000001-redhat-1