Kon*_*vin 5 python-3.x python-asyncio
I have a pool of tasks in a queue, and I want to pull the results of completed tasks out of it and put new tasks in. Is this the right approach? I periodically check whether any tasks are done. Is creating a task like task = asyncio.create_task(...) itself asynchronous (i.e. if we create a large number of more complex tasks, will it block the loop)?
import asyncio
from collections import deque
from random import randint


async def show_result(q):
    while True:
        done_task = await q.get()
        result = done_task.result()
        print(result)
        q.task_done()


async def some_work(n, delay):
    await asyncio.sleep(delay)
    return f'task {n} with delay: {delay} completed'


async def tasks_worker(q, pool_size):
    """
    extract done tasks and put new
    """
    delay = get_task_param()
    tasks = deque([])
    while True:
        await asyncio.sleep(1)
        # append new tasks
        tasks_to_add = pool_size - len(tasks)
        print(f"tasks_to_add: {tasks_to_add}")
        if tasks_to_add > 0:
            # append new tasks
            for _ in range(tasks_to_add):
                n, d = await delay.__anext__()
                print(f"add task: {n} with delay: {d}")
                task = asyncio.create_task(some_work(n, d))
                tasks.append(task)
        # hand finished tasks to the output queue, keep the rest
        for _ in range(len(tasks)):
            task = tasks.popleft()
            if task.done():
                await q.put(task)
            else:
                tasks.append(task)


async def get_task_param():
    task_n = 0
    while True:
        task_n += 1
        await asyncio.sleep(0)
        yield task_n, randint(5, 10)


async def run(pool_size):
    q = asyncio.Queue()
    task_1 = asyncio.create_task(show_result(q))
    task_2 = asyncio.create_task(tasks_worker(q, pool_size))
    done, pending = await asyncio.wait({task_1, task_2}, return_when=asyncio.ALL_COMPLETED)
    print(done)
    print(pending)


if __name__ == '__main__':
    POOL_SIZE = 50
    try:
        asyncio.run(run(POOL_SIZE))
    except Exception as ex:
        print(ex)
use*_*342 11
I need to periodically send POST requests to 1000 servers, but some of them sometimes fail to respond quickly (timeout ≈ 10 s). I want to collect the responses that have already completed and add new request tasks.
You should probably use one queue to distribute the work and another queue to emit the results. You don't need to add workers dynamically; you can add tasks dynamically and let a fixed-size pool of workers process them in parallel as they come in. For example:
import asyncio
from random import randint


async def some_work(n, delay):
    await asyncio.sleep(delay)
    return f'task {n} with delay: {delay} completed'


async def worker(tasks, results):
    # individual worker task (sometimes called consumer)
    # - sequentially process tasks as they come into the queue
    #   and emit the results
    while True:
        n, d = await tasks.get()
        result = await some_work(n, d)
        await results.put(result)


async def assigner(tasks):
    # come up with tasks dynamically and enqueue them for processing
    task_n = 0
    while True:
        await asyncio.sleep(1)
        task_n += 1
        await tasks.put((task_n, randint(5, 10)))


async def displayer(q):
    # show results of the tasks as they arrive
    while True:
        result = await q.get()
        print(result)


async def main(pool_size):
    tasks = asyncio.Queue(100)
    results = asyncio.Queue(100)
    workers = [asyncio.create_task(worker(tasks, results))
               for _ in range(pool_size)]
    await asyncio.gather(assigner(tasks), displayer(results), *workers)


if __name__ == '__main__':
    POOL_SIZE = 50
    asyncio.run(main(POOL_SIZE))
The queue bound, arbitrarily chosen at 100 items, limits the maximum queue size and provides backpressure in case the assigner is consistently faster than the workers, or the workers are faster than the displayer. Without a bound the queues would just keep accumulating items in those situations, which is effectively a memory leak. With a bound, Queue.put waits for a free slot before proceeding whenever the queue is full.
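To make that backpressure concrete, here is a minimal standalone sketch (not part of the original answer) with a deliberately tiny bound of 2: once the queue is full, the producer's await q.put(...) blocks, so production is paced by the slow consumer.

import asyncio

async def producer(q):
    for i in range(6):
        await q.put(i)           # once the queue holds 2 items, this awaits
        print(f'produced {i}')

async def consumer(q):
    while True:
        item = await q.get()
        await asyncio.sleep(1)   # deliberately slow consumer
        print(f'consumed {item}')
        q.task_done()

async def main():
    q = asyncio.Queue(2)         # tiny bound to make the effect visible
    c = asyncio.create_task(consumer(q))
    await producer(q)
    await q.join()               # wait until everything has been consumed
    c.cancel()

asyncio.run(main())

Running it shows the 'produced' lines arriving at roughly the consumer's pace rather than all at once, which is exactly the behaviour the bounded queues give the worker pool above.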
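Since the stated goal is periodically POSTing to ~1000 servers with a ~10 s timeout, the worker above could be adapted to do real HTTP requests. The following is only a hedged sketch, assuming aiohttp is installed and that the assigner enqueues (task number, url, payload) tuples; the URL and payload shapes are placeholders, not anything from the original post.

import asyncio
import aiohttp

async def http_worker(tasks, results):
    # one persistent session per worker; 10-second total timeout per request
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        while True:
            n, url, payload = await tasks.get()
            try:
                async with session.post(url, json=payload) as resp:
                    body = await resp.text()
                    await results.put((n, resp.status, body))
            except asyncio.TimeoutError:
                await results.put((n, None, 'timed out'))
            except aiohttp.ClientError as exc:
                await results.put((n, None, f'failed: {exc}'))

The rest of the structure stays the same: a fixed pool of these workers, a bounded tasks queue fed by the assigner, and a bounded results queue drained by the displayer.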