异步任务池

Kon*_*vin 5 python-3.x python-asyncio

我有一个任务池,希望把已完成的任务取出、放入结果队列输出,同时补充新任务。这样做对吗?我会定期检查是否有任务完成。像 `task = asyncio.create_task(...)` 这样创建任务是异步的吗?(如果我们创建大量更复杂的任务,它会阻塞事件循环吗?)

import asyncio
from collections import deque
from random import randint


async def show_result(q):
    """
    Print the result of every finished task that arrives on *q*, forever.

    Each item taken from the queue is acknowledged with ``q.task_done()``
    after its result has been printed. If ``result()`` raises (the task
    failed), the exception propagates out of this coroutine.

    :param q: queue of already-finished tasks/futures.
    """
    while True:
        finished = await q.get()
        print(finished.result())
        q.task_done()



async def some_work(n, delay):
    """
    Simulate one unit of work: wait *delay* seconds, then report completion.

    :param n: identifier of the task, used only in the returned message.
    :param delay: number of seconds to sleep.
    :returns: a human-readable completion message.
    """
    await asyncio.sleep(delay)
    message = f'task {n} with delay: {delay} completed'
    return message


async def tasks_worker(q, pool_size):
    """
    Keep ``pool_size`` ``some_work`` tasks running and forward finished ones.

    Rather than polling the pool on a fixed timer, this waits on the pool
    with ``asyncio.wait(..., return_when=FIRST_COMPLETED)``. Polling would
    delay each result by up to one poll period and rescan every pending task
    on each tick. Waiting means finished tasks are put on ``q`` as soon as
    they complete, and the pool is topped back up immediately.

    :param q: queue that receives finished ``asyncio.Task`` objects.
    :param pool_size: number of tasks to keep in flight; must be positive.
    :raises ValueError: if ``pool_size`` is not positive (``asyncio.wait``
        rejects an empty set of tasks).
    """
    if pool_size <= 0:
        raise ValueError('pool_size must be positive')

    params = get_task_param()
    pending = set()

    try:
        while True:
            # top the pool back up to pool_size running tasks
            tasks_to_add = pool_size - len(pending)
            print(f"tasks_to_add: {tasks_to_add}")
            for _ in range(tasks_to_add):
                n, d = await params.__anext__()
                print(f"add task: {n} with delay: {d}")
                pending.add(asyncio.create_task(some_work(n, d)))

            # sleep until at least one task finishes, then hand off all that did
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                await q.put(task)
    finally:
        # don't leave orphaned tasks running when this coroutine is
        # cancelled or fails
        for task in pending:
            task.cancel()


async def get_task_param():
    """
    Asynchronously generate an endless sequence of task parameters.

    Yields ``(task_number, delay)`` tuples, where ``task_number`` runs
    1, 2, 3, ... and ``delay`` is a random integer from 5 to 10 inclusive.
    ``asyncio.sleep(0)`` hands control back to the event loop before each
    item is produced.
    """
    counter = 1
    while True:
        await asyncio.sleep(0)
        yield counter, randint(5, 10)
        counter += 1


async def run(pool_size):
    """
    Run the task pool (``tasks_worker``) and the result printer
    (``show_result``) side by side until one of them fails.

    Both coroutines loop forever, so waiting for ``ALL_COMPLETED`` could
    never return. If one of them crashed, the other would keep running and
    the program would hang with the error unreported. Waiting with
    ``FIRST_EXCEPTION`` returns as soon as either task raises. The remaining
    task is then cancelled and the failure is re-raised to the caller.

    :param pool_size: number of concurrent tasks kept in the pool.
    :raises Exception: whatever exception made one of the two tasks stop.
    """
    q = asyncio.Queue()

    task_1 = asyncio.create_task(show_result(q))
    task_2 = asyncio.create_task(tasks_worker(q, pool_size))

    done, pending = await asyncio.wait(
        {task_1, task_2}, return_when=asyncio.FIRST_EXCEPTION)
    print(done)
    print(pending)

    # stop whichever task is still running and wait until it has finished
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # propagate the failure: result() re-raises a task's exception
    for task in done:
        task.result()


