在 asyncio 中批量处理任务

Ali*_*cka 2 concurrency python-3.x python-asyncio

我有一个生成任务(io 绑定任务)的函数:

def get_task():
    while True:
        new_task = _get_task()
        if new_task is not None:
            yield new_task
        else:
            sleep(1)
Run Code Online (Sandbox Code Playgroud)

我正在尝试在 asyncio 中编写一个消费者,该消费者将同时处理最多 10 个任务,完成一项任务,然后将执行新任务。我不确定是否应该使用信号量或者是否有任何类型的 asycio 池执行程序?我开始用线程写一个伪代码:

def run(self)
   while True:
       self.semaphore.acquire() # first acquire, then get task
       t = get_task()
       self.process_task(t)

def process_task(self, task):
   try:
       self.execute_task(task)
       self.mark_as_done(task)
   except:
       self.mark_as_failed(task)
   self.semaphore.release()
Run Code Online (Sandbox Code Playgroud)

有人可以帮助我吗?我不知道把 async/await 关键字放在哪里

Dim*_*nek 5

使用asyncio.Sepmaphore 的简单任务上限

async def max10(task_generator):
    semaphore = asyncio.Semaphore(10)

    async def bounded(task):
        async with semaphore:
            return await task

    async for task in task_generator:
        asyncio.ensure_future(bounded(task))
Run Code Online (Sandbox Code Playgroud)

这个解决方案的问题是任务是从生成器中贪婪地提取出来的。例如,如果生成器从大型数据库读取,程序可能会耗尽内存。

除此之外,它是惯用的和乖巧的。

一种使用异步生成器协议按需拉取新任务的解决方案:

async def max10(task_generator):
    tasks = set()
    gen = task_generator.__aiter__()
    try:
        while True:
            while len(tasks) < 10:
                tasks.add(await gen.__anext__())
            _done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    except StopAsyncIteration:
        await asyncio.gather(*tasks)
Run Code Online (Sandbox Code Playgroud)

它可能被认为是次优的,因为它在 10 个可用任务之前不会开始执行任务。

这是使用工人模式的简洁而神奇的解决方案:

async def max10(task_generator):
    async def worker():
        async for task in task_generator:
            await task

    await asyncio.gather(*[worker() for i in range(10)])
Run Code Online (Sandbox Code Playgroud)

它依赖于一个有点违反直觉的特性,即能够在同一个异步生成器上拥有多个异步迭代器,在这种情况下,每个生成的项目只能被一个迭代器看到。

我的直觉告诉我,这些解决方案都没有在取消时正确运行。