简化字符串修改功能的嵌套异步操作

alv*_*vas 2 python string parallel-processing coroutine python-asyncio

我有一个异步代码,如下所示:

  • 有一个第三方函数可以对字符串执行一些操作并返回修改后的字符串,就这个问题而言,它类似于non_async_func.

  • 我有一个async def async_func_single包含non_async_func执行单个操作的函数。

  • 然后是另一个async def async_func_batch嵌套环绕的函数async_func_single来对一批数据执行该函数。

代码可以工作,但我想更多地了解为什么/如何工作,我的问题是

  • 是否有必要创建async_func_singleasync_func_batch环绕它?

  • 我可以直接输入一批数据来async_func_batch调用吗non_async_func

  • 我有一个 per_chunk 函数,可以批量输入数据,是否有任何异步操作/函数可以避免使用预批处理我想要发送的数据async_func_batch

import nest_asyncio
nest_asyncio.apply()

import asyncio
from itertools import zip_longest

from loremipsum import get_sentences

def per_chunk(iterable, n=1, fillvalue=None):
  args = [iter(iterable)] * n
  return zip_longest(*args, fillvalue=fillvalue)

def non_async_func(text):
  return text[::-1]

async def async_func_single(text):
  # Perform some string operation.
  return non_async_func(text)

async def async_func_batch(batch):
  tasks = [async_func_single(text) for text in batch]
  return await asyncio.gather(*tasks)

# Create some random inputs
thousand_texts = get_sentences(1000)

# Loop through 20 sentence at a time.
for batch in per_chunk(thousand_texts, n=20):  
  loop = asyncio.get_event_loop()
  results = loop.run_until_complete(async_func_batch(batch))
  for i, o in zip(thousand_texts, results):
    print(i, o)
Run Code Online (Sandbox Code Playgroud)

RPa*_*mer 6

请注意,将函数标记为“async def”而不是“def”不会使它们自动异步 - 您可以拥有同步的“async def”函数。异步函数和同步函数之间的区别在于,异步函数定义等待另一个异步函数或等待异步 IO 操作的位置(使用“await”)。

另请注意,asyncio 并不神奇 - 它基本上是一个调度程序,根据“等待”的函数/操作是否已完成来调度要运行的异步函数。而且,由于调度程序和异步函数都在单个线程上运行,因此在任何给定时刻,只能运行一个异步函数。

因此,回到您的代码,您的“async_func_single”函数所做的唯一事情就是调用同步函数,因此,尽管被标记为“async def”,它仍然是一个同步函数。相同的逻辑适用于“async_func_batch”函数 - 传递给“asyncio.gather”的“async_func_single”任务都是同步的,因此“asyncio.gather”只是同步运行每个任务(因此它没有提供任何好处通过等待每个任务的简单 for 循环),因此“async_func_batch”又是一个同步函数。因为您只是调用同步函数,所以 asyncio 不会为您的程序提供任何好处。

如果您希望多个同步函数同时运行,则不要使用异步函数。您需要在并行进程/线程中运行它们:

import sys
import itertools
import concurrent.futures

from loremipsum import get_sentences

executor = concurrent.futures.ProcessPoolExecutor(workers=sys.cpu_count())

def per_chunk(iterable, n=1):
    while True:
        chunk = tuple(itertools.islice(iterable, n))
        if chunk:
            yield chunk
        else:
            break

def non_async_func(text):
    return text[::-1]

def process_batches(batches):
    futures = [
        executor.submit(non_async_func, batch)
        for batch in batches
    ]
    concurrent.futures.wait(futures)    

thousand_texts = get_sentences(1000)
process_batches(per_chunk(thousand_texts, n=20))
Run Code Online (Sandbox Code Playgroud)

如果您仍然想使用异步函数来处理批次,那么 asyncio 提供了并发 future 的异步包装器:

async def process_batches(batches):
    event_loop = asyncio.get_running_loop()
    futures = [
        event_loop.run_in_executor(executor, non_async_func, batch)
        for batch in batches
    ]
    await asyncio.wait(futures)

thousand_texts = get_sentences(1000)
asyncio.run(process_batches(per_chunk(thousand_texts, n=20)))
Run Code Online (Sandbox Code Playgroud)

但它没有任何优势,除非您有其他可以在等待时运行的异步函数。