为什么向 Web 套接字发送消息不会产生对事件循环的控制?

OES*_*OES 2 event-loop websocket python-internals python-asyncio

考虑以下代码:

主要.py

import asyncio
import websockets


async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)
        print(message)


async def main():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()  # run forever

if __name__ == '__main__':
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

其他.py

import asyncio
import json

import websockets

tasks = set()


async def run_job(i):
    await asyncio.sleep(0.)
    print(f"I'm job number {i}")


async def bunch_of_tasks(ws):
    for i in range(10):
        task = asyncio.create_task(run_job(i), name=f'job-{i}')
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        print(f'had a nice sleep! now my value is {i}')
        # await asyncio.sleep(0.)
        await ws.send(json.dumps('hello there!'))
    await asyncio.gather(*tasks)
    print(f'tasks done')


async def do_stuff():
    async with websockets.connect("ws://localhost:8765") as websocket:
        await bunch_of_tasks(websocket)
        await websocket.recv()

if __name__ == '__main__':
    asyncio.run(do_stuff())
Run Code Online (Sandbox Code Playgroud)

通过首先运行main.py然后并行运行other.py,我得到:

had a nice sleep! now my value is 0
had a nice sleep! now my value is 1
had a nice sleep! now my value is 2
had a nice sleep! now my value is 3
had a nice sleep! now my value is 4
had a nice sleep! now my value is 5
had a nice sleep! now my value is 6
had a nice sleep! now my value is 7
had a nice sleep! now my value is 8
had a nice sleep! now my value is 9
I'm job number 0
I'm job number 1
I'm job number 2
I'm job number 3
I'm job number 4
I'm job number 5
I'm job number 6
I'm job number 7
I'm job number 8
I'm job number 9
tasks done
Run Code Online (Sandbox Code Playgroud)

但如果我await asyncio.sleep(0.)之前取消注释await ws.send(json.dumps('hello there!')),我会得到:

had a nice sleep! now my value is 0
had a nice sleep! now my value is 1
I'm job number 0
had a nice sleep! now my value is 2
I'm job number 1
had a nice sleep! now my value is 3
I'm job number 2
had a nice sleep! now my value is 4
I'm job number 3
had a nice sleep! now my value is 5
I'm job number 4
had a nice sleep! now my value is 6
I'm job number 5
had a nice sleep! now my value is 7
I'm job number 6
had a nice sleep! now my value is 8
I'm job number 7
had a nice sleep! now my value is 9
I'm job number 8
I'm job number 9
tasks done
Run Code Online (Sandbox Code Playgroud)

这正是我所期望的。

因此,显然将消息发送到 Web 套接字并不会产生对事件循环的控制,并且run_job协程没有机会运行。然而,有效地暂停当前任务并提供执行的asyncio.sleep机会。run_job

为什么会发生这种情况?

jsb*_*eno 8

TL;DR:确实 -await本身不足以保证控制权传递回 asyncio 循环的协作部分。为了确保这一点,必须在调用链的某个位置安排一个低级回调(并asyncio.sleep执行此操作)。

长答案:

我必须调试它 - 碰巧的是,尽管客户端异步的一切都已就位websockets,但它归结为立即写入选择器套接字中发送的所有数据 - 因为否则它是不受约束的。

换句话说, ws.send最终将同步调用此行:https://github.com/python/cpython/blob/f2e5a6ee628502d307a97f587788d7022a200229/Lib/asyncio/selector_events.py#L1071

然后,最大的惊喜是,对于原始协程(未包装在任务或 future 中的协程),每当它们返回其值时,执行都不会屈服于异步循环 - 即使它们包含其他“等待”语句中,如果嵌套等待的协同例程实际上不会在等待中“阻塞”,则异步循环永远不会返回。在内部,在处理协同例程的 C 代码中,每当协同例程在发送后实际“阻塞”时,就会调度一个回调:此回调被包装在一个 Pythonasyncio.events.Handle对象中,然后控制权返回到 asyncio 循环。

其代码位于此 C 函数中:https://github.com/python/cpython/blob/f2e5a6ee628502d307a97f587788d7022a200229/Modules/_asynciomodule.c#L2696。如果您按照此处进行操作,您可以看到,如果协同例程返回一个值,则其结果将设置在低级对象中。如果它从send另一个等待中返回,则该等待被安排在循环中,并且该函数返回。

它可能是(也可能是)异步循环的实现选择:出于性能原因,任何实际同步解析的所谓协同例程都是同步运行的。

仅当在await 中调用的任何嵌套协同例程最终在叶调用中用回调安排未来时(确实如此asyncio.sleep),asyncio 默认循环才会运行所有其他就绪任务。._run_once(其方法的另一个完整执行asyncio.base_events.BaseEventLoop)。

  • 这是一个很好的答案!非常感谢@jsbueno 的深入研究和精彩的解释! (4认同)