Python:通过完成对任务进行排序后获取对原始任务的引用

dri*_*ter 3 python-asyncio

问题:asyncio.as_completed 产生结果后,如何获取原始任务的引用?

基本上与这个 C# 问题相同,除了在 Python:按完成排序任务后获取对原始任务的引用?

示例问题:

# Takes a list of WebClient objects,
# calls each one simultaneously,
# and yields the results immediately as they arrive
# to a synchronous caller.

def yieldThingsAsTheyArrive(webClients):

    tasks = []
    for webClient in webClients:
        # This is what we want to get a reference to later:
        task = webClient.fetch_thing()  # start long-running request asynchronously
        tasks.append(task)

    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed(tasks):
        thing = loop.run_until_complete(future)  # since our caller is synchronous, wait until the task completes so we can yield the final result instead of a future
        thing.originalWebClient = ???  # This is where we need a reference to the original webClient
        yield thing
Run Code Online (Sandbox Code Playgroud)

use*_*342 6

as_completed特殊之处在于它既不会产生类似的期货asyncio.wait,也不会产生类似的结果asyncio.gather。相反,它会生成您需要等待的协程(以您喜欢的任何方式)以按完成顺序获得结果。它不能产生您传递给它的期货,因为那时它还不知道接下来将完成哪个已传递的期货。

您可以通过将任务包装在另一个 future 中来关联任意数据,其结果是任务对象(您已将数据附加到该对象)。这本质上等同于C# 代码所做的,只是没有静态键入仪式。从这个答案中获取设置,一个可运行的示例如下所示:

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

def ordinary_generator():
    loop = asyncio.get_event_loop()

    wrappers = []
    for idx, coro in enumerate((first(), second(), third())):
        task = loop.create_task(coro)
        task.idx = idx + 1
        # Wrap the task in a future that completes when the 
        # task does, but whose result is the task object itself.
        wrapper = loop.create_future()
        task.add_done_callback(wrapper.set_result)
        wrappers.append(wrapper)

    for x in asyncio.as_completed(wrappers):
        # yield completed tasks
        yield loop.run_until_complete(x)

for task in ordinary_generator():
    print(task.result(), task.idx)
Run Code Online (Sandbox Code Playgroud)

我推荐的另一个选项是as_completed用一个调用asyncio.wait(return_when=FIRST_COMPLETED). 这也将提供完整的期货,但不需要额外的包装,并导致稍微更惯用的 asyncio 代码。我们调用ensure_future每个协程将其转换为未来,将数据附加到它,然后才将它传递给asyncio.wait(). 由于wait返回相同的期货,因此附加数据在它们上。

def ordinary_generator():
    loop = asyncio.get_event_loop()

    pending = []
    for idx, coro in enumerate((first(), second(), third())):
        task = loop.create_task(coro)
        task.idx = idx + 1
        pending.append(task)

    while pending:
        done, pending = loop.run_until_complete(asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED))
        for task in done:
            yield task
Run Code Online (Sandbox Code Playgroud)