如何从Python中的主线程终止Producer-Consumer线程?

Hon*_*hen 3 python multithreading producer-consumer python-multithreading

我有一个Producer和一个Consumer线程(threading.Thread),它共享一个queue类型Queue.

制片人run:

while self.running:
    product = produced() ### I/O operations
    queue.put(product)
Run Code Online (Sandbox Code Playgroud)

消费者run:

while self.running or not queue.empty():
    product = queue.get()
    time.sleep(several_seconds) ###
    consume(product)
Run Code Online (Sandbox Code Playgroud)

现在我需要终止主线程中的两个线程,并且queue在终止之前必须为空(全部已消耗)的要求.

目前我正在使用如下代码来终止这两个线程:

主线程stop:

producer.running = False
producer.join()
consumer.running = False
consumer.join()
Run Code Online (Sandbox Code Playgroud)

但我想如果有更多的消费者,这是不安全的.

另外,我不确定是否sleep会向制作人发布时间表以便它可以生产更多产品.事实上,我发现生产者一直"挨饿",但我不确定这是否是根本原因.

有没有一个体面的方法来处理这个案子?

Neo*_*ang 6

您可以将一个Sentinel对象放入队列以指示任务结束,从而导致所有使用者终止:

_sentinel = object()

def producer(queue):
    while running:
       # produce some data
       queue.put(data)
    queue.put(_sentinel)

def consumer(queue):
    while True:
        data = queue.get()
        if data is _sentinel:
            # put it back so that other consumers see it
            queue.put(_sentinel)
            break
        # Process data
Run Code Online (Sandbox Code Playgroud)

这个片段是从Python Cookbook 12.3中无耻地复制的.

  1. 使用a _sentinel标记队列结束.None如果生产者没有生成任务None,也可以使用,但_sentinel对于更一般的情况,使用a 更安全.
  2. 对于每个使用者,您不需要将多个结束标记放入队列中.您可能不知道有多少线程正在消耗.当消费者找到它时,只需将哨兵放回队列,让其他消费者得到信号.