Python3 如何 asyncio.gather() 部分函数的列表

Ale*_*ges 17 python-3.x python-asyncio

我正在尝试创建一个可以传递给 asyncio.gather() 的协程列表

但是,当我将它们附加到列表中时,我想将参数附加到这些协程。

下面显示的我当前的方法使用 functools.partial。不幸的是 asyncio.gather 不接受部分函数,​​这是有道理的。

对我来说没有意义的是如何找到解决方案。

示例代码:

async def test(arg1):
    print(arg1)

statements = []

function = functools.partial(test, "hello world")
statements.append(function)

results = await asyncio.gather(*statements)
Run Code Online (Sandbox Code Playgroud)

那么如何将参数附加到函数,以便它仍然可以传递给 asyncio.gather?

*编辑

看来我是比较傻了。

我的解决方案相当简单,不要使用 functools.partial,只需将协程直接附加到列表中即可。

代码:

async def test(arg1):
    print(arg1)

async def main():
    statements = []
    statements.append(test("hello_world"))
    results = await asyncio.gather(*statements)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

use*_*342 14

但是,当我将它们附加到列表中时,我想将参数附加到这些协程。

您可以简单地使用它们的必要参数调用协程函数。与普通函数不同(但类似于生成器),调用协程函数不会开始执行它,它只是返回一个可用于驱动其稍后执行的对象。此对象已捕获调用参数,可以添加到传递给 的列表中asyncio.gather

例如:

statements = []

statements.append(test("hello world"))
# ...

results = await asyncio.gather(*statements)
Run Code Online (Sandbox Code Playgroud)

注意:如果您需要多次从函数中收集结果,这将不起作用。有关特殊情况的解决方案,请参阅此答案


Eve*_*awk 11

仅收集多个异步函数调用的结果一次

如果您只需要运行一系列async函数调用一次,您可以简单地将它们立即返回的协程对象存储在一个列表中,将解压缩的列表传递给协程asyncio.gather,然后await从协程收集最终的返回值。

async def upper_cased(value: str) -> str:
    await asyncio.sleep(1)
    return value.upper()

coroutines = [
    upper_cased("h"),
    upper_cased("e"),
    upper_cased("l"),
    upper_cased("l"),
    upper_cased("o"),
    upper_cased(" "),
    upper_cased("w"),
    upper_cased("o"),
    upper_cased("r"),
    upper_cased("l"),
    upper_cased("d"),
]
print("".join(await asyncio.gather(*coroutines)))
# prints HELLO WORLD
Run Code Online (Sandbox Code Playgroud)

这是有效的,因为与普通函数不同,调用async函数(不等待)不会开始执行它,它只返回一个对象,其中包含函数调用的所有必要细节,包括参数值。稍后可以使用此对象以不同方式完成其执行,其中之一是使用await.

多次收集多个异步函数调用的结果

如果您需要多次运行相同的async函数序列(带有预先确定的参数),只需将它们的返回值存储在一个列表中,然后将解压后的列表传递给它是行不通的asyncio.gather

这是因为协程对象只能等待一次。当他们已经返回值时再次等待他们将引发RuntimeError.

async def random_cased(value: str) -> str:
    await asyncio.sleep(1)
    if random.randint(0, 1) == 1:
        return value.upper()
    return value.lower()

coroutines = [
    random_cased("h"),
    random_cased("e"),
    random_cased("l"),
    random_cased("l"),
    random_cased("o"),
    random_cased(" "),
    random_cased("w"),
    random_cased("o"),
    random_cased("r"),
    random_cased("l"),
    random_cased("d"),
]
print("".join(await asyncio.gather(*coroutines)))
# prints HeLlO wOrLd (or any other random cased variation)

print("".join(await asyncio.gather(*coroutines)))
# raises RuntimeError (cannot reuse already awaited coroutine)
Run Code Online (Sandbox Code Playgroud)

对于这种情况,您可以使用functools.partial对象来存储async函数及其参数值。

partial_funcs = [
    functools.partial(random_cased, "h"),
    functools.partial(random_cased, "e"),
    functools.partial(random_cased, "l"),
    functools.partial(random_cased, "l"),
    functools.partial(random_cased, "o"),
    functools.partial(random_cased, " "),
    functools.partial(random_cased, "w"),
    functools.partial(random_cased, "o"),
    functools.partial(random_cased, "r"),
    functools.partial(random_cased, "l"),
    functools.partial(random_cased, "d"),
]
print("".join(await asyncio.gather(*[func() for func in partial_funcs])))
# prints HeLlO wOrLd (or any other random cased variation)

print("".join(await asyncio.gather(*[func() for func in partial_funcs])))
# prints HeLlO wOrLd (or any other random cased variation)

print("".join(await asyncio.gather(*[func() for func in partial_funcs])))
# prints HeLlO wOrLd (or any other random cased variation)
Run Code Online (Sandbox Code Playgroud)

如果需要,您也可以使用其他容器functools.partial。例如,一个用于存储函数的列表和另一个用于存储其参数的列表。