如何以安全的方式从异步函数调用同步函数

Alb*_*o M 2 python synchronization function call python-asyncio

如果一个或多个工作人员同时调用“同步功能”会发生什么?也许一名或多名工人会被封锁一段时间?

async def worker(queue):
    while True:
        queue_out  = await queue.get()
        file_name = queue_out.file.name
        # Create path + file_name 
        destination_path = create_path(file_name) #<-- SYNC function

        await download_medical(queue_out,destination_path)

async def main():
    queue_in = asyncio.Queue(1)
    workers = [asyncio.create_task(worker(queue_in)) for _ in range(5)]
        
    async for result in get_result(building):
        await queue_in.put(result) 

def create_path(file_name):
    #....#
    #operations related to file and folder on the hdd
    #creates a folder based on file name
Run Code Online (Sandbox Code Playgroud)

san*_*eek 12

简短回答:

  • 如果您从异步协程中调用同步(阻塞)函数,则循环中同时运行的所有任务将停止,直到该函数返回。
  • 用于loop.run_in_executor(...)在另一个线程或子进程中异步运行阻塞函数。
async def worker(queue):
    loop = Asyncio.get_event_loop()  # get a handle to the current run loop
    while True:
        queue_out  = await queue.get()
        file_name = queue_out.file.name
        
        # run blocking function in an executor
        create_path_task = loop.run_in_executor(None, create_path, file_name)
        destination_path = await create_path_task  # wait for this task to finish

        await download_medical(queue_out, destination_path)
Run Code Online (Sandbox Code Playgroud)

背景: 请注意,异步函数(协程)不并行运行任务,它们同时运行,可能看起来同时运行。考虑这个问题的最简单方法是意识到每次await被调用时,即在等待结果时,事件循环将暂停当前正在运行的协程并运行另一个协程,直到等待某些东西等等;从而使其协同并发。

通常对 IO 操作进行等待,因为它们非常耗时且不占用 CPU 资源。CPU 密集型操作将阻塞循环直至完成。另请注意,常规 IO 操作本质上是阻塞的,如果您想从并发中受益,那么您必须使用 Asyncio 兼容库,例如 aiofile、aiohttp 等。

有关执行器的更多信息: 运行常规同步函数而不阻塞事件循环的最简单方法是使用loop.run_in_executor. 第一个参数采用类似ThreadPoolExecutorProcessPoolExecutor来自concurrent.futures模块的执行器。通过传递None,Asyncio 将自动默认运行您的函数ThreadPoolExecutor。如果您的任务是 CPU 密集型任务,请ProcessPoolExecutor使用多个 cpu 核心并真正并行运行。