wvx*_*xvw 5 python python-asyncio
我怎样才能从期货集合中等待任何未来,有选择地通知其他期货“不再需要它们”?
这就是我需要这个的原因。我想创建一种特殊的期货/任务来计时,并且可以与其他期货/任务一起使用以在它们累积超过某个超时(或在与此计时任务交互后被迫停止)时取消它们。如果您熟悉 Go,它有一个类似的概念,称为Context.
为了使这更具体,想象一下。您有一个典型的 HTTP 客户端。为了从 URL 请求页面,它需要连续执行几个可能永远阻塞的操作。例如,这些操作可以是:
假设您允许整个操作需要一分钟。但是您也知道分配套接字的时间不应超过一毫秒,连接也可能需要一分钟,检索块也是如此。断开连接和资源释放应该需要几毫秒。
假设现在您必须在每个项目符号点上等待完全超时——嗯,您已经超过了两次配额。因此,您需要将每次调用的计算增量传递给其后继调用。此外,假设您无法释放套接字——好吧,没什么大不了的,应用程序可能会从此错误中恢复,因此您还需要区分各种超时。我想这可以这样写(在某些虚构的 Python 版本中):
async def http_request(context, url):
socket = await min(allocate_socket(), context.timeout, socket_timeout)
await min(socket.connect(), context.timeout, connect_timeout)
async for chunk in min(socket.receive(), context.timeout, chunk_timeout):
print(chunk)
await min(socket.close(), context.timeout, close_timeout)
Run Code Online (Sandbox Code Playgroud)
async_timeout正是您所需要的,您的代码将如下所示:
from async_timeout import timeout
async def http_request(url):
async with timeout(timeout_for_all):
async with timeout(socket_timeout):
socket = await allocate_socket()
async with timeout(connect_timeout):
await socket.connect()
async with timeout(chunk_timeout):
async for chunk in socket.receive():
print(chunk)
async with timeout(close_timeout):
await socket.close()
Run Code Online (Sandbox Code Playgroud)
让我们检查一下您提到的问题。
Go 风格的 Context 还可以有 cancel() 方法,该方法允许从外部取消进程,而不管等待的时间。
asyncio有一种方法可以取消任何正在运行的任务,无论超时或其他情况如何。您应该对某些任务调用cancel()方法(asyncio.CancelledError 将在其中引发)并等待任务传播它(可能会抑制异常):
task.cancel()
with suppress(asyncio.CancelledError):
await task
Run Code Online (Sandbox Code Playgroud)
这是在事情完成之前取消事情的标准方法。您不需要比这更复杂的东西。
它还可能根据挂钟或内部计时器到期。
我不确定我是否理解这一点,但它async_timeout确实为您提供了您想要的东西 - 一种用一些具体时间限制任务执行的方法。
另外,我担心如果这不是直接在 asyncio 中作为单独的线程实现,那么无论超时如何,它都必须等待调度/阻塞协程完成(它只能在以下情况下取消执行)执行的协程进入睡眠状态)。
asyncio模块本身在某种意义上是为了避免使用多线程而创建的。理想情况下,您的异步程序应该使用由单线程内的单个事件循环管理的许多协程。
这个公共事件循环管理事情在应该发生的时间发生。此代码将TimeoutError在运行 1 秒后引发:
async with timeout(1):
await asyncio.sleep(20)
Run Code Online (Sandbox Code Playgroud)
更新:
另一个不同的例子是,当我需要等待多个工作人员完成某项任务时,我只关心其中一个工作人员完成该任务,但我根本不关心超时。
也可以使用标准 asyncio 功能来完成:
# Start 3 concurrent tasks (workers):
task_1 = asyncio.ensure_future(coro())
task_2 = asyncio.ensure_future(coro())
task_3 = asyncio.ensure_future(coro())
# Wait first of them done:
tasks = (task_1, task_2, task_3,)
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print('done', done.pop().result())
# Cancel others since they're still running,
# but we don't need them to be finished:
for task in pending:
task.cancel()
with suppress(asyncio.CancelledError):
await task
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2670 次 |
| 最近记录: |