单独线程中的 Asyncio“即发即忘”任务

pir*_*pir 6 python multithreading fire-and-forget python-asyncio

我有一个长时间运行的同步 Python 程序,我希望每秒运行大约 10 个“即发即忘”任务。这些任务调用远程 API,不需要返回任何值。我尝试了这个答案,但它需要太多的CPU/内存来生成和维护所有单独的线程,所以我一直在研究asyncio。

这个答案很好地解释了如何使用 asyncio 运行“即发即忘”。但是,它需要使用 using run_until_complete(),它会等待所有异步任务完成。我的程序使用同步 Python,所以这对我不起作用。理想情况下,代码应该像这样简单,log_remote不会阻塞循环:

while True:
    latest_state, metrics = expensive_function(latest_state)
    log_remote(metrics) # <-- this should be run as "fire and forget"
Run Code Online (Sandbox Code Playgroud)

我使用的是Python 3.7。如何在另一个线程上使用 asyncio 轻松运行它?

use*_*342 7

您可以在单个后台线程中启动单个事件循环,并将其用于所有即发即弃任务。例如:

import asyncio, threading

_loop = None

def fire_and_forget(coro):
    global _loop
    if _loop is None:
        _loop = asyncio.new_event_loop()
        threading.Thread(target=_loop.run_forever, daemon=True).start()
    _loop.call_soon_threadsafe(asyncio.create_task, coro)
Run Code Online (Sandbox Code Playgroud)

完成后,您只需调用fire_and_forget通过调用创建的协程对象async def

# fire_and_forget defined as above

import time

async def long_task(msg):
    print(msg)
    await asyncio.sleep(1)
    print('done', msg)

fire_and_forget(long_task('foo'))
fire_and_forget(long_task('bar'))
print('continuing with something else...')
time.sleep(3)
Run Code Online (Sandbox Code Playgroud)

请注意,log_remote实际上需要async def使用 asyncio、aiohttp 而不是 requests 等来编写。