wvx*_*xvw 64 python python-3.x python-asyncio
This question is motivated by my other question: How to await in cdef?
There are tons of articles and blog posts about asyncio out there, but they are all very superficial. I couldn't find any information about how asyncio is actually implemented, and what makes I/O asynchronous. I tried to read the source code, but it's thousands of lines of not-the-highest-grade C code, a lot of which deals with auxiliary objects, but most crucially, it is hard to connect between Python syntax and the C code it would translate into.
Asyncio's own documentation is even less helpful. There's no information there about how it works, only some guidelines about how to use it, which are also sometimes misleading / very poorly written.
I'm familiar with Go's implementation of coroutines, and was kind of hoping that Python did the same thing. If that was the case, the code from the post linked above would have worked. Since it didn't, I'm now trying to figure out why. My best guess so far is as follows, please correct me where I'm wrong:
Procedure definitions of the form async def foo(): ... are actually interpreted as methods of a class inheriting from coroutine. Perhaps async def is actually split into multiple methods by its await statements, where the object on which these methods are called is able to keep track of the progress it has made through the execution so far (i.e. the position of the last await statement reached). In other words, here's my attempt at "desugaring" some asyncio syntax into something more understandable:
async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__(self):
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)

# translated from asyncio.gather()
class AsyncIOManager:
    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()
In case my guess proves correct: then I have a problem. How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter? What exactly is meant by I/O? If my Python procedure calls the C open() procedure, and it in turn sends an interrupt to the kernel, relinquishing control to it, how does the Python interpreter know about this and is able to continue running some other code, while the kernel code does the actual I/O and until it wakes up the Python procedure which sent the interrupt originally? How can the Python interpreter, in principle, be aware of this happening?
Bha*_*rel 119
Before answering this question we need to understand a few base terms; skip these if you already know any of them.
Generators
Generators are objects that allow us to suspend the execution of a Python function. User curated generators are implemented using the keyword yield. By creating a normal function containing the yield keyword, we turn that function into a generator:
>>> def test():
... yield 1
... yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
As you can see, calling next() on the generator causes the interpreter to load test's frame, and return the yielded value. Calling next() again causes the frame to load again into the interpreter stack, and continues with yielding another value.
By the third time next() is called, our generator was finished, and StopIteration was thrown.
Communicating with a generator
A lesser-known feature of generators is the fact that you can communicate with them using two methods: send() and throw().
>>> def test():
... val = yield 1
... print(val)
... yield 2
... yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in test
Exception
Upon calling gen.send(), the value is passed as a return value from the yield keyword.
gen.throw(), on the other hand, allows throwing exceptions inside generators, with the exception raised at the same spot where yield was called.
Returning values from generators
Returning a value from a generator results in the value being put inside the StopIteration exception. We can later on recover the value from the exception and use it for our needs.
>>> def test():
... yield 1
... return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
... next(gen)
... except StopIteration as exc:
... print(exc.value)
...
abc
yield from
Python 3.4 came with the addition of a new keyword: yield from. What that keyword allows us to do is pass on any next(), send() and throw() into an innermost nested generator. If the inner generator returns a value, it is also the return value of yield from:
>>> def inner():
... print((yield 2))
... return 3
...
>>> def outer():
... yield 1
... val = yield from inner()
... print(val)
... yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen)
2
>>> gen.send("abc")
abc
3
4
Putting it all together
Upon introducing the new keyword yield from in Python 3.4, we were now able to create generators inside generators that, just like a tunnel, pass the data back and forth from the innermost to the outermost generators. This has spawned a new meaning for generators - coroutines.
Coroutines are functions that can be stopped and resumed while being run. In Python, they are defined using the async def keyword. Much like generators, they too use their own form of yield from, which is await. Before async and await were introduced in Python 3.5, we created coroutines in the exact same way generators were created (with yield from instead of await).
async def inner():
    return 1

async def outer():
    await inner()
Like every iterator or generator that implements the __iter__() method, coroutines implement __await__(), which allows them to continue every time await coro is called.
Inside asyncio, apart from coroutine functions, we have two important objects: tasks and futures.
Futures
Futures are objects that have the __await__() method implemented, and their job is to hold a certain state and result. The state can be one of the following:
1. PENDING - the future does not have any result or exception set.
2. CANCELLED - the future was cancelled using fut.cancel()
3. FINISHED - the future was finished, either by a result set using fut.set_result() or by an exception set using fut.set_exception()
The result, just like you have guessed, can either be a Python object that will be returned, or an exception which may be raised.
Another important feature of future objects is that they contain a method called add_done_callback(). This method allows functions to be called as soon as the future is done - whether it raised an exception or finished.
Tasks
Task objects are special futures, which wrap around coroutines, and communicate with the innermost and outermost coroutines. Every time a coroutine awaits a future, the future is passed all the way back to the task (just like in yield from), and the task receives it.
Next, the task binds itself to the future. It does so by calling add_done_callback() on the future. From now on, if the future is ever done, by either being cancelled, being passed an exception, or being passed a Python object as a result, the task's callback will be called, and it will rise back up to existence.
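The binding described above can be sketched with a toy future and a toy task. This is an illustrative simplification, not asyncio's actual implementation; SimpleFuture and SimpleTask are hypothetical names:

```python
class SimpleFuture:
    """Toy future: holds a result and callbacks (sketch, not asyncio.Future)."""
    def __init__(self):
        self._callbacks = []
        self._result = None

    def result(self):
        return self._result

    def add_done_callback(self, callback):
        self._callbacks.append(callback)

    def set_result(self, value):
        self._result = value
        for callback in self._callbacks:
            callback(self)

    def __await__(self):
        yield self           # suspend, handing the future up to the task
        return self._result


class SimpleTask:
    """Toy task: drives a coroutine and binds itself to awaited futures."""
    def __init__(self, coro):
        self.coro = coro
        self.step(None)

    def step(self, value):
        try:
            # resume the coroutine; it runs until it awaits a future
            fut = self.coro.send(value)
        except StopIteration:
            return
        # bind: when the future completes, resume with its result
        fut.add_done_callback(lambda f: self.step(f.result()))


results = []

async def waiter(fut):
    value = await fut
    results.append(value)

fut = SimpleFuture()
SimpleTask(waiter(fut))    # starts running, suspends at the await
fut.set_result("data")     # completing the future resumes the task
print(results)             # ['data']
```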
Asyncio
The final burning question we must answer is - how is the IO implemented?
Deep inside asyncio, we have an event loop. An event loop of tasks. The event loop's job is to call tasks every time they are ready, and coordinate all that effort into one single working machine.
The IO part of the event loop is built upon a single crucial function called select. Select is a blocking function, implemented by the operating system underneath, that allows waiting on sockets for incoming or outgoing data. Upon receiving data it wakes up, and returns the sockets which received data, or the sockets which are ready for writing.
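As a minimal illustration of select, here is a self-contained sketch using a local socket pair (any waitable file descriptors would do):

```python
import select
import socket

# a connected pair of sockets stands in for real network traffic
a, b = socket.socketpair()
b.sendall(b'ping')

# select blocks until a socket in the first list is readable
# (timeout of 1.0 seconds so the example cannot hang)
rlist, wlist, xlist = select.select([a], [], [], 1.0)
print(a in rlist)  # True - `a` has incoming data waiting
```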
When you try to receive or send data over a socket through asyncio, what actually happens below is that the socket is first checked for any data that can be immediately read or sent. If its .send() buffer is full, or the .recv() buffer is empty, the socket is registered to the select function (by simply adding it to one of the lists, rlist for recv and wlist for send), and the appropriate function awaits a newly created future object, tied to that socket.
When all available tasks are waiting for futures, the event loop calls select and waits. When one of the sockets has incoming data, or its send buffer has drained up, asyncio checks for the future object tied to that socket, and sets it to done.
Now all the magic happens. The future is set to done, the task that added itself before with add_done_callback() rises back to life, and calls .send() on the coroutine which resumes the innermost coroutine (because of the await chain), and you read the newly received data from a nearby buffer it was spilled unto.
Method chain again, in the case of recv():
1. select.select waits.
2. A ready socket, with data, is returned.
3. Data from the socket is moved into a buffer.
4. future.set_result() is called.
5. The task that added itself with add_done_callback() is now woken up.
6. The task calls .send() on the coroutine, which goes all the way into the innermost coroutine and wakes it up.
In summary, asyncio uses generator capabilities, which allow pausing and resuming functions. It uses yield from capabilities that allow passing data back and forth from the innermost generator to the outermost one. It uses all of those in order to halt function execution while it's waiting for IO to complete (by using the OS select function).
And the best of all? While one function is paused, another may run and interleave with the delicate fabric, which is asyncio.
Mis*_*agi 51
Talking about async/await
and asyncio
is not talking about the same thing. The former is a fundamental, low-level construct (coroutines) while the latter is a library using these constructs. Moreover, there is no single ultimate answer.
The following is a general description of how async/await
and asyncio
-like libraries work. That is, there may be other tricks on top (there are...) but they are inconsequential unless you build them yourself. The difference should be negligible unless you already know enough to not have to ask such a question.
Just like subroutines (functions, procedures, ...), coroutines (generators, ...) are an abstraction of call stack and instruction pointer: there is a stack of executing code pieces, and each is at a specific instruction.
The distinction of def
versus async def
is merely for clarity. The actual difference is return
versus yield
. From this, await
or yield from
take the difference from individual calls to entire stacks.
A subroutine represents a new stack level to hold local variables, and a single traversal of its instructions to reach an end. Consider a subroutine like this:
def subfoo(bar):
    qux = 3
    return qux * bar
When you run it, that means
1. allocate stack space for bar and qux
2. recursively execute the first statement and jump to the next statement
3. once at a return, push its value to the calling stack
4. clear the stack (1.) and instruction pointer (2.)
Notably, 4. means that a subroutine always starts at the same state. Everything exclusive to the function itself is lost upon finishing. A function cannot be resumed, even if there are instructions after return.
root -\
: \- subfoo --\
:/--<---return --/
|
V
A coroutine is like a subroutine, but can exit without destroying its state. Consider a coroutine like this:
def cofoo(bar):
    qux = yield bar  # yield marks a break point
    return qux
When you run it, that means
1. allocate stack space for bar and qux
2. recursively execute the first statement and jump to the next statement
2.1. once at a yield, push its value to the calling stack but store the stack and instruction pointer
2.2. once calling into yield, restore the stack and instruction pointer and push arguments to qux
3. once at a return, push its value to the calling stack
4. clear the stack (1.) and instruction pointer (2.)
Note the addition of 2.1 and 2.2 - a coroutine can be suspended and resumed at predefined points. This is similar to how a subroutine is suspended during calling another subroutine. The difference is that the active coroutine is not strictly bound to its calling stack. Instead, a suspended coroutine is part of a separate, isolated stack.
root -\
: \- cofoo --\
:/--<+--yield --/
| :
V :
This means that suspended coroutines can be freely stored or moved between stacks. Any call stack that has access to a coroutine can decide to resume it.
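This can be seen with a plain generator already: the suspended coroutine below is created in one scope and resumed by a completely different caller (a small sketch based on the cofoo above; resume_elsewhere is a hypothetical helper):

```python
def cofoo(bar):
    qux = yield bar
    return qux

# create and suspend the coroutine in one scope ...
paused = cofoo(3)
first = next(paused)      # runs up to the yield, then suspends
print(first)              # 3

# ... then hand the suspended coroutine to a completely different caller
def resume_elsewhere(coro, value):
    try:
        coro.send(value)
    except StopIteration as exc:
        return exc.value

final = resume_elsewhere(paused, 10)
print(final)              # 10
```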
So far, our coroutine only goes down the call stack with yield
. A subroutine can go down and up the call stack with return
and ()
. For completeness, coroutines also need a mechanism to go up the call stack. Consider a coroutine like this:
def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'
When you run it, that means it still allocates the stack and instruction pointer like a subroutine. When it suspends, that still is like storing a subroutine.
However, yield from
does both. It suspends stack and instruction pointer of wrap
and runs cofoo
. Note that wrap
stays suspended until cofoo
finishes completely. Whenever cofoo
suspends or something is sent, cofoo
is directly connected to the calling stack.
As established, yield from
allows to connect two scopes across another intermediate one. When applied recursively, that means the top of the stack can be connected to the bottom of the stack.
root -\
: \-> coro_a -yield-from-> coro_b --\
:/ <-+------------------------yield ---/
| :
:\ --+-- coro_a.send----------yield ---\
: coro_b <-/
Note that root
and coro_b
do not know about each other. This makes coroutines much cleaner than callbacks: coroutines are still built on a 1:1 relation like subroutines. Coroutines suspend and resume their entire existing execution stack up until a regular call point.
Notably, root
could have an arbitrary number of coroutines to resume. Yet, it can never resume more than one at the same time. Coroutines of the same root are concurrent but not parallel!
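A minimal sketch of such a root resuming several coroutines in turn, using plain generators as the answer has so far (ticker is a hypothetical helper):

```python
def ticker(name, count):
    """A trivial coroutine (as a generator) that yields `count` messages."""
    for i in range(count):
        yield f'{name} {i}'

# the root interleaves several coroutines, but only one runs at a time
coros = [ticker('a', 2), ticker('b', 2)]
order = []
while coros:
    coro = coros.pop(0)
    try:
        order.append(next(coro))
        coros.append(coro)   # reschedule at the back: round robin
    except StopIteration:
        pass                 # this coroutine is finished

print(order)  # ['a 0', 'b 0', 'a 1', 'b 1'] - concurrent, not parallel
```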
async
and await
The explanation has so far explicitly used the yield
and yield from
vocabulary of generators - the underlying functionality is the same. The new Python3.5 syntax async
and await
exists mainly for clarity.
def foo():  # subroutine?
    return None

def foo():  # coroutine?
    yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
    await foofoo()  # coroutine!
    return None
The async for
and async with
statements are needed because you would break the yield from/await
chain with the bare for
and with
statements.
By itself, a coroutine has no concept of yielding control to another coroutine. It can only yield control to the caller at the bottom of a coroutine stack. This caller can then switch to another coroutine and run it.
This root node of several coroutines is commonly an event loop: on suspension, a coroutine yields an event on which it wants resume. In turn, the event loop is capable of efficiently waiting for these events to occur. This allows it to decide which coroutine to run next, or how to wait before resuming.
Such a design implies that there is a set of pre-defined events that the loop understands. Several coroutines await
each other, until finally an event is await
ed. This event can communicate directly with the event loop by yield
ing control.
loop -\
: \-> coroutine --await--> event --\
:/ <-+----------------------- yield --/
| :
| : # loop waits for event to happen
| :
:\ --+-- send(reply) -------- yield --\
: coroutine <--yield-- event <-/
The key is that coroutine suspension allows the event loop and events to directly communicate. The intermediate coroutine stack does not require any knowledge about which loop is running it, nor how events work.
The simplest event to handle is reaching a point in time. This is a fundamental block of threaded code as well: a thread repeatedly sleep
s until a condition is true.
However, a regular sleep
blocks execution by itself - we want other coroutines to not be blocked. Instead, we want to tell the event loop when it should resume the current coroutine stack.
An event is simply a value we can identify - be it via an enum, a type or other identity. We can define this with a simple class that stores our target time. In addition to storing the event information, we can allow to await
a class directly.
class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self

    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)
This class only stores the event - it does not say how to actually handle it.
The only special feature is __await__
- it is what the await
keyword looks for. Practically, it is an iterator but not available for the regular iteration machinery.
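As an aside, that iterator can be driven by hand for any awaitable - for example a native coroutine (a sketch of what await does underneath, not something real code would normally do):

```python
async def answer():
    return 42

coro = answer()
stepper = coro.__await__()   # the iterator that `await` drives under the hood
try:
    stepper.send(None)       # run until suspension or completion
except StopIteration as exc:
    result = exc.value       # a coroutine's return value rides on StopIteration

print(result)  # 42
```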
Now that we have an event, how do coroutines react to it? We should be able to express the equivalent of sleep
by await
ing our event. To better see what is going on, we wait twice for half the time:
import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)
We can directly instantiate and run this coroutine. Similar to a generator, using coroutine.send
runs the coroutine until it yield
s a result.
coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)
This gives us two AsyncSleep
events and then a StopIteration
when the coroutine is done. Notice that the only delay is from time.sleep
in the loop! Each AsyncSleep
only stores an offset from the current time.
At this point, we have two separate mechanisms at our disposal:
- AsyncSleep events that can be yielded from inside a coroutine
- time.sleep that can wait without impacting coroutines
Notably, these two are orthogonal: neither one affects or triggers the other. As a result, we can come up with our own strategy to sleep to meet the delay of an AsyncSleep.
If we have several coroutines, each can tell us when it wants to be woken up. We can then wait until the first of them wants to be resumed, then for the one after, and so on. Notably, at each point we only care about which one is next.
This makes for a straightforward scheduling:
A trivial implementation does not need any advanced concepts. A list
allows to sort coroutines by date. Waiting is a regular time.sleep
. Running coroutines works just like before with coroutine.send
.
def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
Of course, this has ample room for improvement. We can use a heap for the wait queue or a dispatch table for events. We could also fetch return values from the StopIteration
and assign them to the coroutine. However, the fundamental principle remains the same.
The AsyncSleep
event and run
event loop are a fully working implementation of timed events.
async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))
This cooperatively switches between each of the five coroutines, suspending each for 0.1 seconds. Even though the event loop is synchronous, it still executes the work in 0.5 seconds instead of 2.5 seconds. Each coroutine holds state and acts independently.
An event loop that supports sleep
is suitable for polling. However, waiting for I/O on a file handle can be done more efficiently: the operating system implements I/O and thus knows which handles are ready. Ideally, an event loop should support an explicit "ready for I/O" event.
The select call
Python already has an interface to query the OS for read I/O handles. When called with handles to read or write, it returns the handles ready to read or write:
readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)
For example, we can open
a file for writing and wait for it to be ready:
write_target = open('/tmp/foo', 'w')
readable, writeable, _ = select.select([], [write_target], [])
Once select returns, writeable
contains our open file.
Similar to the AsyncSleep
request, we need to define an event for I/O. With the underlying select
logic, the event must refer to a readable object - say an open
file. In addition, we store how much data to read.
class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )
As with AsyncSleep
we mostly just store the data required for the underlying system call. This time, __await__
is capable of being resumed multiple times - until our desired amount
has been read. In addition, we return
the I/O result instead of just resuming.
The basis for our event loop is still the run
defined previously. First, we need to track the read requests. This is no longer a sorted schedule, we only map read requests to coroutines.
# new
waiting_read = {} # type: Dict[file, coroutine]
Since select.select
takes a timeout parameter, we can use it in place of time.sleep
.
# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(reads), [], [])
This gives us all readable files - if there are any, we run the corresponding coroutine. If there are none, we have waited long enough for our current coroutine to run.
# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]
Finally, we have to actually listen for read requests.
# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...
The above was a bit of a simplification. We need to do some switching to not starve sleeping coroutines if we can always read. We need to handle having nothing to read or nothing to wait for. However, the end result still fits into 30 LOC.
def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine
The AsyncSleep
, AsyncRead
and run
implementations are now fully functional to sleep and/or read.
Same as for sleepy
, we can define a helper to test reading:
async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))
Running this, we can see that our I/O is interleaved with the waiting task:
id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B
Run Code Online (Sandbox Code Playgroud)
While I/O on files gets the concept across, it is not really suitable for a library like asyncio
: the select
call always returns for files, and both open
and read
may block indefinitely. This blocks all coroutines of an event loop - which is bad. Libraries like aiofiles
use threads and synchronization to fake non-blocking I/O and events on file.
However, sockets do allow for non-blocking I/O - and their inherent latency makes it much more critical. When used in an event loop, waiting for data and retrying can be wrapped without blocking anything.
Similar to our AsyncRead
, we can define a suspend-and-read event for sockets. Instead of taking a file, we take a socket - which must be non-blocking. Also, our __await__
uses socket.recv
instead of file.read
.
class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )
In contrast to AsyncRead
, __await__
performs truly non-blocking I/O. When data is available, it always reads. When no data is available, it always suspends. That means the event loop is only blocked while we perform useful work.
As far as the event loop is concerned, nothing changes much. The event to listen for is still the same as for files - a file descriptor marked ready by select
.
# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine

# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine
At this point, it should be obvious that AsyncRead
and AsyncRecv
are the same kind of event. We could easily refactor them to be one event with an exchangeable I/O component. In effect, the event loop, coroutines and events cleanly separate a scheduler, arbitrary intermediate code and the actual I/O.
In principle, what you should do at this point is replicate the logic of read
as a recv
for AsyncRecv
. However, this is much more ugly now - you have to handle early returns when functions block inside the kernel, but yield control to you. For example, opening a connection versus opening a file is much longer:
# file
file = open(path, 'rb')

# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass
Long story short, what remains is a few dozen lines of Exception handling. The events and event loop already work at this point.
id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5
Your coro desugaring is conceptually correct, but slightly incomplete.
await doesn't suspend unconditionally, but only when it encounters a blocking call. How does it know that a call is blocking? This is decided by the code being awaited. For example, an awaitable implementation of socket read could be desugared to:
def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND
In real asyncio the equivalent code modifies the state of a Future instead of returning magic values, but the concept is the same. When appropriately adapted to a generator-like object, the above code can be awaited.
On the caller side, when your coroutine contains:
data = await read(sock, 1024)
it gets desugared into something close to:
data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)
People familiar with generators tend to describe the above in terms of yield from, which does the suspension automatically.
The suspension chain continues all the way to the event loop, which notices that the coroutine is suspended, removes it from the runnable set, and goes on to execute the coroutines that are runnable, if any. If no coroutines are runnable, the loop waits in select() until a file descriptor a coroutine is interested in becomes ready for IO. (The event loop maintains a file-descriptor-to-coroutine mapping.)
In the above example, once select() tells the event loop that sock is readable, it will re-add coro to the runnable set, so it will be continued from the point of suspension.
In other words:
1. Everything happens in the same thread by default.
2. The event loop is responsible for scheduling the coroutines, and for waking them up when whatever they were waiting for (typically an IO call that would normally block, or a timeout) becomes ready.
For insight into coroutine-driving event loops, I recommend Dave Beazley's talk, where he demonstrates coding an event loop from scratch in front of a live audience.
Asyncio stands for asynchronous input output and refers to a programming paradigm which achieves high concurrency using a single thread or event loop. Asynchronous programming is a type of parallel programming in which a unit of work is allowed to run separately from the primary application thread. When the work is complete, it notifies the main thread about the completion or failure of the worker thread.
Let's have a look at the diagram below:
Let's understand asyncio with an example:
To understand the concept behind asyncio, let's consider a restaurant with a single waiter. Suddenly, three customers, A, B and C, show up. The three of them take a varying amount of time to decide what to eat once they receive the menu from the waiter.
Let's assume A takes 5 minutes, B 10 minutes and C 1 minute to decide. If the single waiter starts with B first and takes B's order in 10 minutes, next he serves A and spends 5 minutes noting down his order, and finally spends 1 minute to know what C wants to eat, then in total the waiter spends 10 + 5 + 1 = 16 minutes to take their orders. However, notice that in this sequence of events, C ends up waiting 15 minutes before the waiter gets to him, A waits 10 minutes and B waits 0 minutes.
Now consider if the waiter knew the time each customer would take to decide. He could start with C first, then go to A and finally to B. This way each customer would experience a 0 minute wait. An illusion of three waiters, one dedicated to each customer, is created even though there's only one.
Lastly, the total time it takes for the waiter to take all three orders is 10 minutes, much less than the 16 minutes of the other scenario.
Let's go over another example:
Suppose the chess grandmaster Magnus Carlsen hosts a chess exhibition in which he plays multiple amateur players. He has two ways of conducting the exhibition: synchronously and asynchronously.
Assumptions (recoverable from the arithmetic below):
- 24 opponents
- Magnus Carlsen makes each chess move in 5 seconds
- Opponents each take 55 seconds to make a move
- Games average 30 pair-moves (60 moves total)
Synchronously: Magnus Carlsen plays one game at a time, never two at the same time, until the game is complete. Each game takes (55 + 5) * 30 == 1800 seconds, or 30 minutes. The entire exhibition takes 24 * 30 == 720 minutes, or 12 hours.
Asynchronously: Magnus Carlsen moves from table to table, making one move at each table. He leaves the table and lets the opponent make their next move during the wait time. One move on all 24 games takes 24 * 5 == 120 seconds, or 2 minutes. The entire exhibition is now cut down to 120 * 30 == 3600 seconds, or just 1 hour.
There is only one Magnus Carlsen, who has only two hands and makes only one move at a time. But playing asynchronously cuts the exhibition time down from 12 hours to 1 hour.
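The arithmetic above can be checked directly (the constants are taken from the example's assumptions):

```python
MOVE_SELF = 5        # seconds per Magnus Carlsen move
MOVE_OPPONENT = 55   # seconds per opponent move
PAIR_MOVES = 30      # average pair-moves per game
GAMES = 24

# synchronous: finish each game before starting the next
sync_minutes = (MOVE_SELF + MOVE_OPPONENT) * PAIR_MOVES * GAMES // 60
# asynchronous: one pass over all boards per pair-move
async_minutes = MOVE_SELF * GAMES * PAIR_MOVES // 60

print(sync_minutes)   # 720 minutes, i.e. 12 hours
print(async_minutes)  # 60 minutes, i.e. 1 hour
```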
Coding example:
Let's try to demonstrate synchronous and asynchronous execution times using code snippets.
Asynchronous - async_count.py
import asyncio
import time


async def count():
    print("One", end=" ")
    await asyncio.sleep(1)
    print("Two", end=" ")
    await asyncio.sleep(2)
    print("Three", end=" ")


async def main():
    await asyncio.gather(count(), count(), count(), count(), count())


if __name__ == "__main__":
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")
Asynchronous - Output:
One One One One One Two Two Two Two Two Three Three Three Three Three
Executing - async_count.py
Execution Starts: 18453.442160108
Executions Ends: 18456.444719712
Totals Execution Time:3.00 seconds.
Synchronous - sync_count.py
import time


def count():
    print("One", end=" ")
    time.sleep(1)
    print("Two", end=" ")
    time.sleep(2)
    print("Three", end=" ")


def main():
    for _ in range(5):
        count()


if __name__ == "__main__":
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")
Synchronous - Output:
One Two Three One Two Three One Two Three One Two Three One Two Three
Executing - sync_count.py
Execution Starts: 18875.175965998
Executions Ends: 18890.189930292
Totals Execution Time:15.01 seconds.
Python Generators:
Functions containing a yield statement are compiled as generators. Using a yield expression in a function's body causes that function to be a generator. These functions return an object which supports the iteration protocol methods. The generator object created automatically receives a __next__() method. Going back to the example from the previous section, we can invoke __next__ directly on the generator object instead of using next():
def asynchronous():
    yield "Educative"


if __name__ == "__main__":
    gen = asynchronous()
    str = gen.__next__()
    print(str)
Remember the following about generators:
- next() is invoked on the generator object to run the code within the generator function.
States of a generator:
A generator goes through the following states:
- GEN_CREATED: when a generator object has been returned for the first time from a generator function and iteration has not started.
- GEN_RUNNING: when next() has been invoked on the generator object and it is being executed by the Python interpreter.
- GEN_SUSPENDED: when a generator is suspended at a yield.
- GEN_CLOSED: when a generator has completed execution or has been closed.
Methods on generator objects:
A generator object exposes different methods that can be invoked to manipulate the generator. These are:
- throw()
- send()
- close()
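Both the states and the methods can be observed with the standard inspect module (a small demonstration; log is just a hypothetical helper list for collecting what happens):

```python
import inspect

log = []

def echo():
    try:
        while True:
            received = yield
            log.append(('got', received))
    finally:
        log.append('cleanup')   # runs on close() via GeneratorExit

gen = echo()
log.append(inspect.getgeneratorstate(gen))  # GEN_CREATED
next(gen)                                   # prime up to the first yield
log.append(inspect.getgeneratorstate(gen))  # GEN_SUSPENDED
gen.send('hi')                              # resumes; yield evaluates to 'hi'
gen.close()                                 # raises GeneratorExit at the yield
log.append(inspect.getgeneratorstate(gen))  # GEN_CLOSED

print(log)  # ['GEN_CREATED', 'GEN_SUSPENDED', ('got', 'hi'), 'cleanup', 'GEN_CLOSED']
```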
Rules of asyncio:
- The syntax async def introduces either a native coroutine or an asynchronous generator. The expressions async with and async for are also valid.
- The keyword await passes function control back to the event loop. (It suspends the execution of the surrounding coroutine.) If Python encounters an await f() expression in the scope of g(), this is how await tells the event loop: "Suspend the execution of g() until whatever I'm waiting on - the result of f() - is returned. In the meantime, go let something else run."
In code, that second bullet point looks roughly like this:
async def g():
    # Pause here and come back to g() when f() is ready
    r = await f()
    return r
There is also a strict set of rules around when and how you can and cannot use async/await. These can be handy whether you are still picking up the syntax or already have exposure to using async/await:
- A function that you introduce with async def is a coroutine. It may use await, return, or yield, but all of these are optional. Declaring async def noop(): pass is valid:
  - Using await and/or return creates a coroutine function. To call a coroutine function, you must await it to get its result.
  - It is less common to use yield in an async def block. This creates an asynchronous generator, which you iterate over with async for. Forget about async generators for the time being and focus on getting down the syntax for coroutine functions, which use await and/or return.
  - Anything defined with async def may not use yield from, which will raise a SyntaxError.
- Just like it is a SyntaxError to use yield outside of a def function, it is a SyntaxError to use await outside of an async def coroutine. You can only use await in the body of coroutines.
Here are some terse examples meant to summarize the above few rules:
async def f(x):
    y = await z(x)  # OK - `await` and `return` allowed in coroutines
    return y


async def g(x):
    yield x  # OK - this is an async generator


async def m(x):
    yield from gen(x)  # NO - SyntaxError


def m(x):
    y = await z(x)  # NO - SyntaxError (no `async def` here)
    return y
Generator-Based Coroutine
Python distinguishes between Python generators and generators that are meant to be used as coroutines. These coroutines are called generator-based coroutines and require the decorator @asyncio.coroutine to be added to the function definition, though this isn't strictly enforced.
Generator-based coroutines use the yield from syntax instead of yield.
Coroutines in Python make cooperative multitasking possible. Cooperative multitasking is the approach in which the running process voluntarily gives up the CPU to other processes. A process may do so when it is logically blocked, say while waiting for user input, or when it has initiated a network request and will be idle for a while. A coroutine can be defined as a special function that can give up control to its caller without losing its state.
So what's the difference between coroutines and generators?
Generators are essentially iterators, though they look like functions. The distinction, broadly speaking, is that a generator yields values back to its caller, whereas a coroutine yields control and can also receive values from its caller, resuming from the point where it gave up control.
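The "yields control and can receive values" side can be illustrated with a classic generator-used-as-coroutine (a sketch; grep and results are hypothetical names):

```python
def grep(pattern, results):
    """Receives lines via send(); keeps `pattern` across suspensions."""
    while True:
        line = yield
        if pattern in line:
            results.append(line)

results = []
searcher = grep('async', results)
next(searcher)                 # prime: run up to the first yield
searcher.send('hello world')   # no match
searcher.send('async rocks')   # match - state (pattern, results) was kept
print(results)                 # ['async rocks']
```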
The simplest generator-based coroutine we can write is as follows:
@asyncio.coroutine
def do_something_important():
    yield from asyncio.sleep(1)
The coroutine sleeps for one second. Note the decorator and the use of yield from.
Native Coroutine
Native means the language introduced syntax to specifically define coroutines, making them first-class citizens in the language. Native coroutines can be defined using the async/await syntax. The simplest native coroutine we can write is as follows:
async def do_something_important():
    await asyncio.sleep(1)
AsyncIO comes with its own set of possible script designs, which we will discuss in this section.
1. Event loop
The event loop is a programming construct that waits for events to happen and then dispatches them to an event handler. An event can be a user clicking a UI button or a process initiating a file download. At the core of asynchronous programming sits the event loop.
Example code:
import asyncio
import random
import time
from threading import Thread
from threading import current_thread

# ANSI colors
colors = (
    "\033[0m",   # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[34m",  # Blue
)


async def do_something_important(sleep_for):
    print(colors[1] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])
    await asyncio.sleep(sleep_for)


def launch_event_loops():
    # get a new event loop
    loop = asyncio.new_event_loop()
    # set the event loop for the current thread
    asyncio.set_event_loop(loop)
    # run a coroutine on the event loop
    loop.run_until_complete(do_something_important(random.randint(1, 5)))
    # remember to close the loop
    loop.close()


if __name__ == "__main__":
    thread_1 = Thread(target=launch_event_loops)
    thread_2 = Thread(target=launch_event_loops)

    start_time = time.perf_counter()
    thread_1.start()
    thread_2.start()

    print(colors[2] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])

    thread_1.join()
    thread_2.join()
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(colors[3] + f"Event Loop Start Time: {start_time}\nEvent Loop End Time: {end_time}\nEvent Loop Execution Time: {execution_time:0.2f} seconds." + colors[0])
Execution command: python async_event_loop.py
Output:
Try it out yourself and examine the output, and you'll realize that each spawned thread is running its own event loop.
Types of event loops
There are two types of event loops:
- SelectorEventLoop: based on the selectors module; the default event loop on all platforms.
- ProactorEventLoop: based on Windows' I/O Completion Ports; supported only on Windows.
2. Futures
A future represents a computation that is either in progress or will get scheduled in the future. It is a special low-level awaitable object that represents an eventual result of an asynchronous operation. Don't confuse concurrent.futures.Future and asyncio.Future.
Example code:
import time
import asyncio
from asyncio import Future

# ANSI colors
colors = (
    "\033[0m",   # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[34m",  # Blue
)


async def bar(future):
    print(colors[1] + "bar will sleep for 3 seconds" + colors[0])
    await asyncio.sleep(3)
    print(colors[1] + "bar resolving the future" + colors[0])
    future.set_result("future is resolved")


async def foo(future):
    print(colors[2] + "foo will await the future" + colors[0])
    await future
    print(colors[2] + "foo finds the future resolved" + colors[0])


async def main():
    future = Future()
    await asyncio.gather(foo(future), bar(future))


if __name__ == "__main__":
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])
Execution command: python async_futures.py
Output:
Both coroutines are passed a future. The foo() coroutine awaits the future to get resolved, while the bar() coroutine resolves the future after three seconds.
3. Tasks
Tasks are like futures; in fact, Task is a subclass of Future and can be created using the following methods:
- asyncio.create_task() accepts coroutines and wraps them as tasks.
- loop.create_task() only accepts coroutines.
- asyncio.ensure_future() accepts futures, coroutines and any awaitable objects.
Tasks wrap coroutines and run them in event loops. If a coroutine awaits a Future, the Task suspends the execution of the coroutine and waits for the Future to complete. When the Future is done, the execution of the wrapped coroutine resumes.
Example code:
import time
import asyncio
from asyncio import Future

# ANSI colors
colors = (
    "\033[0m",   # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[34m",  # Blue
)

async def bar(future):
    print(colors[1] + "bar will sleep for 3 seconds" + colors[0])
    await asyncio.sleep(3)
    print(colors[1] + "bar resolving the future" + colors[0])
    future.set_result("future is resolved")

async def foo(future):
    print(colors[2] + "foo will await the future" + colors[0])
    await future
    print(colors[2] + "foo finds the future resolved" + colors[0])

async def main():
    future = Future()
    loop = asyncio.get_running_loop()
    t1 = loop.create_task(bar(future))
    t2 = loop.create_task(foo(future))
    await asyncio.gather(t2, t1)

if __name__ == "__main__":
    start_time = time.perf_counter()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])
Command to run: python async_tasks.py
Output:
4. Chaining coroutines:
A key feature of coroutines is that they can be chained together. A coroutine object is awaitable, so another coroutine can await it. This allows you to break a program into smaller, manageable, reusable coroutines:
Example code:
import sys
import asyncio
import random
import time

# ANSI colors
colors = (
    "\033[0m",   # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[36m",  # Cyan
    "\033[34m",  # Blue
)

async def function1(n: int) -> str:
    i = random.randint(0, 10)
    print(colors[1] + f"function1({n}) is sleeping for {i} seconds." + colors[0])
    await asyncio.sleep(i)
    result = f"result{n}-1"
    print(colors[1] + f"Returning function1({n}) == {result}." + colors[0])
    return result

async def function2(n: int, arg: str) -> str:
    i = random.randint(0, 10)
    print(colors[2] + f"function2{n, arg} is sleeping for {i} seconds." + colors[0])
    await asyncio.sleep(i)
    result = f"result{n}-2 derived from {arg}"
    print(colors[2] + f"Returning function2{n, arg} == {result}." + colors[0])
    return result

async def chain(n: int) -> None:
    start = time.perf_counter()
    p1 = await function1(n)
    p2 = await function2(n, p1)
    end = time.perf_counter() - start
    print(colors[3] + f"--> Chained result{n} => {p2} (took {end:0.2f} seconds)." + colors[0])

async def main(*args):
    await asyncio.gather(*(chain(n) for n in args))

if __name__ == "__main__":
    random.seed(444)
    args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
    start_time = time.perf_counter()
    asyncio.run(main(*args))
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])
Pay careful attention to the output: function1() sleeps for a variable amount of time, and function2() begins working with each result as soon as it becomes available:
Command to run: python async_chained.py 11 8 5
Output:
5. Using a queue:
In this design, no individual consumer is tied to a producer. The consumers do not know the number of producers, or even the cumulative number of items that will be added to the queue, in advance.
Each individual producer or consumer takes a varying amount of time to put items into and pull items out of the queue, respectively. The queue serves as a throughput channel that lets producers and consumers communicate without talking to each other directly.
Example code:
import asyncio
import argparse
import itertools as it
import os
import random
import time

# ANSI colors
colors = (
    "\033[0m",   # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[36m",  # Cyan
    "\033[34m",  # Blue
)

async def generate_item(size: int = 5) -> str:
    return os.urandom(size).hex()

async def random_sleep(caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(colors[1] + f"{caller} sleeping for {i} seconds." + colors[0])
    await asyncio.sleep(i)

async def produce(name: int, producer_queue: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await random_sleep(caller=f"Producer {name}")
        i = await generate_item()
        t = time.perf_counter()
        await producer_queue.put((i, t))
        print(colors[2] + f"Producer {name} added <{i}> to queue." + colors[0])

async def consume(name: int, consumer_queue: asyncio.Queue) -> None:
    while True:
        await random_sleep(caller=f"Consumer {name}")
        i, t = await consumer_queue.get()
        now = time.perf_counter()
        print(colors[3] + f"Consumer {name} got element <{i}>" f" in {now - t:0.5f} seconds." + colors[0])
        consumer_queue.task_done()

async def main(no_producer: int, no_consumer: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(no_producer)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(no_consumer)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for consumer in consumers:
        consumer.cancel()

if __name__ == "__main__":
    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--no_producer", type=int, default=10)
    parser.add_argument("-c", "--no_consumer", type=int, default=15)
    ns = parser.parse_args()
    start_time = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])
Command to run: python async_queue.py -p 2 -c 4
Output:
Finally, an example of how asyncio cuts down on waiting time: given a coroutine generate_random_int() that keeps producing random integers in the range [0, 10] until one of them exceeds a threshold, you want multiple calls of this coroutine to proceed without having to wait for each other to complete in succession.
Example code:
import time
import asyncio
import random

# ANSI colors
colors = (
    "\033[0m",   # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[36m",  # Cyan
    "\033[35m",  # Magenta
    "\033[34m",  # Blue
)

async def generate_random_int(indx: int, threshold: int = 5) -> int:
    print(colors[indx + 1] + f"Initiated generate_random_int({indx}).")
    i = random.randint(0, 10)
    while i <= threshold:
        print(colors[indx + 1] + f"generate_random_int({indx}) == {i} too low; retrying.")
        await asyncio.sleep(indx + 1)
        i = random.randint(0, 10)
    print(colors[indx + 1] + f"---> Finished: generate_random_int({indx}) == {i}" + colors[0])
    return i

async def main():
    res = await asyncio.gather(*(generate_random_int(i, 10 - i - 1) for i in range(3)))
    return res

if __name__ == "__main__":
    random.seed(444)
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(colors[5] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])
It all boils down to the two main challenges that asyncio is addressing:
How to perform multiple I/O operations in a single thread?
How to implement cooperative multitasking?
The answer to the first point has been around for a long while and is called a select loop. In Python, it is implemented in the selectors module.
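A minimal sketch of such a select loop, using the selectors module and a socket pair (the callback wiring here is illustrative, not asyncio's actual code):

```python
import selectors
import socket

sel = selectors.DefaultSelector()
left, right = socket.socketpair()
received = []

def on_readable(conn):
    # The loop invokes this callback once `conn` has data to read.
    received.append(conn.recv(1024))

# Register the file descriptor together with the callback to run.
sel.register(right, selectors.EVENT_READ, on_readable)
left.sendall(b"ping")

# One iteration of the loop: block until something is ready, then dispatch.
for key, _events in sel.select(timeout=1):
    key.data(key.fileobj)

print(received)  # [b'ping']
sel.unregister(right)
left.close()
right.close()
```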
The second point is related to the concept of a coroutine, i.e. a function that can stop its execution and be restored later on. In Python, coroutines are implemented using generators and the yield from statement. That's what is hiding behind the async/await syntax.
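For illustration, here is that mechanism with a plain generator and yield from, without any asyncio (the function names are made up for this sketch):

```python
def inner():
    # Execution pauses here; the value passed to send() resumes it.
    x = yield "need a value"
    return x * 2

def outer():
    # yield from delegates transparently: sends and yields are
    # forwarded to inner(), and its return value comes back here.
    result = yield from inner()
    return result + 1

gen = outer()
print(next(gen))  # need a value
try:
    gen.send(10)
except StopIteration as stop:
    print(stop.value)  # 21
```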
More resources in this answer.
Edit: Addressing your comment about goroutines:
The closest equivalent to a goroutine in asyncio is actually not a coroutine but a task (see the difference in the documentation). In Python, a coroutine (or a generator) knows nothing about the concepts of event loop or I/O. It is simply a function that can stop its execution using yield while keeping its current state, so it can be restored later on. The yield from syntax allows for chaining them in a transparent way.
Now, within an asyncio task, the coroutine at the very bottom of the chain always ends up yielding a future. This future then bubbles up to the event loop and gets integrated into the inner machinery. When the future is set to done by some other inner callback, the event loop can restore the task by sending the future back into the coroutine chain.
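A deliberately simplified toy sketch of that bubbling (not asyncio's real machinery; every name here is invented for illustration):

```python
class ToyFuture:
    # Stand-in for asyncio.Future: just a result slot and a flag.
    def __init__(self):
        self.result = None
        self.done = False

def leaf(fut):
    # Bottom of the chain: yield the future up to the driver.
    value = yield fut
    return value

def task(fut):
    # Middle of the chain: delegate downwards with yield from.
    result = yield from leaf(fut)
    return f"got {result}"

def drive(gen):
    fut = next(gen)                  # the future bubbles up
    fut.result, fut.done = 42, True  # some inner callback completes it
    try:
        gen.send(fut.result)         # resume the whole chain
    except StopIteration as stop:
        return stop.value

print(drive(task(ToyFuture())))  # got 42
```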
Edit: Addressing some of the questions in your post:
How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter?
No, nothing happens in a thread. I/O is always managed by the event loop, mostly through file descriptors. However, the registration of those file descriptors is usually hidden by high-level coroutines, which do the dirty work for you.
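For instance, here is a low-level sketch of that file-descriptor registration, using loop.add_reader directly (assuming a selector-based event loop, as on Unix; ordinary code would rely on high-level coroutines instead):

```python
import asyncio
import socket

async def main():
    loop = asyncio.get_running_loop()
    left, right = socket.socketpair()
    right.setblocking(False)
    ready = loop.create_future()

    def on_readable():
        # Invoked by the event loop when `right` becomes readable.
        loop.remove_reader(right.fileno())
        ready.set_result(right.recv(1024))

    # Register the descriptor; the loop now watches it for us.
    loop.add_reader(right.fileno(), on_readable)
    left.sendall(b"hello")
    data = await ready
    left.close()
    right.close()
    return data

print(asyncio.run(main()))  # b'hello'
```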
What exactly is meant by I/O? If my python procedure called the C open() procedure, and it in turn sent an interrupt to the kernel, relinquishing control to it, how does the Python interpreter know about this and is able to continue running some other code, while the kernel code does the actual I/O until it wakes up the Python procedure which originally sent the interrupt? How can the Python interpreter, in principle, be aware of this happening?
An I/O is any blocking call. In asyncio, all I/O operations should go through the event loop, because, as you said, the event loop has no way to be aware that a blocking call is being performed in some synchronous code. That means you should not use a synchronous open within the context of a coroutine. Instead, use a dedicated library such as aiofiles, which provides an asynchronous version of open.
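For illustration, the same idea can be sketched with only the standard library by pushing the blocking open into a thread pool via run_in_executor (aiofiles takes a similar thread-delegation approach; the temporary file here exists only for the demo):

```python
import asyncio
import os
import tempfile

async def read_file(path):
    loop = asyncio.get_running_loop()

    def blocking_read():
        with open(path) as f:  # blocking call, runs in a worker thread
            return f.read()

    # The loop keeps serving other tasks while the thread blocks.
    return await loop.run_in_executor(None, blocking_read)

async def main():
    fd, path = tempfile.mkstemp()
    with os.fdopen(fd, "w") as f:
        f.write("hello")
    try:
        return await read_file(path)
    finally:
        os.remove(path)

print(asyncio.run(main()))  # hello
```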