Jos*_*h.F 6 python queue multithreading
在下面的代码中,我有两个队列来运行不同类型的线程。这些线程递归地添加到彼此的队列中(队列 1 获取一些信息,队列 2 处理它并将更多信息添加到队列 1)。
我想等到两个队列中的所有项目都被完全处理。目前我正在使用此代码
queue.join()
out_queue.join()
Run Code Online (Sandbox Code Playgroud)
问题是当第一个队列暂时用完要做的事情时,它会关闭,因此在此之后它永远不会看到队列 2(out_queue)添加到它的内容。
我添加了 time.sleep() 函数,这是一个非常糟糕的修复程序,到 30 年代,两个队列都已填满而不会耗尽。
解决此问题的标准 Python 方法是什么?我是否必须只有一个队列,并将其中的项目标记为应该由哪个线程处理?
queue = Queue.Queue()
out_queue = Queue.Queue()
class ThreadUrl(threading.Thread):
"""Threaded Url Grab"""
def __init__(self, queue, out_queue):
threading.Thread.__init__(self)
self.queue = queue
self.out_queue = out_queue
def run(self):
while True:
row = self.queue.get()
request = urllib2.Request(row[0], None, req_headers)
# ... some processing ...
self.out_queue.put([row, http_status, page])
self.queue.task_done()
class DatamineThread(threading.Thread):
def __init__(self, out_queue, mysql):
threading.Thread.__init__(self)
self.out_queue = out_queue
self.mysql = mysql
def run(self):
while True:
row = self.out_queue.get()
# ... some processing ...
queue.put(newrow)
self.out_queue.task_done()
queue = Queue.Queue()
out_queue = Queue.Queue()
for i in range(URL_THREAD_COUNT):
t = ThreadUrl(queue, out_queue)
t.setDaemon(True)
t.start()
#populate queue with data
for row in rows:
queue.put(row)
#MySQL Connector
mysql = MySQLConn(host='localhost', user='root', passwd = None, db='db')
#spawn DatamineThread, if you have multiple, make sure each one has it's own mysql connector
dt = DatamineThread(out_queue, mysql)
dt.setDaemon(True)
dt.start()
time.sleep(30)
#wait on the queue until everything has been processed
queue.join()
out_queue.join()
Run Code Online (Sandbox Code Playgroud)
更改工作人员,以便他们需要一个哨兵值才能退出,而不是在队列中没有更多工作时退出。在以下代码中,howdy工作线程从输入队列中读取项目。如果该值是哨兵(None,但它可以是任何值),则工作人员退出。
因此,您不需要搞乱超时,正如您发现的那样,超时可能相当危险。另一个结果是,如果您有N 个线程,则必须将N 个哨兵附加到输入队列中以杀死您的工作线程。否则你最终会遇到一个永远等待的工人。僵尸工人,如果你愿意的话。
import threading, Queue
def howdy(q):
for msg in iter(q.get, None):
print 'howdy,',msg
inq = Queue.Queue()
for word in 'whiskey syrup bitters'.split():
inq.put(word)
inq.put( None ) # tell worker to exit
thread = threading.Thread(target=howdy, args=[inq])
thread.start()
thread.join()
Run Code Online (Sandbox Code Playgroud)
howdy, whiskey
howdy, syrup
howdy, bitters
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4571 次 |
| 最近记录: |