Python - queue.task_done()用于什么?

J. *_*lor 7 python queue task

我写了一个脚本,它有多个线程(用它创建threading.Thread)从一个Queueusing中获取URL queue.get_nowait(),然后处理HTML.我是多线程编程的新手,我很难理解queue.task_done()函数的用途.

Queue为空时,它会自动返回queue.Empty异常.所以我不明白每个线程需要调用该task_done()函数.我们知道我们已经完成了队列的空白,所以为什么我们需要通知它工作线程已经完成了他们的工作(与队列有关,在他们从中获取了URL之后) ?

有人可以给我提供一个代码示例(理想情况下使用urllib,文件I/O,或斐波那契数字以外的其他内容并打印"Hello"),它向我展示了如何在实际应用中使用此函数?

use*_*ica 13

Queue.task_done不是为了工人的利益.它是支持Queue.join.


如果我给你一盒工作任务,我是否关心你什么时候开箱即用?

不,我关心工作何时完成.看着一个空盒子并没有告诉我.你和其他5个人可能仍在研究你开箱即用的东西.

Queue.task_done让工作人员说完任务.等待所有工作完成的人Queue.join将等到task_done已经进行了足够的调用,而不是在队列为空时.

  • @ifelsemonkey在非常常见的使用场景中,它在工作人员之间分配任务非常方便且有用。并且仅在您需要时才适用。如果您从不打算在队列中调用`join()`,那么您也无需了解或关心`task_done()`。 (3认同)
  • 是的,但我要问的是为什么队列需要知道任务(即处理 HTML、将提取的数据插入数据库等)何时完成?当它是空的时,我就完成了。为什么我不能在抛出 `queue.Empty` 异常时知道我已经完成了?为什么 URL 队列需要知道我已成功将某些内容输入数据库或处理了某些 HTML?所有队列应该关心的是是否所有的 URL 都被分派到某个线程。 (2认同)
  • @J.Taylor:调用`Queue.join` 的人需要知道。 (2认同)
  • @J.Taylor:取决于谁需要知道工作已完成,以及在工作完成时是否将工作添加到队列中。*workers* 不需要调用 `task_done` 来知道他们是否完成了。`task_done` 是一种与任何等待工作完成的线程通信的方式。 (2认同)
  • 对我来说,暗示task_is_done/join语义的队列概念是不寻常的.一个队列应该只关注put和get,并将其视为空或满.不是task_done()的东西.我在暮光之城吗? (2认同)

Jos*_*ush 13

.task_done() is used to mark .join() that the processing is done.

If you use .join() and don't call .task_done() for every processed item, your script will hang forever.


Ain't nothin' like a short example;

import logging
import queue
import threading
import time

items_queue = queue.Queue()
running = False


def items_queue_worker():
    while running:
        try:
            item = items_queue.get(timeout=0.01)
            if item is None:
                continue

            try:
                process_item(item)
            finally:
                items_queue.task_done()

        except queue.Empty:
            pass
        except:
            logging.exception('error while processing item')


def process_item(item):
    print('processing {} started...'.format(item))
    time.sleep(0.5)
    print('processing {} done'.format(item))


if __name__ == '__main__':
    running = True

    # Create 10 items_queue_worker threads
    worker_threads = 10
    for _ in range(worker_threads):
        threading.Thread(target=items_queue_worker).start()

    # Populate your queue with data
    for i in range(100):
        items_queue.put(i)

    # Wait for all items to finish processing
    items_queue.join()

    running = False
Run Code Online (Sandbox Code Playgroud)

  • 当您永远无法达到它时,为什么需要“if item is None:”的额外测试(除非您实际上在 items_queue 中存储 None 值,而您现在不这样做)?据我所知,如果队列为空,则会抛出空异常,因此执行将“跳转”到 catch 子句。 (2认同)

use*_*342 5

有人可以为我提供一个代码示例(最好使用 urllib、文件 I/O 或斐波那契数列和打印“Hello”以外的其他内容)来向我展示如何在实际应用中使用此函数?

@user2357112 的答案很好地解释了 的目的task_done,但缺少请求的示例。这是一个计算任意数量文件的校验和并返回将每个文件名映射到相应校验和的字典的函数。在函数内部,工作被分配给多个线程。

该函数使用 来Queue.join等待工作人员完成分配的任务,因此可以安全地将字典返回给调用者。这是等待所有文件被处理的便捷方法,而不是仅仅将它们出列。

import threading, queue, hashlib

def _work(q, checksums):
    while True:
        filename = q.get()
        if filename is None:
            q.put(None)
            break
        try:
            sha = hashlib.sha256()
            with open(filename, 'rb') as f:
                for chunk in iter(lambda: f.read(65536), b''):
                    sha.update(chunk)
            checksums[filename] = sha.digest()
        finally:
            q.task_done()

def calc_checksums(files):
    q = queue.Queue()
    checksums = {}
    for i in range(1):
        threading.Thread(target=_work, args=(q, checksums)).start()
    for f in files:
        q.put(f)
    q.join()
    q.put(None)  # tell workers to exit
    return checksums
Run Code Online (Sandbox Code Playgroud)

关于 GIL 的注释:由于代码在hashlib计算校验和时内部释放了 GIL,因此与单线程变体相比,使用多线程会产生可测量的加速(1.75 倍至 2 倍,具体取决于 Python 版本)。


Cha*_*iam 5

“阅读消息来源,卢克!” ——欧比一科多比

ayncio.queue的源代码很短。

  • 当您放入队列时,未完成的任务数增加一。
  • 当你调用 task_done 时它​​会下降一个
  • join() 等待没有未完成的任务。

当且仅当您调用 task_done() 时,这使得 join 有用。使用经典的银行类比:

  • 人们进门排队;门是一个生产者做 q.put()
  • 当柜员空闲而有人在排队时,他们会去柜员窗口。 出纳员做了一个 q.get()
  • 当出纳员完成帮助这个人时,他们就准备好迎接下一个。 出纳员做了一个 q.task_done()
  • 下午5点,门锁门任务完成
  • 你等到两条线都空了每个出纳员都完成了帮助他们面前的人。 等待 q.join(柜员)
  • 然后你把出纳员送回家,他们现在都空着队列闲着。 对于出纳员中的出纳员:teller.cancel()

如果没有 task_done(),您就无法知道每个出纳员都与人打交道。您不能在出纳员窗口有人的情况下将其送回家。