当前位置: 首页 > 知识库问答 >
问题:

如何在多线程生产者-消费者模式中完成工作后使工作线程退出?

李光华
2023-03-14

我正在尝试使用队列实现多线程生产者-消费者模式。Python 2.7中的队列。我试图找出如何让消费者,即工作线程,在完成所有必需的工作后停止。

参见马丁·詹姆斯对这个答案的第二条评论:https://stackoverflow.com/a/19369877/1175080

发送“我已完成”任务,指示池线程终止。任何获得此类任务的线程都会重新请求它,然后自杀。

但这对我不起作用。例如,请参见以下代码。

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

在所有三个工作人员都已从队列中读取退出指示器(即退出指示器)后,此程序将挂起,因为每个工作人员在退出之前都会重新请求退出指示器,因此队列永远不会变空,也永远不会返回。

我想出了以下但丑陋的解决方案,我通过队列为每个工人发送一个-1退出指示器,这样每个工人都可以看到它并自杀。但是我必须为每个工人发送一个退出指示器的事实感觉有点丑陋。

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send one stop indicator for each worker.
    for i in range(3):
        q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

我有两个问题。

  1. 为所有线程发送单个退出指示器的方法(如Martin James在https://stackoverflow.com/a/19369877/1175080的第二个注释中所解释的)甚至可以工作吗?
  2. 如果上一个问题的答案是“否”,是否有一种方法可以以我不必为每个工作线程发送单独的退出指示器的方式解决问题?

共有3个答案

胡志
2023-03-14

为了完整起见:您还可以加入一个停止信号-(线程数)。然后每个线程可以将其增加一个,并且只有在停止信号为!=0时才重新排队。

    if data < 0: # negative numbers are used to indicate that the worker should stop
        if data < -1:
            q.put(data + 1)
        # Commit suicide.
        print 'worker', n, 'is exiting'
        break

但我个人会同意特拉维斯·梅林杰或丹尼尔·桑切斯的回答。

颛孙森
2023-03-14

为所有线程发送单个退出指示器的方法(如Martin James在https://stackoverflow.com/a/19369877/1175080的第二个评论中解释的那样)甚至可以工作吗?

当您注意到它无法工作时,传播消息将使最后一个线程使用另一个项目更新队列,并且因为您正在等待一个永远不会为空的队列,而不是使用您拥有的代码。

如果上一个问题的答案是“否”,是否有一种方法可以以我不必为每个工作线程发送单独的退出指示器的方式解决问题?

您可以加入线程而不是队列:

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    threads = [threading.Thread(target=worker, args=(i, q)) for i in range(3)]
    for t in threads:
        threads.start()
    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    for t in threads:
        t.join()
    print 'done'

master()

正如队列文档所解释的,一旦execption为空,get方法将引发execption,因此如果您已经知道要处理的数据,可以填充队列,然后向线程发送垃圾邮件:

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        try:
            data = q.get(block=False, timeout=1)
            print 'worker', n, 'got', data
            time.sleep(1)  # Simulate noticeable data processing time
            q.task_done()
        except Queue.Empty:
            break


def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

这里有一个活生生的例子

越运锋
2023-03-14

不要把它称为任务的特例。

而是使用事件,为您的工作人员提供非阻塞实现。

stopping = threading.Event()

def worker(n, q, timeout=1):
    # run until the master thread indicates we're done
    while not stopping.is_set():
        try:
            # don't block indefinitely so we can return to the top
            # of the loop and check the stopping event
            data = q.get(True, timeout)
        # raised by q.get if we reach the timeout on an empty queue
        except queue.Empty:
            continue
        q.task_done()

def master():
    ...

    print 'waiting for workers to finish'
    q.join()
    stopping.set()
    print 'done'
 类似资料:
  • 我有一个生产者-消费者模式的多线程任务。可能有许多生产者和一个消费者。我使用ArrayBlockingQueue作为共享资源。 Producer类中的run()方法: Consumer类中的run()方法: main()方法: 现在,当队列为空时,我有消费者结束条件。但是可能会有一段时间队列变成空的,但是一些生产者线程仍然在工作。所以我只需要在完成所有生产者线程之后才完成消费者线程(但它们的数量事

  • 我正在与Java一起研究生产者-消费者问题的多生产者和消费者用例。代码在GitHub上。同样的实现适用于单个生产者和消费者用例,但对于多生产者和消费者用例却表现得很奇怪。 我有一些关于输出的问题: 一开始,所有生产者和一个消费者都有锁: 我想所有的线程都应该竞争锁,并且最多应该有一个线程拥有所有时间的锁?是不是所有的制作人都共用这个锁?当生产者线程t1持有锁时,使用者线程t5是如何获得锁的? 它运

  • 我已经使用Qt线程实现了生产者/消费者模式。多个生产者线程生成由消费者组合的数据。使用信号/时隙和排队连接实现通信。只要使用者能够比生产者线程更快地消耗数据,这就可以正常工作。 很难使我的代码缩放。特别是增加生产者的数量很容易,但很难产生一个以上的消费线程。 现在,当在具有许多内核的CPU/系统上运行软件时,问题就出现了。在这种情况下,我使用更多的线程来产生数据。有时会发生(取决于数据生成的复杂性

  • 本文向大家介绍Java多线程生产者消费者模式实现过程解析,包括了Java多线程生产者消费者模式实现过程解析的使用技巧和注意事项,需要的朋友参考一下 单生产者与单消费者 示例: 执行结果如下: 多生产者与多消费者 这种模式下,容易出现“假死”,也就是全部线程都进入了 WAITNG 状态,程序不在执行任何业务功能了,整个项目呈停止状态。 示例: 运行结果如图: 分析: 虽然代码中通过 wait/not

  • 我有两个线程的问题,似乎没有正确同步。我基本上有一个布尔值名为“已占用”。当没有线程启动时,它被设置为false。但是当一个线程启动时,线程集被占用是真的,我有一个类,它有线程(run),它们调用下面的函数。 这是一个模拟银行的示例,它接收一个金额(初始余额),然后随机执行取款和存款。我的教授提到了一些关于从取款线程到存款线程的信号?这是怎么回事?在提取线程中,它应该运行到余额为2低,并等待存款线

  • 问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别