Does calling thread.join() block the event loop in an asynchronous context?

Nic*_*ous 9 — python, multithreading, python-asyncio

I am implementing a Web API using aiohttp and deploying it with Gunicorn using the UVloop-enabled worker class (--worker-class aiohttp.GunicornUVLoopWebWorker). My code therefore always runs in an asynchronous context. My idea is to execute parallel jobs while handling a request, for better performance.

I am not using asyncio for this because I want parallelism, not concurrency.

I am aware of multiprocessing and the GIL problem in Python, but joining processes applies to my question as well.

Here is an example:

from aiohttp.web import middleware

@middleware
async def context_init(request, handler):
    request.context = {}
    request.context['threads'] = []
    ret = await handler(request)
    for thread in request.context['threads']:
        thread.join()
    return ret

Considering that thread.join() or process.join() blocks the current thread, this will block the event loop (as far as I know). How can I join asynchronously? What I want could figuratively be expressed as: await thread.join() or await process.join().
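One way to get almost exactly that shape (not part of the original question) is to offload the blocking join() itself to a worker thread, for example with asyncio.to_thread() on Python 3.9+. A minimal sketch, where worker() is an illustrative stand-in:

```python
import asyncio
import threading
import time

def worker(results):
    # Simulate blocking work in a plain thread
    time.sleep(0.1)
    results.append(42)

async def main():
    results = []
    t = threading.Thread(target=worker, args=(results,))
    t.start()
    # Offload the blocking join() to the default executor,
    # so the event loop stays free while the thread finishes.
    await asyncio.to_thread(t.join)
    return results

print(asyncio.run(main()))  # prints [42]
```

The awaiting coroutine is suspended while a pool thread performs the join, which is the closest equivalent of an imaginary `await thread.join()`.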

Update:

Thanks to @user4815162342 I was able to write proper code for my project:

The middleware:

from aiohttp.web import middleware
from util.process_session import ProcessSession

@middleware
async def context_init(request, handler):
    request.context = {}
    request.context['process_session'] = ProcessSession()
    request.context['processes'] = {}
    ret = await handler(request)
    await request.context['process_session'].wait_for_all()
    return ret

The utility:

import asyncio
import concurrent.futures
from functools import partial

class ProcessSession():

    def __init__(self):
        self.loop = asyncio.get_running_loop()
        self.pool = concurrent.futures.ProcessPoolExecutor()
        self.futures = []

    async def wait_for_all(self):
        # asyncio.wait() raises ValueError on an empty set, so guard it
        if self.futures:
            await asyncio.wait(self.futures)

    def add_process(self, f, *args, **kwargs):
        ret = self.loop.run_in_executor(self.pool, partial(f, *args, **kwargs))
        self.futures.append(ret)
        return ret

class ProcessBase():

    def __init__(self, process_session, f, *args, **kwargs):
        self.future = process_session.add_process(f, *args, **kwargs)

    async def wait(self):
        await asyncio.wait([self.future])
        return self.future.result()
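For reference, here is a self-contained usage sketch of ProcessSession; square() and the main() driver are illustrative additions, not part of the original project:

```python
import asyncio
import concurrent.futures
from functools import partial

class ProcessSession:
    def __init__(self):
        self.loop = asyncio.get_running_loop()
        self.pool = concurrent.futures.ProcessPoolExecutor()
        self.futures = []

    async def wait_for_all(self):
        if self.futures:  # asyncio.wait() rejects an empty set
            await asyncio.wait(self.futures)

    def add_process(self, f, *args, **kwargs):
        ret = self.loop.run_in_executor(self.pool, partial(f, *args, **kwargs))
        self.futures.append(ret)
        return ret

def square(x):
    # Must be a module-level function so it can be pickled
    # and sent to the process pool
    return x * x

async def main():
    session = ProcessSession()
    fut = session.add_process(square, 7)
    await session.wait_for_all()
    return fut.result()

if __name__ == "__main__":
    print(asyncio.run(main()))  # prints 49
```

Each add_process() call returns an awaitable future immediately, while wait_for_all() suspends the coroutine until every queued process has finished, without blocking the event loop.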

Leo*_*ick 6

Answering your question: yes, it does block the event loop.

I found that a ThreadPoolExecutor works pretty well in this situation.

import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

from aiohttp.web import middleware

@middleware
async def context_init(request, handler):
    request.context = {}
    request.context['threads'] = []
    ret = await handler(request)
    # Offload the blocking join() calls to a dedicated worker thread,
    # so the event loop is not blocked while waiting
    with ThreadPoolExecutor(1) as executor:
        await asyncio.get_event_loop().run_in_executor(
            executor,
            functools.partial(join_threads, request.context['threads']))
    return ret

def join_threads(threads):
    for t in threads:
        t.join()
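The same technique can be exercised outside aiohttp. A standalone sketch (the work() function and thread count are illustrative) that joins three plain threads from a coroutine without blocking the loop:

```python
import asyncio
import functools
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def join_threads(threads):
    # Blocking joins, executed inside the executor's worker thread
    for t in threads:
        t.join()

def work(results):
    time.sleep(0.05)  # simulate blocking work
    results.append(1)

async def main():
    results = []
    threads = [threading.Thread(target=work, args=(results,)) for _ in range(3)]
    for t in threads:
        t.start()
    # A one-worker pool whose only job is to wait on the joins,
    # keeping the event loop free in the meantime
    with ThreadPoolExecutor(1) as executor:
        await asyncio.get_running_loop().run_in_executor(
            executor, functools.partial(join_threads, threads))
    return results

print(asyncio.run(main()))  # prints [1, 1, 1]
```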