Ali*_*cka 2 concurrency python-3.x python-asyncio
我有一个生成任务(io 绑定任务)的函数:
def get_task():
while True:
new_task = _get_task()
if new_task is not None:
yield new_task
else:
sleep(1)
Run Code Online (Sandbox Code Playgroud)
我正在尝试在 asyncio 中编写一个消费者,该消费者将同时处理最多 10 个任务,完成一项任务,然后将执行新任务。我不确定是否应该使用信号量或者是否有任何类型的 asycio 池执行程序?我开始用线程写一个伪代码:
def run(self)
while True:
self.semaphore.acquire() # first acquire, then get task
t = get_task()
self.process_task(t)
def process_task(self, task):
try:
self.execute_task(task)
self.mark_as_done(task)
except:
self.mark_as_failed(task)
self.semaphore.release()
Run Code Online (Sandbox Code Playgroud)
有人可以帮助我吗?我不知道把 async/await 关键字放在哪里
使用asyncio.Sepmaphore 的简单任务上限
async def max10(task_generator):
semaphore = asyncio.Semaphore(10)
async def bounded(task):
async with semaphore:
return await task
async for task in task_generator:
asyncio.ensure_future(bounded(task))
Run Code Online (Sandbox Code Playgroud)
这个解决方案的问题是任务是从生成器中贪婪地提取出来的。例如,如果生成器从大型数据库读取,程序可能会耗尽内存。
除此之外,它是惯用的和乖巧的。
一种使用异步生成器协议按需拉取新任务的解决方案:
async def max10(task_generator):
tasks = set()
gen = task_generator.__aiter__()
try:
while True:
while len(tasks) < 10:
tasks.add(await gen.__anext__())
_done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
except StopAsyncIteration:
await asyncio.gather(*tasks)
Run Code Online (Sandbox Code Playgroud)
它可能被认为是次优的,因为它在 10 个可用任务之前不会开始执行任务。
这是使用工人模式的简洁而神奇的解决方案:
async def max10(task_generator):
async def worker():
async for task in task_generator:
await task
await asyncio.gather(*[worker() for i in range(10)])
Run Code Online (Sandbox Code Playgroud)
它依赖于一个有点违反直觉的特性,即能够在同一个异步生成器上拥有多个异步迭代器,在这种情况下,每个生成的项目只能被一个迭代器看到。
我的直觉告诉我,这些解决方案都没有在取消时正确运行。
| 归档时间: |
|
| 查看次数: |
952 次 |
| 最近记录: |