Python协程:暂停时释放上下文管理器

Ram*_*hum 8 python coroutine async-await

背景:我是一位经验丰富的Python程序员,对新的协程/异步/等待功能一无所知。我无法编写一个异步的“ hello world”来挽救生命。

我的问题是:给我一个任意协程函数f。我想编写一个g将包装的协程函数f,即我将给g用户,就像它是一样f,并且用户将调用它,而没有人会更明智,因为g将在后台使用f。就像装饰普通的Python函数以添加功能时一样。

我要添加的功能:每当程序流进入协程时,它都会获取我提供的上下文管理器,一旦程序流离开协程,它将释放该上下文管理器。流量又回来了吗?重新获取上下文管理器。它退出了吗?重新释放它。直到协程完全完成。

为了演示,这是使用普通生成器描述的功能:

def generator_wrapper(_, *args, **kwargs):
    gen = function(*args, **kwargs)
    method, incoming = gen.send, None
    while True:
        with self:
            outgoing = method(incoming)
        try:
            method, incoming = gen.send, (yield outgoing)
        except Exception as e:
            method, incoming = gen.throw, e
Run Code Online (Sandbox Code Playgroud)

有可能用协程吗?

Mis*_*agi 7

协程建立在迭代器上——__await__特殊的方法是一个普通的迭代器。这允许您将底层迭代器包装在另一个迭代器中。诀窍是您必须使用 its解开目标的迭代器__await__,然后使用您自己的重新包装您自己的迭代器__await__

适用于实例化协程的核心功能如下所示:

class CoroWrapper:
    """Wrap ``target`` to have every send issued in a ``context``"""
    def __init__(self, target: 'Coroutine', context: 'ContextManager'):
        self.target = target
        self.context = context

    # wrap an iterator for use with 'await'
    def __await__(self):
        # unwrap the underlying iterator
        target_iter = self.target.__await__()
        # emulate 'yield from'
        iter_send, iter_throw = target_iter.send, target_iter.throw
        send, message = iter_send, None
        while True:
            # communicate with the target coroutine
            try:
                with self.context:
                    signal = send(message)
            except StopIteration as err:
                return err.value
            else:
                send = iter_send
            # communicate with the ambient event loop
            try:
                message = yield signal
            except BaseException as err:
                send, message = iter_throw, err
Run Code Online (Sandbox Code Playgroud)

请注意,这明确适用于 a Coroutine,而不是Awaitable-Coroutine.__await__实现了生成器接口。理论上, anAwaitable不一定提供__await__().send__await__().throw

这足以传入和传出消息:

import asyncio


class PrintContext:
    def __enter__(self):
        print('enter')

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('exit via', exc_type)
        return False


async def main_coro():
    print(
        'wrapper returned',
        await CoroWrapper(test_coro(), PrintContext())
    )


async def test_coro(delay=0.5):
    await asyncio.sleep(delay)
    return 2

asyncio.run(main_coro())
# enter
# exit via None
# enter
# exit <class 'StopIteration'>
# wrapper returned 2
Run Code Online (Sandbox Code Playgroud)

您可以将包装部分委托给单独的装饰器。这也确保你有一个实际的协程,而不是一个自定义类——一些异步库需要这个。

from functools import wraps


def send_context(context: 'ContextManager'):
    """Wrap a coroutine to issue every send in a context"""
    def coro_wrapper(target: 'Callable[..., Coroutine]') -> 'Callable[..., Coroutine]':
        @wraps(target)
        async def context_coroutine(*args, **kwargs):
            return await CoroWrapper(target(*args, **kwargs), context)
        return context_coroutine
    return coro_wrapper
Run Code Online (Sandbox Code Playgroud)

这允许您直接装饰协程函数:

@send_context(PrintContext())
async def test_coro(delay=0.5):
    await asyncio.sleep(delay)
    return 2

print('async run returned:', asyncio.run(test_coro()))
# enter
# exit via None
# enter
# exit via <class 'StopIteration'>
# async run returned: 2
Run Code Online (Sandbox Code Playgroud)

  • 同意关闭“target_iter”和“StopIteration”。至于期望协程的事件循环,我认为对于 asyncio 来说并非如此。“Future”是一个可等待的示例,它不是协程,并且在任何地方都被接受。唯一需要协程的地方是“create_task”,它是专门为驱动协程而设计的。 (2认同)