Mat*_*lem 3 python async-await python-asyncio
我希望在后台运行繁重的计算任务而不阻塞 IO。这里的问题是我的主函数不依赖于繁重的任务,并且需要在执行繁重的计算任务之前/同时返回值。举个例子:
def main(args):
transformed_data_list:List[Dict] = translate_request_to_object(args)
status = insert_data_into_db(transformed_data:Dict)
if(status)
# running background task
asyncio.run(process_background_task(transformed_data_list))
# Here, I want to return a success response as soon as data inserted into db
return "data insert into db"
Run Code Online (Sandbox Code Playgroud)
async process_background_task(transformed_data_list:List[Dict]):
for data in transformed_data_list:List:
asyncio.create_task(heavy_computation_task(data))
Run Code Online (Sandbox Code Playgroud)
但上面的代码在完成之前不会返回响应process_background_task。
asyncio.run是启动事件循环的阻塞函数。如果你想process_background_task在后台启动,你需要使用asyncio.create_task异步main。然后跑asyncio.run(main(...))。
async def main(args):
transformed_data_list:List[Dict] = translate_request_to_object(args)
status = insert_data_into_db(transformed_data:Dict)
if status:
# running background task
asyncio.create_task(process_background_task(transformed_data_list))
# Here, I want to return a success response as soon as data inserted into db
return "data insert into db"
async process_background_task(transformed_data_list:List[Dict]):
for data in transformed_data_list:List:
asyncio.create_task(heavy_computation_task(data))
# Start event loop, execute task and wait until task finish.
asyncio.run(main(...))
Run Code Online (Sandbox Code Playgroud)
但在这种情况下heavy_computation_task会阻止事件循环,您需要使用ProcessPoolExecutorwith loop.run_in_executor。
这是文档中的示例:
import asyncio
import concurrent.futures
def cpu_bound():
# CPU-bound operations will block the event loop:
# in general it is preferable to run them in a
# process pool.
return sum(i * i for i in range(10 ** 7))
async def main():
loop = asyncio.get_running_loop()
# Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound)
print('custom process pool', result)
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
在你的情况下:
async def main(args):
transformed_data_list:List[Dict] = translate_request_to_object(args)
status = insert_data_into_db(transformed_data:Dict)
if status:
# running background task
asyncio.create_task(process_background_task(transformed_data_list))
# Here, I want to return a success response as soon as data inserted into db
return "data insert into db"
async process_background_task(transformed_data_list:List[Dict]):
loop = asyncio.get_running_loop()
for data in transformed_data_list:List:
with concurrent.futures.ProcessPoolExecutor() as pool:
await loop.run_in_executor(pool, heavy_computation_task, data)
# Start event loop, execute task and wait until task finish.
asyncio.run(main(...))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7419 次 |
| 最近记录: |