如何监视Python事件循环的繁忙程度?

Nic*_*k T 7 python python-asyncio

我有一个异步应用程序,它通过aiohttp提供请求并执行其他异步任务(与数据库交互,处理消息,将请求本身作为HTTP客户端).我想监视事件循环的繁忙程度,也就是说它花费了多少时间来执行代码而不是等待select完成.

有没有办法用标准库事件循环或其他第三方循环(uvloop)来衡量这个?

具体来说,我想要一个持续的百分比衡量饱和度,而不仅仅是这个问题似乎要解决的二元"忙碌" .

Mik*_*mov 6

挖掘源代码显示如下:

  1. 事件循环基本上是在循环中执行 _run_oncewhile True
  2. _run_once做所有的事情,包括等待select完成
  3. timeout等待select 是不是保存在某个地方外_run_once

没有什么能阻止我们重新实现_run_once所需的时间。

相反,应对充满_run_once实施,我们可以用鼠标右键之前假定的时间select_run_once开始(因为上面select没有任何耗时的发生),并在之后的时间select 是何时 _process_events开始。

从言语到行动:

import asyncio


class MeasuredEventLoop(asyncio.SelectorEventLoop):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._total_time = 0
        self._select_time = 0

        self._before_select = None

    # TOTAL TIME:
    def run_forever(self):
        started = self.time()
        try:
            super().run_forever()
        finally:
            finished = self.time()
            self._total_time = finished - started

    # SELECT TIME:
    def _run_once(self):
        self._before_select = self.time()
        super()._run_once()

    def _process_events(self, *args, **kwargs):
        after_select = self.time()
        self._select_time += after_select - self._before_select
        super()._process_events(*args, **kwargs)

    # REPORT:
    def close(self, *args, **kwargs):
        super().close(*args, **kwargs)

        select = self._select_time
        cpu = self._total_time - self._select_time
        total = self._total_time

        print(f'Waited for select: {select:.{3}f}')
        print(f'Did other stuff: {cpu:.{3}f}')
        print(f'Total time: {total:.{3}f}')
Run Code Online (Sandbox Code Playgroud)

让我们测试一下:

import time


async def main():
    await asyncio.sleep(1)  # simulate I/O, will be handled by selectors
    time.sleep(0.01)        # CPU job, executed here, outside event loop
    await asyncio.sleep(1)
    time.sleep(0.01)


loop = MeasuredEventLoop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    loop.close()
Run Code Online (Sandbox Code Playgroud)

结果:

Waited for select: 2.000
Did other stuff: 0.032
Total time: 2.032
Run Code Online (Sandbox Code Playgroud)

让我们使用真实的I / O对它进行测试:

import aiohttp


async def io_operation(delay):
    async with aiohttp.ClientSession() as session:
        async with session.get(f'http://httpbin.org/delay/{delay}') as resp:
            await resp.text()


async def main():
    await asyncio.gather(*[
        io_operation(delay=1),
        io_operation(delay=2),
        io_operation(delay=3),
    ])
Run Code Online (Sandbox Code Playgroud)

结果:

Waited for select: 3.250
Did other stuff: 0.016
Total time: 3.266
Run Code Online (Sandbox Code Playgroud)