如何在 Twisted 的 asyncioreactor 之上运行 asyncio 库代码?

blz*_*blz 4 interop twisted python-asyncio python-3.6

我已经成功导入/安装 Twistedasyncioreactor并执行一个简单的异步函数:

from twisted.internet import asyncioreactor
asyncioreactor.install()
from twisted.internet import task
from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import ensureDeferred

async def sleepy(reactor):
    print("SLEEPING")
    await task.deferLater(reactor, 3.0, lambda: None)
    print("done sleep")
    return 42

@task.react
def main(reactor):
    d = ensureDeferred(sleepy(reactor))
    d.addCallback(print)
    return d
Run Code Online (Sandbox Code Playgroud)

例如,我想在所述代码中混合异步库asyncio.sleep。我尝试过以下方法:

from twisted.internet import asyncioreactor
asyncioreactor.install()
from twisted.internet import task
from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import ensureDeferred

import asyncio

async def sleepy(reactor):
    print("SLEEPING")
    await asyncio.sleep(3)
    print("done sleep")
    return 42

@task.react
def main(reactor):
    d = ensureDeferred(sleepy(reactor))
    d.addCallback(print)
    return d
Run Code Online (Sandbox Code Playgroud)

这会产生以下错误:

 $ python test.py
SLEEPING
main function encountered error
Traceback (most recent call last):
  File "test.py", line 16, in <module>
    @task.react
  File "/Users/blz/.pyenv/versions/3.6.0/lib/python3.6/site-packages/twisted/internet/task.py", line 908, in react
    finished = main(_reactor, *argv)
  File "test.py", line 18, in main
    d = ensureDeferred(sleepy(reactor))
  File "/Users/blz/.pyenv/versions/3.6.0/lib/python3.6/site-packages/twisted/internet/defer.py", line 823, in ensureDeferred
    return _inlineCallbacks(None, coro, Deferred())
--- <exception caught here> ---
  File "/Users/blz/.pyenv/versions/3.6.0/lib/python3.6/site-packages/twisted/internet/defer.py", line 1301, in _inlineCallbacks
    result = g.send(result)
  File "test.py", line 11, in sleepy
    await asyncio.sleep(3)
  File "/Users/blz/.pyenv/versions/3.6.0/lib/python3.6/asyncio/tasks.py", line 476, in sleep
    return (yield from future)
builtins.AssertionError: yield from wasn't used with future
Run Code Online (Sandbox Code Playgroud)

我想,这很公平,所以我尝试await asyncio.sleep(3)await ensureDeferred(asyncio.sleep(3))和交换await asyncio.ensure_future(asyncio.sleep(3)),但我得到了完全相同的错误。

如何安排 aio 协程(和/或Future)在 所使用的同一事件循环上运行asyncioreactor

Iva*_*van 6

所以我尝试将await asyncio.sleep(3) 与await EnsureDeferred(asyncio.sleep(3)) 和await asyncio.ensure_future(asyncio.sleep(3)) 交换

你就快到了,你应该将两者结合起来,并使用Deferred.fromFuture而不是ensureDeferred

await Deferred.fromFuture(asyncio.ensure_future(asyncio.sleep(3)))
Run Code Online (Sandbox Code Playgroud)

规则是,async def在 Twisted 上下文(with ensureDeferred)中运行的函数只能在 asyncio 上运行Deferred,而async def在 asyncio 上下文(with )中运行的函数ensure_future只能在 asyncio 上Future运行(人们总是可以在其他协程对象上等待(async def函数调用的结果),但链最终将导致Deferred/Future)。要从 asyncio 转换FutureDeferreduseDeferred.fromFuture以及转换为 asyncio Futureuse Deferred.asFuture

人们可以将上下文从一种上下文切换到另一种上下文,然后再切换回来。在这个(人为的)示例中,我们从sleepy_twisted在 Twisted 上下文中运行开始,该上下文执行 Twisted 睡眠,然后切换到异步上下文来运行,sleepy_asyncio该上下文执行异步睡眠,然后再次切换到 Twisted 上下文以进行 Twisted 睡眠:

from twisted.internet import asyncioreactor, task
from twisted.internet.defer import inlineCallbacks, ensureDeferred, Deferred
import asyncio
asyncioreactor.install()

async def sleepy_asyncio(reactor):
   print("Sleep 2")
   await asyncio.sleep(1)
   print("Sleep 3")
   await Deferred.asFuture(task.deferLater(reactor, 1, lambda: None), loop=asyncio.get_running_loop())
   
async def sleepy_twisted(reactor):
   print("Sleep 1")
   await task.deferLater(reactor, 1, lambda: None)
   await Deferred.fromFuture(asyncio.ensure_future(sleepy_asyncio(reactor)))
   print("done")

@task.react
def main(reactor):
    return ensureDeferred(sleepy_twisted(reactor))
Run Code Online (Sandbox Code Playgroud)