Python原生协同程序和send()

Dan*_*ler 15 python coroutine async-await

基于生成器的协同程序具有send()允许调用者和被调用者之间的双向通信并且从调用者恢复生成的生成器协同程序的方法.这是将生成器转换为协同程序的功能.

虽然新的本机async/await协同程序为异步I/O提供了出色的支持,但我看不出如何send()使用它们.明确禁止使用yieldin async函数,因此本机协同程序只能使用return语句返回一次.虽然await表达式为协程带来了新的值,但这些值来自被叫者,而不是来电者,等待的呼叫每次都从头开始评估,而不是从它停止的地方开始.

有没有办法从它停止的地方恢复返回的协同程序,并可能发送一个新值?如何使用本机协同程序模拟David Beazley 关于协同程序和并发好奇课程中的技巧

我想到的一般代码模式就像

def myCoroutine():
  ...
  while True:
    ...
    ping = yield(pong)
    ...
Run Code Online (Sandbox Code Playgroud)

并在呼叫者

while True:
  ...
  buzz = myCoroutineGen.send(bizz)
  ...
Run Code Online (Sandbox Code Playgroud)

编辑

我接受了Kevin的回答,但我注意到PEP

协同程序在内部基于生成器,因此它们共享实现.与生成器对象类似,协同程序具有throw(),send()和close()方法.

...

用于协同程序的throw(),send()方法用于推送值并将错误引发到类似Future的对象中.

显然,原生协同程序确实有一个send()?如果没有yield表达式来接收协程内的值,它如何工作?

pla*_*mut 13

在完成了 Beazley 关于协程的相同(很棒,我必须说)课程后,我问自己同样的问题 - 如何调整代码以使用 Python 3.5 中引入的本机协程?

事实证明,它可以通过对代码进行相对较小的更改来完成。我将假设读者熟悉课程材料,并将以pyos4.py版本为基础——第一个Scheduler支持“系统调用”的版本。

提示:可以在最后的附录 A中找到完整的可运行示例。

客观的

目标是转如下协程代码:

def foo():
    mytid = yield GetTid()  # a "system call"
    for i in xrange(3):
        print "I'm foo", mytid
        yield  # a "trap"
Run Code Online (Sandbox Code Playgroud)

...进入本机协程并仍然像以前一样使用:

async def foo():
    mytid = await GetTid()  # a "system call"
    for i in range(3):
        print("I'm foo", mytid)
        await ???  # a "trap" (will explain the missing bit later)
Run Code Online (Sandbox Code Playgroud)

我们想在没有 的情况下运行它asyncio,因为我们已经有了自己的事件循环来驱动整个过程——它是Scheduler类。

等待对象

本机协程不能立即工作,以下代码会导致错误:

async def foo():
    mytid = await GetTid()
    print("I'm foo", mytid)

sched = Scheduler()
sched.new(foo())
sched.mainloop()
Run Code Online (Sandbox Code Playgroud)
回溯(最近一次调用最后一次):
    ...
    mytid = 等待 GetTid()
类型错误:对象 GetTid 不能在“await”表达式中使用

PEP 492解释了可以等待的对象类型。选项之一是“具有__await__返回迭代器的方法的对象”

就像yield from,如果您熟悉它,它await充当等待对象和驱动协程的最外层代码(通常是事件循环)之间的隧道。这最好用一个例子来证明:

Traceback (most recent call last):
    ...
    mytid = await GetTid()
TypeError: object GetTid can't be used in 'await' expression

foo()交互方式驱动协程会产生以下结果:

class Awaitable:
    def __await__(self):
        value = yield 1
        print("Awaitable received:", value)
        value = yield 2
        print("Awaitable received:", value)
        value = yield 3
        print("Awaitable received:", value)
        return 42


async def foo():
    print("foo start")
    result = await Awaitable()
    print("foo received result:", result)
    print("foo end")
Run Code Online (Sandbox Code Playgroud)

