如何在 Python 3 中嵌套异步上下文管理器

Kev*_*lan 5 python-asyncio

假设我们有两个异步上下文管理器,它们通常以嵌套方式一起使用,但通常只有第二个的结果在主体中使用。例如,如果我们发现自己经常输入以下内容:

async with context_mgr_1() as cm1:
    async with cm2.context_mgr_2() as cm2:
        ...do something with cm2...
Run Code Online (Sandbox Code Playgroud)

我们如何创建一个嵌套这些上下文管理器的单个上下文管理器,以便我们可以执行以下操作:

async with context_mgr_2() as cm2:
    ...do something with cm2...
Run Code Online (Sandbox Code Playgroud)

contextlib.nested 用于为非异步上下文管理器完成此操作,但我在 asyncio 中没有找到这样的帮助器。

Mes*_*ssa 6

从 Python 3.7 开始,有contextlib.AsyncExitStack

async with AsyncExitStack() as stack:
    cm1 = await stack.enter_async_context(context_mgr_1())
    cm2 = await stack.enter_async_context(context_mgr_2())
    # ...do something with cm2...
Run Code Online (Sandbox Code Playgroud)

它可以动态使用:

async with AsyncExitStack() as stack:
    connections = [await stack.enter_async_context(get_connection())
        for i in range(5)]
    # All opened connections will automatically be released at the end of
    # the async with statement, even if attempts to open a connection
    # later in the list raise an exception.
Run Code Online (Sandbox Code Playgroud)

它继承了contextlib.ExitStack,因此您可以组合异步和非异步上下文管理器:

async with AsyncExitStack() as stack:
    cm1 = await stack.enter_async_context(context_mgr_1())
    f = stack.enter_context(open('file.txt'))
    cm2 = await stack.enter_async_context(context_mgr_2())
    # ...
Run Code Online (Sandbox Code Playgroud)


Kev*_*lan 0

在内部,我开始使用它来将同步和异步上下文管理器包装为异步管理器。它允许 AsyncExitStack 风格的推送语义以及简单地包装多个管理器。

它经过了相当好的测试,但我没有发布测试或计划支持这一点,所以使用时需要您自担风险......

import asyncio
import logging
import sys

from functools import wraps

class AsyncContextManagerChain(object):

    def __init__(self, *managers):
        self.managers = managers
        self.stack = []
        self.values = []

    async def push(self, manager):
        try:
            if hasattr(manager, '__aenter__'):
                value = await manager.__aenter__()
            else:
                value = manager.__enter__()

            self.stack.append(manager)
            self.values.append(value)
            return value
        except:
            # if we encounter an exception somewhere along our enters,
            # we'll stop adding to the stack, and pop everything we've
            # added so far, to simulate what would happen when an inner
            # block raised an exception.
            swallow = await self.__aexit__(*sys.exc_info())
            if not swallow:
                raise

    async def __aenter__(self):
        value = None

        for manager in self.managers:
            value = await self.push(manager)

        return value

    async def __aexit__(self, exc_type, exc, tb):
        excChanged = False
        swallow = False # default value
        while self.stack:
            # no matter what the outcome, we want to attempt to call __aexit__ on
            # all context managers
            try:
                swallow = await self._pop(exc_type, exc, tb)
                if swallow:
                    # if we swallow an exception on an inner cm, outer cms would
                    # not receive it at all...
                    exc_type = None
                    exc = None
                    tb = None
            except:
                # if we encounter an exception while exiting, that is the
                # new execption we send upward
                excChanged = True
                (exc_type, exc, tb) = sys.exc_info()
                swallow = False

        if exc is None:
            # when we make it to the end, if exc is None, it was swallowed
            # somewhere along the line, and we've exited everything successfully,
            # so tell python to swallow the exception for real
            return True
        elif excChanged:
            # if the exception has been changed, we need to raise it here
            # because otherwise python will just raise the original exception
            if not swallow:
                raise exc
        else:
            # we have the original exception still, we just let python handle it...
            return swallow

    async def _pop(self, exc_type, exc, tb):
    manager = self.stack.pop()
    if hasattr(manager, '__aexit__'):
        return await manager.__aexit__(exc_type, exc, tb)
    else:
        return manager.__exit__(exc_type, exc, tb)
Run Code Online (Sandbox Code Playgroud)