如何在事件循环之外运行协同程序?

Cha*_*bot 12 python async-await python-asyncio

通常,您通过执行以下操作来获取协同程序的值:

async def coro():
    await asycnio.sleep(3)
    return 'a value'

loop = asyncio.get_event_loop()
value = loop.run_until_complete(coro())
Run Code Online (Sandbox Code Playgroud)

出于好奇,在不使用事件循环的情况下,获得该值的最简单方法是什么?

[编辑]

我认为更简单的方法可以是:

async def coro():
    ...

value = asyncio.run(coro())  # Python 3.7+
Run Code Online (Sandbox Code Playgroud)

但是,有没有可以排序的任何方式yield from(或await)一个coro()全球性像JS?如果没有,为什么?

use*_*342 11

这里有两个问题:一个是关于等待"顶层"的协程,或者更具体地说是在开发环境中.另一个是关于在没有事件循环的情况下运行协程.

关于第一个问题,这在Python中肯定是可能的,就像在Chrome Canary Dev Tools中一样 - 通过自己与事件循环集成处理它的工具.事实上,IPython 7.0及更高版本本身支持asyncio ,您可以await coro()按预期在顶级使用.

关于第二个问题,很容易在没有事件循环的情况下驱动单个协同程序,但它不是很有用.我们来看看为什么.

当调用coroutine函数时,它返回一个协程对象.通过调用其send()方法来启动和恢复此对象.当协同程序决定暂停时(因为它await阻塞了),send()将返回.当协同程序决定返回时(因为它已到达结尾或因为它遇到了显式return),它将引发一个StopIteration异常,并将value属性设置为返回值.考虑到这一点,单个协程的最小驱动程序可能如下所示:

def drive(c):
    while True:
        try:
            c.send(None)
        except StopIteration as e:
            return e.value
Run Code Online (Sandbox Code Playgroud)

这对于简单的协程非常有用:

>>> async def pi():
...     return 3.14
... 
>>> drive(pi())
3.14
Run Code Online (Sandbox Code Playgroud)

或者甚至更复杂的一些:

>>> async def plus(a, b):
...     return a + b
... 
>>> async def pi():
...     val = await plus(3, 0.14)
...     return val
... 
>>> drive(pi())
3.14
Run Code Online (Sandbox Code Playgroud)

但是仍然缺少某些东西 - 上述协同程序都没有暂停执行.当协程挂起时,它允许其他协程运行,这使得事件循环能够(似乎)立即执行许多协同程序.例如,asyncio有一个sleep()协程,当等待时,暂停执行指定的时间段:

async def wait(s):
    await asyncio.sleep(1)
    return s

>>> asyncio.run(wait("hello world"))
'hello world'      # printed after a 1-second pause
Run Code Online (Sandbox Code Playgroud)

但是,drive无法执行此协程以完成:

>>> drive(wait("hello world"))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in drive
  File "<stdin>", line 2, in wait
  File "/usr/lib/python3.7/asyncio/tasks.py", line 564, in sleep
    return await future
RuntimeError: await wasn't used with future
Run Code Online (Sandbox Code Playgroud)

发生的事情是sleep()通过产生一个特殊的"未来"对象与事件循环进行通信.等待未来的协程只能在未来设定后恢复."真实"事件循环将通过运行其他协同程序直到将来完成为止.

要解决这个问题,我们可以编写自己的sleep实现,与我们的迷你事件循环一起使用.为此,我们需要使用迭代器来实现等待:

class my_sleep:
    def __init__(self, d):
        self.d = d
    def __await__(self):
        yield 'sleep', self.d
Run Code Online (Sandbox Code Playgroud)

我们产生一个元组调用者无法看到的元组,但会告诉drive(我们的事件循环)该做什么.drivewait现在这个样子:

def drive(c):
    while True:
        try:
            susp_val = c.send(None)
            if susp_val is not None and susp_val[0] == 'sleep':
                time.sleep(susp_val[1])
        except StopIteration as e:
            return e.value

async def wait(s):
    await my_sleep(1)
    return s
Run Code Online (Sandbox Code Playgroud)

使用此版本,wait按预期工作:

>>> drive(wait("hello world"))
'hello world'
Run Code Online (Sandbox Code Playgroud)

这仍然不是很有用,因为驱动我们的协程的唯一方法是调用drive(),它再次支持单个协程.所以我们不妨编写一个同步函数,只需要调用time.sleep()并调用它一天.对于我们的协同程序来支持异步编程的用例,drive()需要:

  • 支持多个协同程序的运行和暂停
  • 在驱动循环中实现新协同程序的产生
  • 允许协同程序在IO任务上注册唤醒,例如文件描述符变得可读或可写

所有这一切都在David Beazley的精彩演讲中得到了证明,他在现场观众面前实现了一个工作环节.


Cha*_*bot 5

所以经过一番挖掘,我想我找到了最简单的全局执行协程的解决方案。

如果你>>> dir(coro)Python 会打印出以下属性:

['__await__', '__class__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__name__', '__ne__', '__new__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'close', 'cr_await', 'cr_code', 'cr_frame', 'cr_origin', 'cr_running', 'send', 'throw']
Run Code Online (Sandbox Code Playgroud)

有几个特点很突出,即:

[
   '__await__',
   'close',
   'cr_await',
   'cr_code',
   'cr_frame',
   'cr_origin',
   'cr_running',
   'send',
   'throw'
]
Run Code Online (Sandbox Code Playgroud)

在阅读了 yield (yield) 做什么之后?以及一般发电机如何工作,我认为send方法必须是关键。

所以我试图:

>>> the_actual_coro = coro()
<coroutine object coro at 0x7f5afaf55348> 

>>>the_actual_coro.send(None)
Run Code Online (Sandbox Code Playgroud)

它引发了一个有趣的错误:

Original exception was:
Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
StopIteration: a value
Run Code Online (Sandbox Code Playgroud)

它实际上给了我一个异常的返回值!

所以我认为一个非常基本的循环,嗯,它更像是一个跑步者,可以这样实现:

def run(coro):
    try:
        coro.send(None)
    except StopIteration as e:
        return e.value
Run Code Online (Sandbox Code Playgroud)

现在,我可以在同步函数中运行协程,甚至可以在全局范围内运行,但我不建议这样做。但是,了解运行协程的最简单和最低级别是很有趣的

>>> run(coro())
'a value'
Run Code Online (Sandbox Code Playgroud)

然而,Nonecoro有一些东西需要等待时它会返回(这实际上是作为一个协程的本质)。

我认为这可能是因为事件循环coro.cr_frame.f_locals通过将它们分配给期货并单独处理它们来处理它的协程()的等待?我的简单run功能显然没有提供。在这方面我可能是错的。所以如果我错了,请有人纠正我。