如何在多线程生产者 - 消费者模式中完成工作后退出工作线程?

Lon*_*ner 14 python concurrency multithreading producer-consumer python-2.7

我正在尝试使用Python 2.7中的Queue.Queue实现多线程生产者 - 消费者模式.我试图找出如何使消费者,即工人线程,一旦完成所有必要的工作,停止.

请参阅Martin James对此答案的第二条评论:https://stackoverflow.com/a/19369877/1175080

发送'我完成'任务,指示池线程终止.任何获得此类任务的线程都会重新排队,然后自杀.

但这对我不起作用.例如,请参阅以下代码.

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()
Run Code Online (Sandbox Code Playgroud)

在所有三个工作人员都已读取退出指示符(即-1从队列中)之后,此程序挂起,因为每个工作程序-1在退出之前重新排队,因此队列永远不会变为空并且q.join()永远不会返回.

我提出了以下但丑陋的解决方案,我-1通过队列为每个工人发送一个退出指示器,这样每个工人都可以看到它并自杀.但事实上,我必须为每个工人发送一个退出指示器感觉有点难看.

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send one stop indicator for each worker.
    for i in range(3):
        q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()
Run Code Online (Sandbox Code Playgroud)

我有两个问题.

  1. 可以发送单个出口指示器的所有线程(在第二评论解释的方法/sf/answers/1355891421/马丁詹姆斯)甚至工作?
  2. 如果上一个问题的答案是"否",有没有办法解决问题,我不必为每个工作线程发送单独的退出指示器?

Tra*_*ger 16

不要将其称为任务的特殊情况.

使用事件代替,为您的员工使用非阻塞实现.

stopping = threading.Event()

def worker(n, q, timeout=1):
    # run until the master thread indicates we're done
    while not stopping.is_set():
        try:
            # don't block indefinitely so we can return to the top
            # of the loop and check the stopping event
            data = q.get(True, timeout)
        # raised by q.get if we reach the timeout on an empty queue
        except queue.Empty:
            continue
        q.task_done()

def master():
    ...

    print 'waiting for workers to finish'
    q.join()
    stopping.set()
    print 'done'
Run Code Online (Sandbox Code Playgroud)


Net*_*ave 12

可以发送单个出口指示器的所有线程(在第二评论解释的方法/sf/answers/1355891421/马丁詹姆斯)甚至工作?

正如您所注意到的那样,它无法正常工作,传播消息将使最后一个线程用另外一个项更新队列,因为您正在等待一个永远不会为空的队列,而不是您拥有的代码.

如果上一个问题的答案是"否",有没有办法解决问题,我不必为每个工作线程发送单独的退出指示器?

您可以join使用线程而不是队列:

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    threads = [threading.Thread(target=worker, args=(i, q)) for i in range(3)]
    for t in threads:
        threads.start()
    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    for t in threads:
        t.join()
    print 'done'

master()
Run Code Online (Sandbox Code Playgroud)

由于Queue文档解释get方法会在其空的时候引发一个execption,所以如果您已经知道要处理的数据,您可以填充队列然后垃圾邮件线程:

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        try:
            data = q.get(block=False, timeout=1)
            print 'worker', n, 'got', data
            time.sleep(1)  # Simulate noticeable data processing time
            q.task_done()
        except Queue.Empty:
            break


def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()
Run Code Online (Sandbox Code Playgroud)

这里有一个实例