如何在异步协程中包装同步函数?

Zac*_*hal 30 python asynchronous python-3.x python-asyncio aiohttp

我正在使用aiohttp构建一个API服务器,将TCP请求发送到单独的服务器.发送TCP请求的模块是同步的,并且是出于我的目的的黑盒子.所以我的问题是这些请求阻止了整个API.我需要一种方法将模块请求包装在异步协程中,该协程不会阻止API的其余部分.

所以,仅仅使用sleep一个简单的例子,有没有办法以某种方式将耗时的同步代码包装在非阻塞协程中,如下所示:

async def sleep_async(delay):
    # After calling sleep, loop should be released until sleep is done
    yield sleep(delay)
    return 'I slept asynchronously'
Run Code Online (Sandbox Code Playgroud)

Zac*_*hal 28

最终我在这个帖子中找到了答案.我正在寻找的方法是run_in_executor.这允许同步函数异步运行而不会阻塞事件循环.

sleep上面发布的示例中,它可能如下所示:

import asyncio
from time import sleep
from concurrent.futures import ProcessPoolExecutor

async def sleep_async(loop, delay):
    # Can set executor to None if a default has been set for loop
    await loop.run_in_executor(ProcessPoolExecutor(), sleep, delay)
    return 'I slept asynchronously'
Run Code Online (Sandbox Code Playgroud)

另请参阅以下答案 - > 我们如何调用正常函数,其中有一个协程?

  • `ProcessPoolExecutor`成本很高,因为它启动了一个全新的python解释器.当您拥有需要使用多个处理器的CPU密集型任务时,可以使用它.考虑使用`ThreadPoolExecutor`,它使用线程. (12认同)
  • 只是注意,而不是创建一个新的执行器,通过调用`loop.run_in_executor(executor = None,func,*args)`来使用默认执行器可能更简单(参见[documentation](https://docs.python) .ORG/3 /库/ ASYNCIO-eventloop.html#asyncio.AbstractEventLoop.run_in_executor)). (8认同)
  • 感谢您提供更多信息.虽然最初的例子使用了进程池,但是经过一些研究后,我最终还是使用了"ThreadPoolExecutor".仍然看起来有点怪,但到目前为止,它们都在一起. (5认同)
  • 要获取事件循环,可以执行“loop = asyncio.get_event_loop()” (4认同)

Ale*_*mov 25

从 python 3.9 开始,最简洁的方法是使用asyncio.to_thread方法,该方法基本上是 的快捷方式run_in_executor,但保留所有上下文变量。

\n

另外,请考虑 GIL,因为它是一个 to_线程。您仍然可以运行 CPU 密集型任务,例如numpy. 来自文档:

\n
Note Due to the GIL, asyncio.to_thread() can typically only be used to make IO-bound functions non-blocking. However, for extension modules that release the GIL or alternative Python implementations that don\xe2\x80\x99t have one, asyncio.to_thread() can also be used for CPU-bound functions.\n
Run Code Online (Sandbox Code Playgroud)\n

同步功能的用法示例:

\n
def blocking_io():\n    time.sleep(1)\n\nasync def main():\n    asyncio.to_thread(blocking_io)\n\n\nasyncio.run(main())\n
Run Code Online (Sandbox Code Playgroud)\n

  • 没想到一个没有例子的简单评论会对我有如此大的帮助,谢谢! (2认同)

osp*_*der 12

您可以使用装饰器将同步版本包装为异步版本.

import time
from functools import wraps, partial


def wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run

@wrap
def sleep_async(delay):
    time.sleep(delay)
    return 'I slept asynchronously'
Run Code Online (Sandbox Code Playgroud)

或者使用aioifylib

% pip install aioify
Run Code Online (Sandbox Code Playgroud)

然后

@aioify
def sleep_async(delay):
    pass
Run Code Online (Sandbox Code Playgroud)


Evg*_*nov 8

也许有人需要我的解决方案来解决这个问题。我编写了自己的库来解决这个问题,它允许您使用装饰器使任何函数异步。

要安装该库,请运行以下命令:

$ pip install awaits
Run Code Online (Sandbox Code Playgroud)

要使任何函数异步,只需向其添加 @awaitable 装饰器,如下所示:

import time
import asyncio
from awaits.awaitable import awaitable

@awaitable
def sum(a, b):
  # heavy load simulation
  time.sleep(10)
  return a + b
Run Code Online (Sandbox Code Playgroud)

现在您可以确保您的函数确实是异步协程:

print(asyncio.run(sum(2, 2)))
Run Code Online (Sandbox Code Playgroud)

“在幕后”您的函数将在线程池中执行。每次调用函数时都不会重新创建该线程池。线程池创建一次并通过队列接受新任务。这将使您的程序比使用其他解决方案运行得更快,因为创建额外的线程是额外的开销。


Sab*_*rov 8

装饰器对于这种情况很有用,并在另一个线程中运行阻塞函数。

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, partial
from typing import Union

class to_async:

    def __init__(self, *, executor: Optional[ThreadPoolExecutor]=None):
       
        self.executor =  executor
    
    def __call__(self, blocking):
        @wraps(blocking)
        async def wrapper(*args, **kwargs):

            loop = asyncio.get_event_loop()
            if not self.executor:
                self.executor = ThreadPoolExecutor()

            func = partial(blocking, *args, **kwargs)
        
            return await loop.run_in_executor(self.executor,func)

        return wrapper

@to_async(executor=None)
def sync(*args, **kwargs):
    print(args, kwargs)
   
asyncio.run(sync("hello", "world", result=True))

Run Code Online (Sandbox Code Playgroud)

  • Union[None, ...] - None 和某些内容的并集是可选的[...] (2认同)
  • 不是可选[None,ThreadPoolExecutor],只是可选[ThreadPoolExecutor] =) (2认同)