任何被发送到的东西都会f_coro被引导到Awaitable实例中。类似地,无论Awaitable.__await__()产生什么,都会冒泡到发送值的最顶层代码。

整个过程对f_coro协程是透明的,不直接参与,也看不到值的上下传递。然而,当Awaitable的迭代器耗尽时,它的返回值成为await表达式的结果(在我们的例子中f_coro是42),这就是最终恢复的地方。

请注意,await协程中的表达式也可以链接。一个协程可以等待另一个等待另一个协程的协程......直到整个链以yield某个地方结束。

将值发送到协程本身

这些知识对我们有什么帮助?好吧,在课程材料中,协程可以产生一个SystemCall实例。调度器理解这些并让系统调用处理请求的操作。

为了让协程将 aSystemCall带到调度程序,一个SystemCall实例可以简单地yield 自身,并将如上一节所述将其引导至调度程序。

因此,第一个需要的更改是将此逻辑添加到基SystemCall类:

>>> f_coro = foo()  # calling foo() returns a coroutine object
>>> f_coro
<coroutine object foo at 0x7fa7f74046d0>
>>> f_coro.send(None)
foo start
1
>>> f_coro.send("one")
Awaitable received: one
2
>>> f_coro.send("two")
Awaitable received: two
3
>>> f_coro.send("three")
Awaitable received: three
foo received result: 42
foo end
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration
Run Code Online (Sandbox Code Playgroud)

SystemCall实例设置为可等待后,现在实际运行以下内容:

class SystemCall:
    ...
    def __await__(self):
        yield self
Run Code Online (Sandbox Code Playgroud)

输出:

我是 foo 没有
任务 1 终止

太好了,它不再崩溃了!

但是,协程没有收到任务ID,None而是得到了。这是因为由系统调用的handle()方法设置并由该方法发送的值Task.run()

async def foo():
    mytid = await GetTid()
    print("I'm foo", mytid)

>>> sched = Scheduler()
>>> sched.new(foo())
>>> sched.mainloop()
Run Code Online (Sandbox Code Playgroud)

...结束了SystemCall.__await__()方法。如果我们要将值带入协程,系统调用必须返回它,这样它就成为await协程中表达式的值。

I'm foo None
Task 1 terminated

使用修改后的代码运行相同的代码SystemCall会产生所需的输出:

我是 foo 1
任务 1 终止

同时运行协程

我们仍然需要一种方法来挂起协程,即拥有一个系统“陷阱”代码。在课程材料中,这是yield在协程中使用普通代码完成的,但尝试使用普通代码await实际上是一个语法错误:

# in Task.run()
self.target.send(self.sendval)
Run Code Online (Sandbox Code Playgroud)

幸运的是,解决方法很简单。由于我们已经有工作系统调用,我们可以添加一个虚拟的 no-op 系统调用,它的唯一工作是暂停协程并立即重新安排它:

class SystemCall:
    ...
    def __await__(self):
        return (yield self)
Run Code Online (Sandbox Code Playgroud)

sendval在任务上设置 a是可选的,因为该系统调用不会产生任何有意义的值,但我们选择使其明确。

我们现在拥有运行多任务操作系统所需的一切!

I'm foo 1
Task 1 terminated

输出:

我是 foo 1
我是酒吧 2
我是 foo 1
我是酒吧 2
我是 foo 1
我是酒吧 2
任务 1 终止
我是酒吧 2
我是酒吧 2
任务 2 终止

脚注

Scheduler代码是完全不变。

它。只是。作品。

这显示了原始设计的美妙之处,其中调度程序和在其中运行的任务没有相互耦合,我们能够在不Scheduler知道的情况下更改协程实现。即使Task是包装协程的类也不必更改。

不需要蹦床。

pyos8.py版本的系统中,实现了蹦床的概念。它允许协程在 shceduler 的帮助下将其部分工作委托给另一个协程(调度程序代表父协程调用子协程并将前者的结果发送到父协程)。

