多个异步上下文管理器

Nat*_*ara 3 python python-asyncio

是否可以在python中组合异步上下文管理器?类似于asyncio.gather,但能够与上下文管理器一起使用的东西.像这样的东西:

async def foo():
    async with asyncio.gather_cm(start_vm(), start_vm()) as vm1, vm2:
        await vm1.do_something()
        await vm2.do_something()
Run Code Online (Sandbox Code Playgroud)

这目前可能吗?

use*_*342 6

一些接近gather_cm可实现AsyncExitStack,在Python 3.7介绍:

async def foo():
    async with AsyncExitStack() as stack:
        vm1, vm2 = await asyncio.gather(
            stack.enter_async_context(start_vm()),
            stack.enter_async_context(start_vm()))
        await vm1.do_something()
        await vm2.do_something()
Run Code Online (Sandbox Code Playgroud)

不幸的是,__aexit__s仍将按顺序运行.这是因为AsyncExitStack模拟嵌套的上下文管理器,它具有明确定义的顺序且不能重叠.外部上下文管理器__aexit__将获得有关内部事务管理器是否引发异常的信息.(数据库句柄__aexit__可能会使用它在异常情况下回滚事务并以其他方式提交.)__aexit__并行运行会使上下文管理器重叠,异常信息不可用或不可靠.因此,虽然并行gather(...)运行__aenter__,但AsyncExitStack记录哪一个先出现并__aexit__以相反的顺序运行s.

使用异步上下文管理器,替代方案gather_cm就会非常有意义.可以删除嵌套语义并提供一个聚合上下文管理器,它像"退出池"而不是堆栈一样工作.退出池采用多个彼此独立的上下文管理器,允许它们__aenter____aexit__方法并行运行.

棘手的部分是正确处理异常:如果有任何__aenter__提升,必须传播异常以防止with块被运行.为确保正确性,池必须保证__aexit__将在__aenter__已完成的所有上下文管理器上调用.

这是一个示例实现:

import asyncio
import sys

class gather_cm:
    def __init__(self, *cms):
        self._cms = cms

    async def __aenter__(self):
        futs = [asyncio.create_task(cm.__aenter__())
                for cm in self._cms]
        await asyncio.wait(futs)
        # only exit the cms we've successfully entered
        self._cms = [cm for cm, fut in zip(self._cms, futs)
                     if not fut.cancelled() and not fut.exception()]
        try:
            return tuple(fut.result() for fut in futs)
        except:
            await self._exit(*sys.exc_info())
            raise

    async def _exit(self, *args):
        # don't use gather() to ensure that we wait for all __aexit__s
        # to complete even if one of them raises
        done, _pending = await asyncio.wait(
            [cm.__aexit__(*args)
             for cm in self._cms if cm is not None])
        return all(suppress.result() for suppress in done)

    async def __aexit__(self, *args):
        # Since exits are running in parallel, so they can't see each
        # other exceptions.  Send exception info from `async with`
        # body to all.
        return await self._exit(*args)
Run Code Online (Sandbox Code Playgroud)

该测试程序显示了它的工作原理:

class test_cm:
    def __init__(self, x):
        self.x = x
    async def __aenter__(self):
        print('__aenter__', self.x)
        return self.x
    async def __aexit__(self, *args):
        print('__aexit__', self.x, args)

async def foo():
    async with gather_cm(test_cm('foo'), test_cm('bar')) as (cm1, cm2):
        print('cm1', cm1)
        print('cm2', cm2)

asyncio.run(foo())
Run Code Online (Sandbox Code Playgroud)