我写了一个脚本,它有多个线程(用它创建threading.Thread
)从一个Queue
using中获取URL queue.get_nowait()
,然后处理HTML.我是多线程编程的新手,我很难理解queue.task_done()
函数的用途.
当Queue
为空时,它会自动返回queue.Empty
异常.所以我不明白每个线程需要调用该task_done()
函数.我们知道我们已经完成了队列的空白,所以为什么我们需要通知它工作线程已经完成了他们的工作(与队列有关,在他们从中获取了URL之后) ?
有人可以给我提供一个代码示例(理想情况下使用urllib
,文件I/O,或斐波那契数字以外的其他内容并打印"Hello"),它向我展示了如何在实际应用中使用此函数?
use*_*ica 13
Queue.task_done
不是为了工人的利益.它是支持Queue.join
.
如果我给你一盒工作任务,我是否关心你什么时候开箱即用?
不,我关心工作何时完成.看着一个空盒子并没有告诉我.你和其他5个人可能仍在研究你开箱即用的东西.
Queue.task_done
让工作人员说完任务.等待所有工作完成的人Queue.join
将等到task_done
已经进行了足够的调用,而不是在队列为空时.
Jos*_*ush 13
.task_done()
is used to mark .join()
that the processing is done.
If you use
.join()
and don't call.task_done()
for every processed item, your script will hang forever.
Ain't nothin' like a short example;
import logging
import queue
import threading
import time
items_queue = queue.Queue()
running = False
def items_queue_worker():
while running:
try:
item = items_queue.get(timeout=0.01)
if item is None:
continue
try:
process_item(item)
finally:
items_queue.task_done()
except queue.Empty:
pass
except:
logging.exception('error while processing item')
def process_item(item):
print('processing {} started...'.format(item))
time.sleep(0.5)
print('processing {} done'.format(item))
if __name__ == '__main__':
running = True
# Create 10 items_queue_worker threads
worker_threads = 10
for _ in range(worker_threads):
threading.Thread(target=items_queue_worker).start()
# Populate your queue with data
for i in range(100):
items_queue.put(i)
# Wait for all items to finish processing
items_queue.join()
running = False
Run Code Online (Sandbox Code Playgroud)
有人可以为我提供一个代码示例(最好使用 urllib、文件 I/O 或斐波那契数列和打印“Hello”以外的其他内容)来向我展示如何在实际应用中使用此函数?
@user2357112 的答案很好地解释了 的目的task_done
,但缺少请求的示例。这是一个计算任意数量文件的校验和并返回将每个文件名映射到相应校验和的字典的函数。在函数内部,工作被分配给多个线程。
该函数使用 来Queue.join
等待工作人员完成分配的任务,因此可以安全地将字典返回给调用者。这是等待所有文件被处理的便捷方法,而不是仅仅将它们出列。
import threading, queue, hashlib
def _work(q, checksums):
while True:
filename = q.get()
if filename is None:
q.put(None)
break
try:
sha = hashlib.sha256()
with open(filename, 'rb') as f:
for chunk in iter(lambda: f.read(65536), b''):
sha.update(chunk)
checksums[filename] = sha.digest()
finally:
q.task_done()
def calc_checksums(files):
q = queue.Queue()
checksums = {}
for i in range(1):
threading.Thread(target=_work, args=(q, checksums)).start()
for f in files:
q.put(f)
q.join()
q.put(None) # tell workers to exit
return checksums
Run Code Online (Sandbox Code Playgroud)
关于 GIL 的注释:由于代码在hashlib
计算校验和时内部释放了 GIL,因此与单线程变体相比,使用多线程会产生可测量的加速(1.75 倍至 2 倍,具体取决于 Python 版本)。
“阅读消息来源,卢克!” ——欧比一科多比
ayncio.queue的源代码很短。
当且仅当您调用 task_done() 时,这使得 join 有用。使用经典的银行类比:
如果没有 task_done(),您就无法知道每个出纳员都与人打交道。您不能在出纳员窗口有人的情况下将其送回家。