Lon*_*ner 14 python concurrency multithreading producer-consumer python-2.7
我正在尝试使用Python 2.7中的Queue.Queue实现多线程生产者 - 消费者模式.我试图找出如何使消费者,即工人线程,一旦完成所有必要的工作,停止.
请参阅Martin James对此答案的第二条评论:https://stackoverflow.com/a/19369877/1175080
发送'我完成'任务,指示池线程终止.任何获得此类任务的线程都会重新排队,然后自杀.
但这对我不起作用.例如,请参阅以下代码.
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
# Requeue the exit indicator.
q.put(-1)
# Commit suicide.
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send an exit indicator for all threads to consume.
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
Run Code Online (Sandbox Code Playgroud)
在所有三个工作人员都已读取退出指示符(即-1从队列中)之后,此程序挂起,因为每个工作程序-1在退出之前重新排队,因此队列永远不会变为空并且q.join()永远不会返回.
我提出了以下但丑陋的解决方案,我-1通过队列为每个工人发送一个退出指示器,这样每个工人都可以看到它并自杀.但事实上,我必须为每个工人发送一个退出指示器感觉有点难看.
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send one stop indicator for each worker.
for i in range(3):
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
Run Code Online (Sandbox Code Playgroud)
我有两个问题.
Tra*_*ger 16
不要将其称为任务的特殊情况.
使用事件代替,为您的员工使用非阻塞实现.
stopping = threading.Event()
def worker(n, q, timeout=1):
# run until the master thread indicates we're done
while not stopping.is_set():
try:
# don't block indefinitely so we can return to the top
# of the loop and check the stopping event
data = q.get(True, timeout)
# raised by q.get if we reach the timeout on an empty queue
except queue.Empty:
continue
q.task_done()
def master():
...
print 'waiting for workers to finish'
q.join()
stopping.set()
print 'done'
Run Code Online (Sandbox Code Playgroud)
Net*_*ave 12
可以发送单个出口指示器的所有线程(在第二评论解释的方法/sf/answers/1355891421/马丁詹姆斯)甚至工作?
正如您所注意到的那样,它无法正常工作,传播消息将使最后一个线程用另外一个项更新队列,因为您正在等待一个永远不会为空的队列,而不是您拥有的代码.
如果上一个问题的答案是"否",有没有办法解决问题,我不必为每个工作线程发送单独的退出指示器?
您可以join使用线程而不是队列:
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
# Requeue the exit indicator.
q.put(-1)
# Commit suicide.
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
threads = [threading.Thread(target=worker, args=(i, q)) for i in range(3)]
for t in threads:
threads.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send an exit indicator for all threads to consume.
q.put(-1)
print 'waiting for workers to finish ...'
for t in threads:
t.join()
print 'done'
master()
Run Code Online (Sandbox Code Playgroud)
由于Queue文档解释get方法会在其空的时候引发一个execption,所以如果您已经知道要处理的数据,您可以填充队列然后垃圾邮件线程:
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
try:
data = q.get(block=False, timeout=1)
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
except Queue.Empty:
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Send 10 items to work on.
for i in range(10):
q.put(i)
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
Run Code Online (Sandbox Code Playgroud)
这里有一个实例