如何在异步代码中处理 CPU 密集型任务

raj*_*ajn 3 python-asyncio

我正在做一些需要异步方法的繁重处理。我的一个方法返回一个字典列表,在将其添加到另一个可等待对象之前需要对其进行大量处理。IE。

def cpu_bound_task_here(record):
    ```some complicated preprocessing of record```
    return record
Run Code Online (Sandbox Code Playgroud)

在好心人给出以下答案后,我的代码现在被卡住了。

async def fun():
print("Socket open")
record_count = 0
symbol = obj.symbol.replace("-", "").replace("/", "")
loop = asyncio.get_running_loop()
await obj.send()

while True:
    try:
        records = await obj.receive()
        if not records:
            continue

        record_count += len(records)
        
Run Code Online (Sandbox Code Playgroud)

因此,上述函数的作用是异步流式传输值,并在无限期地推送到 Redis 之前进行一些繁重的处理。我做了必要的改变,但现在我陷入了困境。

dir*_*irn 5

正如该输出告诉您的那样,run_in_executor返回一个Future. 您需要等待它才能得到结果。

record = await loop.run_in_executor(
    None, something_cpu_bound_task_here, record
)
Run Code Online (Sandbox Code Playgroud)

请注意,任何参数都something_cpu_bound_task_here需要传递给run_in_executor.

此外,正如您所提到的,这是一个 CPU 密集型任务,您需要确保您使用的是concurrent.futures.ProcessPoolExecutor. 除非您在某个地方调用过loop.set_default_executor,否则默认值是 的实例ThreadPoolExecutor

with ProcessPoolExecutor() as executor:
    for record in records:
        record = await loop.run_in_executor(
            executor, something_cpu_bound_task_here, record
        )
Run Code Online (Sandbox Code Playgroud)

最后,您的 while 循环有效地同步运行。您需要等待 future,然后等待 for,然后obj.add才能继续处理 中的下一个项目records。您可能需要稍微重组您的代码并使用类似的东西gather来允许一些并发性。

async def process_record(record, obj, loop, executor):
    record = await loop.run_in_executor(
        executor, something_cpu_bound_task_here, record
    )
    await obj.add(record)

async def fun():
    loop = asyncio.get_running_loop()
    records = await receive()
    with ProcessPoolExecutor() as executor:
        await asyncio.gather(
            *[process_record(record, obj, loop, executor) for record in records]
        )
        
Run Code Online (Sandbox Code Playgroud)

我不确定如何处理,obj因为您的示例中没有定义这一点,但我相信您可以弄清楚。