这种机制是不需要的,因为await(和它的老伙伴,yield from)已经使这种链接成为可能,正如开头所解释的那样。

附录 A - 一个完整的可运行示例(需要 Python 3.5+)

示例_full.py
async def foo():
    mytid = await GetTid()
    for i in range(3):
        print("I'm foo", mytid)
        await  # SyntaxError here
Run Code Online (Sandbox Code Playgroud)

  • 这个答案实际上回答了问题,应该有更多的要点 (2认同)

Kev*_*vin 10

有没有办法从它停止的地方恢复返回的协同程序,并可能发送一个新值?

没有.

async并且await句法糖yield from.当一个协程返回(带有return语句)时,就是这样.框架消失了.它不可恢复.这正是发电机一直有效的方式.例如:

def foo():
    return (yield)
Run Code Online (Sandbox Code Playgroud)

你可以做f = foo(); next(f); f.send(5),然后你会回来5.但是如果你再试f.send()一次,它就行不通,因为你已经从框架中返回了. f不再是现场发电机.

现在,至于新的协同程序,据我所知,似乎屈服和发送被保留用于事件循环和某些基本谓词之间的通信,例如asyncio.sleep().协同程序产生asyncio.Future对象直到事件循环,并且一旦相关操作完成(call_soon()事件通常通过其他事件循环方法调度),事件循环将这些相同的未来对象发送回协同程序.

你可以通过等待它们来产生未来的对象,但它不像通用的界面那样.send().它专门用于事件循环实现.如果你没有实现一个事件循环,你可能不想玩这个.如果您正在实施一个事件循环,您需要问问自己为什么完美的实现asyncio不足以满足您的目的,并在我们可以帮助您之前解释您要做的具体操作.

请注意,yield from不推荐使用.如果你想要根本没有绑定到事件循环的协同程序,只需使用它. asyncawait专门为事件循环异步编程设计.如果这不是你在做什么,然后asyncawait都开始与错误的工具.

还有一件事:

采用yield在异步函数被明确禁止的,所以本地协同程序可以使用返回只有一次return发言.

await表达确实产生控制. await something()完全类似于yield from something().他们只是更改了名称,以便对不熟悉发电机的人更直观.


对于那些真正有兴趣实现自己的事件循环的人,这里有一些示例代码显示(非常小的)实现.这个事件循环被极度剥离,因为它被设计为同步运行某些特殊编写的协同程序,就像它们是正常的函数一样.它不提供您对实际BaseEventLoop实现所期望的全方位支持,并且不适合与任意协同程序一起使用.

通常情况下,我会将代码包含在我的答案中,而不是链接到它,但是存在版权问题并且对答案本身并不重要.

  • 是的,但是 _coroutine_ 这个术语在计算机科学中的含义可以追溯到 20 世纪 60 年代。我试图解决的问题是弄清楚如何使用 async/await 来执行实际的协程。现在我知道我不应该这样做。 (5认同)

de_*_*ris 8

我知道这是一个非常古老的线程。同时,有 PEP525 ( https://peps.python.org/pep-0525/ ) 和异步生成器的实现可用。我有同样的问题,很难找到答案。因此我的贡献:

import asyncio
import time


async def gen():
    try:
        while True:
            await asyncio.sleep(0.1)
            value = yield 'hello'
            print("got:", value) 
    except ZeroDivisionError:
        await asyncio.sleep(0.2)
        yield 'world'


async def main():
    now = time.time()
    g = gen()
    v = await g.asend(None)
    v = await g.asend('something')
    b = await g.asend(None)
    c = await g.asend(None)
    print(v,b,c, time.time() - now) 

    v = await g.athrow(ZeroDivisionError)
    print(v)


asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

  • 这回答了 OP 的 __updated__ 问题:“那么显然本机协程确实有 `send()`?*并且*这是非常干净的代码,感谢您强调这个记录不足的功能! (2认同)