任何 asyncio 任务在内存和速度方面的开销是多少?在不需要并发运行的情况下最小化任务数量是否值得?
使用asyncio可以在超时时执行协程,以便在超时后取消:
@asyncio.coroutine
def coro():
yield from asyncio.sleep(10)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(), 5))
Run Code Online (Sandbox Code Playgroud)
上面的示例按预期工作(它在5秒后超时).
但是,当协程不使用asyncio.sleep()(或其他asyncio协同程序)时,它似乎没有超时.例:
@asyncio.coroutine
def coro():
import time
time.sleep(10)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(), 1))
Run Code Online (Sandbox Code Playgroud)
运行时间超过10秒,因为time.sleep(10)未取消.在这种情况下是否可以强制取消协程?
如果应该使用asyncio来解决这个问题,我该怎么做?
async def caller():
await bar()
print("finish")
async def bar():
// some code here
Run Code Online (Sandbox Code Playgroud)
async def caller():
bar()
print("finish")
def bar():
//some code here
Run Code Online (Sandbox Code Playgroud)
在上面的例子中。两种情况下,调用者都必须等待 bar() 完成。在这种情况下,bar 成为普通/协程有什么不同吗?如果我们想“等待”某些函数,为什么不使用普通函数。
出于可读性原因,我可能会破坏代码。所以
async coro_top():
print('top')
print('1')
# ... More asyncio code
print('2')
# ... More asyncio code
Run Code Online (Sandbox Code Playgroud)
...变成类似
async coro_top():
print('top')
await coro_1()
await coro_2()
async coro_1()
print('1')
# ... More asyncio code
async coro_2()
print('2')
# ... More asyncio code
Run Code Online (Sandbox Code Playgroud)
但是,额外的awaits表示它们并不严格等效
另一个并发任务可以在print('top')和之间运行代码print('1'),因此对于某些算法,竞争条件更有可能出现。
产生事件循环的开销(大概)
因此,有没有一种方法可以在不产生事件循环的情况下调用协程以避免上述情况发生?
据我了解,目的是同时asyncio.gather运行其参数,并且当协程执行等待表达式时,它为事件循环提供了安排其他任务的机会。考虑到这一点,我惊讶地发现以下代码片段忽略了.asyncio.gather
import asyncio
async def aprint(s):
print(s)
async def forever(s):
while True:
await aprint(s)
async def main():
await asyncio.gather(forever('a'), forever('b'))
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
据我了解,会发生以下情况:
实际上,这不是我所观察到的。相反,整个程序相当于while True: print('a'). 我发现非常有趣的是,即使对代码进行微小的更改似乎也会重新引入公平性。例如,如果我们使用以下代码,那么我们会在输出中得到大致相等的“a”和“b”混合。
async def forever(s):
while True:
await aprint(s)
await asyncio.sleep(1.)
Run Code Online (Sandbox Code Playgroud)
验证它似乎与我们在无限循环中和在无限循环外花费的时间没有任何关系,我发现以下更改也提供了公平性。
async def forever(s):
while True:
await aprint(s)
await asyncio.sleep(0.)
Run Code Online (Sandbox Code Playgroud)
有谁知道为什么会发生这种不公平现象以及如何避免它?我想,当有疑问时,我可以主动在各处添加一个空的睡眠语句,并希望这足够了,但对我来说,为什么原始代码的行为不符合预期,这对我来说非常不明显。
以防万一,因为 asyncio 似乎已经经历了相当多的 API 更改,我在 Ubuntu 机器上使用 Python 3.8.4 的普通安装。
我遇到了一个奇怪的问题asyncio.Queue- 不是在项目可用时立即返回,而是等待队列已满,然后再返回任何内容。我意识到在使用队列来存储从 收集的帧时cv2.VideoCapture,maxsize队列越大,在屏幕上显示任何内容所需的时间就越长,然后,它看起来像是收集到队列中的所有帧的序列。
这是一个功能,一个错误,还是我只是用错了?
无论如何,这是我的代码
import asyncio
import cv2
import numpy as np
async def collecting_loop(queue):
print("cl")
cap = cv2.VideoCapture(0)
while True:
_, img = cap.read()
await queue.put(img)
async def processing_loop(queue):
print("pl")
await asyncio.sleep(0.1)
while True:
img = await queue.get()
cv2.imshow('img', img)
cv2.waitKey(5)
async def main(e_loop):
print("running main")
queue = asyncio.Queue(loop=e_loop, maxsize=10)
await asyncio.gather(collecting_loop(queue), processing_loop(queue))
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(e_loop=loop))
except KeyboardInterrupt:
pass
finally:
loop.close()
Run Code Online (Sandbox Code Playgroud) 在这个简单的生产者/消费者示例中,就好像await queue.put(item)不释放事件循环以允许消费者运行直到完成。这会导致生产者将其所有项目放入队列中,然后消费者才能将其取出。
这是预期的吗?
await queue.put(item)如果我遵循with ,我就会得到我正在寻找的结果await asyncio.sleep(0)。
然后,生产者将 1 个项目放入队列中,然后消费者从队列中取出 1 个项目。
我在 Python 3.6.8 和 3.7.2 中得到相同的结果。
import asyncio
async def produce(queue, n):
for x in range(1, n + 1):
print('producing {}/{}'.format(x, n))
item = str(x)
await queue.put(item)
# await asyncio.sleep(0)
await queue.put(None)
async def consume(queue):
while True:
item = await queue.get()
if item is None:
break
print('consuming item {}...'.format(item))
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
producer_coro = produce(queue, 10)
consumer_coro = consume(queue)
loop.run_until_complete(asyncio.gather(producer_coro, …Run Code Online (Sandbox Code Playgroud) 我觉得我对异步 IO 的理解存在差距:在较大的协程范围内将小函数包装到协程中是否有好处? 正确地发出事件循环信号有好处吗?这种好处的程度是否取决于包装的函数是 IO 还是 CPU 密集型?
示例:我有一个协程,download()其中:
aiohttp。bz2.compress()- 这本身不是可等待的aioboto3因此,第 1 部分和第 3 部分使用这些库中的预定义协程;默认情况下,第 2 部分没有。
简化的例子:
import bz2
import io
import aiohttp
import aioboto3
async def download(endpoint, bucket_name, key):
async with aiohttp.ClientSession() as session:
async with session.request("GET", endpoint, raise_for_status=True) as resp:
raw = await resp.read() # payload (bytes)
# Yikes - isn't it bad to throw a …Run Code Online (Sandbox Code Playgroud)