我有一些生产者函数,它依赖于I/O重阻塞调用和一些依赖于I/O重阻塞调用的消费者函数.为了加快速度,我使用了Gevent微线程库作为粘合剂.
这就是我的范例:
import gevent
from gevent.queue import *
import time
import random
q = JoinableQueue()
workers = []
producers = []
def do_work(wid, value):
gevent.sleep(random.randint(0,2))
print 'Task', value, 'done', wid
def worker(wid):
while True:
item = q.get()
try:
print "Got item %s" % item
do_work(wid, item)
finally:
print "No more items"
q.task_done()
def producer():
while True:
item = random.randint(1, 11)
if item == 10:
print "Signal Received"
return
else:
print "Added item %s" % item
q.put(item)
for i in range(4):
workers.append(gevent.spawn(worker, random.randint(1, 100000)))
#This doesnt work.
for j in range(2):
producers.append(gevent.spawn(producer))
#Uncommenting this makes this script work.
#producer()
q.join()
Run Code Online (Sandbox Code Playgroud)
我有四个消费者,并希望有两个生产者.生产者在他们发出信号即10时退出.消费者继续从这个队列中取食,当生产者和消费者结束时整个任务结束.
但是,这不起作用.如果我注释掉for产生多个生成器并且只使用单个生成器的循环,那么脚本运行正常.
我似乎无法弄清楚我做错了什么.
有任何想法吗?
谢谢
当队列没有未完成的工作时,您实际上并不想退出,因为从概念上讲,应用程序应该完成时不会.
你想在制片人完成时退出,然后在没有未完成的工作时退出.
# Wait for all producers to finish producing
gevent.joinall(producers)
# *Now* we want to make sure there's no unfinished work
q.join()
# We don't care about workers. We weren't paying them anything, anyways
gevent.killall(workers)
# And, we're done.
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1968 次 |
| 最近记录: |