if __name__ == '__main__':
    # number of some_work tasks kept running concurrently
    POOL_SIZE = 50

    try:
        asyncio.run(run(pool_size=POOL_SIZE))
    except Exception as err:
        # report only the exception message, not a traceback
        print(err)
Run Code Online (Sandbox Code Playgroud)

use*_*342 11

我需要定期向 1000 台服务器发送 POST 请求,但其中一些服务器有时无法快速响应(超时时间约 10 秒)。我想获取已经完成的响应,并添加新的请求任务。

您可能应该用一个队列来分发工作,再用另一个队列来输出结果。您不需要动态添加 worker(工作协程),只需动态添加任务,让固定大小的 worker 池在任务到达时并发地处理它们。例如:

import asyncio
from random import randint

async def some_work(n, delay):
    """
    Stand-in for a real I/O call (e.g. an HTTP request): sleep *delay*
    seconds and return a message describing the finished task.

    :param n: task identifier, echoed in the message.
    :param delay: seconds to sleep before returning.
    """
    await asyncio.sleep(delay)
    outcome = f'task {n} with delay: {delay} completed'
    return outcome

async def worker(tasks, results):
    """
    Individual worker task (sometimes called a consumer).

    Takes ``(n, delay)`` jobs from *tasks* one at a time, forever. For each
    job it runs ``some_work`` and puts the return value on *results*.
    Several instances sharing the same queues form a fixed-size pool.

    Every job taken with ``tasks.get()`` is acknowledged with
    ``tasks.task_done()``, even if ``some_work`` raises. Without the
    acknowledgement, ``await tasks.join()`` would wait forever.

    :param tasks: queue of ``(n, delay)`` jobs.
    :param results: queue that receives ``some_work`` return values.
    """
    while True:
        n, d = await tasks.get()
        try:
            result = await some_work(n, d)
            await results.put(result)
        finally:
            tasks.task_done()

async def assigner(tasks, interval=1):
    """
    Come up with jobs dynamically and enqueue them for the workers, forever.

    Every *interval* seconds a ``(task_n, delay)`` pair is put on *tasks*.
    ``task_n`` counts up from 1, and ``delay`` is a random integer from
    5 to 10 inclusive. ``tasks.put`` waits while a bounded queue is full,
    so a slow worker pool throttles this producer.

    :param tasks: queue the workers read jobs from.
    :param interval: seconds to wait before producing each job (default 1);
        e.g. the period between rounds of requests.
    """
    task_n = 0
    while True:
        await asyncio.sleep(interval)
        task_n += 1
        await tasks.put((task_n, randint(5, 10)))

async def displayer(q):
    """
    Show the results of the tasks as they arrive: print every item taken
    from *q*, forever.

    :param q: queue of results produced by the workers.
    """
    while True:
        print(await q.get())

async def main(pool_size):
    """
    Wire up the job producer, a fixed pool of workers and the result
    printer, then run them all concurrently until one of them fails.

    Both queues are bounded to 100 items so that ``put`` waits when a
    consumer falls behind, which provides back-pressure.

    :param pool_size: number of worker tasks sharing the job queue.
    """
    task_queue = asyncio.Queue(100)
    result_queue = asyncio.Queue(100)

    pool = []
    for _ in range(pool_size):
        pool.append(asyncio.create_task(worker(task_queue, result_queue)))

    producer = assigner(task_queue)
    consumer = displayer(result_queue)
    await asyncio.gather(producer, consumer, *pool)

if __name__ == '__main__':
    # number of worker coroutines sharing the job queue
    POOL_SIZE = 50
    asyncio.run(main(pool_size=POOL_SIZE))
Run Code Online (Sandbox Code Playgroud)

队列的容量上限(这里随意取为 100 项)限制了队列的最大长度,并在分配器(assigner)持续快于 worker、或 worker 持续快于显示协程(displayer)的情况下提供背压。如果没有上限,队列在这种情况下会不断积压条目,实际上相当于内存泄漏。设置上限之后,当队列已满时,`Queue.put` 会一直等待,直到出现空闲槽位才继续执行。