Nic*_*ous 9 python multithreading python-asyncio
我正在使用aiohttp实现一个 Web API ,并使用启用了 UVloop 的Gunicorn--worker-class aiohttp.GunicornUVLoopWebWorker进行部署。因此,我的代码始终在异步上下文中运行。我的想法是在处理请求时实现并行作业以获得更好的性能。
我不使用是asyncio因为我想要Parallelism,而不是Concurrency。
我知道python 中的多处理和GIL 问题。但加入流程也适用于我的问题。
这是一个例子:
from aiohttp.web import middleware
@middleware
async def context_init(request, handler):
request.context = {}
request.context['threads'] = []
ret = await handler(request)
for thread in request.context['threads']:
thread.join()
return ret
Run Code Online (Sandbox Code Playgroud)
考虑到thread.join()orprocess.join()阻止当前线程,这将阻止事件循环(据我所知)。如何异步加入?我想要的可以形象地表示为:await thread.join()或await process.join()。
更新:
感谢@user4815162342,我能够为我的项目编写正确的代码:
中间件:
from aiohttp.web import middleware
from util.process_session import ProcessSession
@middleware
async def context_init(request, handler):
request.context = {}
request.context['process_session'] = ProcessSession()
request.context['processes'] = {}
ret = await handler(request)
await request.context['process_session'].wait_for_all()
return ret
Run Code Online (Sandbox Code Playgroud)
实用程序:
import asyncio
import concurrent.futures
from functools import partial
class ProcessSession():
def __init__(self):
self.loop = asyncio.get_running_loop()
self.pool = concurrent.futures.ProcessPoolExecutor()
self.futures = []
async def wait_for_all(self):
await asyncio.wait(self.futures)
def add_process(self, f, *args, **kwargs):
ret = self.loop.run_in_executor(self.pool, partial(f, *args, **kwargs))
self.futures.append(ret)
return ret
class ProcessBase():
def __init__(self, process_session, f, *args, **kwargs):
self.future = process_session.add_process(f, *args, **kwargs)
async def wait(self):
await asyncio.wait([self.future])
return self.future.result()
Run Code Online (Sandbox Code Playgroud)
回答你的问题:是的,它确实阻止了事件循环。
我发现这ThreadPoolExecutor在这种情况下效果很好。
from util.process_session import ProcessSession
from concurrent.futures.thread import ThreadPoolExecutor
import asyncio
from aiohttp.web import middleware
@middleware
async def context_init(request, handler):
request.context = {}
request.context['threads'] = []
ret = await handler(request)
with ThreadPoolExecutor(1) as executor:
await asyncio.get_event_loop().run_in_executor(executor,
functools.partial(join_threads, data={
'threads': request.context['threads']
}))
return ret
def join_threads(threads):
for t in threads:
t.join()
Run Code Online (Sandbox Code Playgroud)