在长时间运行的代码中使用 asyncio.sleep() 将异步函数划分为多个较小的代码部分是否好?

zen*_*625 5 python asynchronous async-await python-asyncio

如果我有一些函数,它做了很多计算,并且可能需要一段时间,那么asyncio.sleep()在计算的各个部分之间使用释放事件循环(以防止阻塞事件循环)是否好?

import asyncio


async def long_function(a, b, c):
    # some calculations
    await asyncio.sleep(0)  # release event loop
    # some another calculations
    await asyncio.sleep(0)  # release event loop
Run Code Online (Sandbox Code Playgroud)

是否有另一种更好的方法来解决此类问题?一些最佳实践,也许?

Par*_*erD 8

TL;DR 只是loop.run_in_executor用来做阻塞工作


要了解为什么它没有帮助,让我们首先创建一个class对事件循环执行某些操作的对象。喜欢:

class CounterTask(object):
    def __init__(self):
        self.total = 0
    
    async def count(self):
        while True:
            try:
                self.total += 1
                await asyncio.sleep(0.001)  # Count ~1000 times a second
            except asyncio.CancelledError:
                return
Run Code Online (Sandbox Code Playgroud)

如果事件循环对它完全开放,这将简单地每秒计算大约 1000 次。

幼稚的

为了演示最糟糕的方式,让我们启动计数器任务并天真地运行一个昂贵的函数而不考虑后果:

async def long_function1():
    time.sleep(0.2)  # some calculations


async def no_awaiting():
    counter = CounterTask()
    task = asyncio.create_task(counter.count())
    await long_function1()
    task.cancel()
    print("Counted to:", counter.total)


asyncio.run(no_awaiting())
Run Code Online (Sandbox Code Playgroud)

输出:

async def long_function1():
    time.sleep(0.2)  # some calculations


async def no_awaiting():
    counter = CounterTask()
    task = asyncio.create_task(counter.count())
    await long_function1()
    task.cancel()
    print("Counted to:", counter.total)


asyncio.run(no_awaiting())
Run Code Online (Sandbox Code Playgroud)

嗯,这没有任何计数!请注意,我们根本没有等待。这个函数只是做同步阻塞工作。如果计数器能够自己在事件循环中运行,我们应该在那个时候计数到大约 200。嗯,所以也许如果我们将它拆分并利用asyncio将控制权交还给事件循环,它可以算数吗?让我们试试那个...

拆分它

async def long_function2():
    time.sleep(0.1)  # some calculations
    await asyncio.sleep(0)  # release event loop
    time.sleep(0.1)  # some another calculations
    await asyncio.sleep(0)  # release event loop


async def with_awaiting():
    counter = CounterTask()
    task = asyncio.create_task(counter.count())
    await long_function2()
    task.cancel()
    print("Counted to:", counter.total)


asyncio.run(with_awaiting())
Run Code Online (Sandbox Code Playgroud)

输出:

Counted to: 0
Run Code Online (Sandbox Code Playgroud)

好吧,我想这在技术上更好。但最终这表明了这一点:asyncio事件循环不应该进行任何阻塞处理。它不是为了解决这些问题。事件循环无助地等待您的下一个await. 但是run_in_executor确实为此提供了解决方案,同时保持我们的代码asyncio风格。

执行者

def long_function3():
    time.sleep(0.2)  # some calculations


async def in_executor():
    counter = CounterTask()
    task = asyncio.create_task(counter.count())
    await asyncio.get_running_loop().run_in_executor(None, long_function3)
    task.cancel()
    print("Counted to:", counter.total)


asyncio.run(in_executor())
Run Code Online (Sandbox Code Playgroud)

输出:

async def long_function2():
    time.sleep(0.1)  # some calculations
    await asyncio.sleep(0)  # release event loop
    time.sleep(0.1)  # some another calculations
    await asyncio.sleep(0)  # release event loop


async def with_awaiting():
    counter = CounterTask()
    task = asyncio.create_task(counter.count())
    await long_function2()
    task.cancel()
    print("Counted to:", counter.total)


asyncio.run(with_awaiting())
Run Code Online (Sandbox Code Playgroud)

好多了!我们的循环能够继续运行,而我们的阻塞函数也在做事情,这是通过很好的老式线程方式。