How does asyncio actually work?

wvx*_*xvw 64 python python-3.x python-asyncio

This question is motivated by my other question: How to await in cdef?

There are tons of articles and blog posts about asyncio on the web, but they are all very superficial. I couldn't find any information about how asyncio is actually implemented, and what makes I/O asynchronous. I was trying to read the source code, but it's thousands of lines of not the highest grade C code, a lot of which deals with auxiliary objects, but most crucially, it is hard to connect between Python syntax and the C code it would translate into.

Asyncio's own documentation is even less helpful. There's no information there about how it works, only some guidelines about how to use it, which are also sometimes misleading / very poorly written.

I'm familiar with Go's implementation of coroutines, and was kind of hoping that Python did the same thing. If that were the case, the code I came up with in the post linked above would have worked. Since it didn't, I'm now trying to figure out why. My best guess so far is as follows, please correct me where I'm wrong:

  1. Procedure definitions of the form async def foo(): ... are actually interpreted as methods of a class inheriting coroutine.
  2. Perhaps, async def is actually split into multiple methods by await statements, where the object on which these methods are called is able to keep track of the progress it has made through the execution so far.
  3. If the above are true, then, essentially, execution of a coroutine boils down to some global manager (loop?) calling the coroutine object's methods.
  4. The global manager is somehow (how?) aware of when I/O operations are performed by Python (only?) code and is able to choose one of the pending coroutine methods to execute after the currently executing method relinquishes control (hits the await statement).

In other words, here's my attempt at "desugaring" some asyncio syntax into something more understandable:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__(self):
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

If my guesses are correct: then I have a problem. How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter? What exactly is meant by I/O? If my python procedure called the C open() procedure, and it in turn sent an interrupt to the kernel, relinquishing control to it, how does the Python interpreter know about this and is able to continue running some other code, while the kernel code does the actual I/O, until it wakes up the Python procedure which sent the interrupt originally? How can the Python interpreter, in principle, be aware of this happening?

Bha*_*rel 119

How does asyncio work?

Before answering this question we need to understand a few base terms; skip these if you already know any of them.

Generators

Generators are objects that allow us to suspend the execution of a python function. A generator is implemented using the yield keyword. By creating a normal function containing the yield keyword, we turn that function into a generator:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

As you can see, calling next() on the generator causes the interpreter to load test's frame and return the yielded value. Calling next() again causes the frame to be loaded onto the interpreter stack again, continuing execution and yielding another value.

By the third next() call, our generator is finished, and StopIteration is thrown.

Communicating with a generator

A lesser-known feature of generators is that you can communicate with them using two methods: send() and throw().

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

Upon calling gen.send(), the value is passed as the return value of the yield keyword.

gen.throw(), on the other hand, allows throwing an exception inside the generator, raised at the same spot the yield was called.

Returning values from generators

Returning a value from a generator results in the value being put inside the StopIteration exception. We can later recover the value from the exception and use it for our needs.

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

Behold, a new keyword: yield from

Python 3.4 came with the addition of a new keyword: yield from. This keyword allows us to pass any next(), send() and throw() into the innermost nested generator. If the inner generator returns a value, it is also the return value of yield from:

>>> def inner():
...     print((yield 2))
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print(val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen)
2
>>> gen.send("abc")
abc
3
4

Putting it all together

Upon introducing the new keyword yield from in Python 3.4, we became able to create generators inside generators that, just like a tunnel, pass data back and forth from the innermost to the outermost generators. This spawned a new meaning for generators - coroutines.

Coroutines are functions that can be stopped and resumed while being run. In Python, they are defined using the async def keyword. Much like generators, they too use their own form of yield from, which is await. Before async and await were introduced in Python 3.5, we created coroutines in the exact same way generators were created (with yield from instead of await).

async def inner():
    return 1

async def outer():
    await inner()

Like every iterator or generator that implements the __iter__() method, coroutines implement __await__(), which allows them to continue every time await coro is called.

There's a nice sequence diagram inside the Python docs that you should check out.

In asyncio, apart from coroutine functions, we have two important objects: tasks and futures.

Futures

Futures are objects that have the __await__() method implemented, and their job is to hold a certain state and result. The state can be one of the following:

  1. PENDING - the future does not have any result or exception set.
  2. CANCELLED - the future was cancelled using fut.cancel()
  3. FINISHED - the future was finished, either by a result set using fut.set_result() or by an exception set using fut.set_exception()

The result, just as you have guessed, can either be a Python object that will be returned, or an exception that may be raised.

Another important feature of future objects is that they contain a method called add_done_callback(). This method allows functions to be called as soon as the future is done - whether it raised an exception or finished.
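
As a rough sketch of how a future and add_done_callback() interact (create_future, add_done_callback and call_later are real asyncio APIs; the result value 42 is just an arbitrary example):

```python
import asyncio

results = []

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()          # a bare future, starts in PENDING state
    # the callback fires as soon as the future is done
    fut.add_done_callback(lambda f: results.append(f.result()))
    # set the result a moment later, from "outside" the coroutine
    loop.call_later(0.01, fut.set_result, 42)
    # awaiting the future suspends main() until set_result runs
    await fut

asyncio.run(main())
print(results)  # [42]
```

Note that the callback runs before the awaiting coroutine resumes: done callbacks are scheduled with call_soon in the order they were registered.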

Tasks

Task objects are special futures which wrap around coroutines and communicate with the innermost and outermost coroutines. Every time a coroutine awaits a future, the future is passed all the way back to the task (just as in yield from), and the task receives it.

Next, the task binds itself to the future by calling add_done_callback() on it. From now on, if the future is ever done - by being cancelled, passed an exception, or passed a Python object as a result - the task's callback will be called, and it will rise back up to existence.

Asyncio

The final burning question we must answer is - how is the IO implemented?

Deep inside asyncio, we have an event loop. An event loop of tasks. The event loop's job is to call tasks every time they are ready and coordinate all that effort into one single working machine.

The IO part of the event loop is built upon a single crucial function called select. Select is a blocking function, implemented by the operating system underneath, that allows waiting on sockets for incoming or outgoing data. Upon receiving data it wakes up, and returns the sockets which received data, or the sockets that are ready for writing.
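
The behaviour of select can be seen in isolation with a pair of connected sockets (socketpair here is just a convenient stand-in for a real network connection):

```python
import select
import socket

# a connected pair of sockets stands in for a real network connection
a, b = socket.socketpair()
b.sendall(b'ping')

# select blocks until a socket in the first list is readable;
# here it returns immediately, because data is already waiting
rlist, wlist, xlist = select.select([a], [], [], 1.0)
assert a in rlist
print(a.recv(4))  # b'ping'

a.close()
b.close()
```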

When you try to receive or send data over a socket through asyncio, what actually happens below is that the socket is first checked for any data that can be immediately read or sent. If its .send() buffer is full, or the .recv() buffer is empty, the socket is registered to the select function (by simply adding it to one of the lists, rlist for recv and wlist for send) and the appropriate function awaits a newly created future object, tied to that socket.

When all available tasks are waiting for futures, the event loop calls select and waits. When one of the sockets has incoming data, or its send buffer has drained up, asyncio checks for the future object tied to that socket, and sets it to done.

Now all the magic happens. The future is set to done, the task that added itself before with add_done_callback() rises back to life, and calls .send() on the coroutine, which resumes the innermost coroutine (because of the await chain), and you read the newly received data from a nearby buffer it was spilled unto.

The method chain again, in case of recv():

  1. select.select waits.
  2. A ready socket, with data, is returned.
  3. Data from the socket is moved into a buffer.
  4. future.set_result() is called.
  5. The task that added itself with add_done_callback() is now woken up.
  6. The task calls .send() on the coroutine, which goes all the way into the innermost coroutine and wakes it up.
  7. Data is read from the buffer and returned to our humble user.

In summary, asyncio uses generator capabilities that allow pausing and resuming functions. It uses yield from capabilities that allow passing data back and forth from the innermost generator to the outermost one. It uses all of those in order to halt function execution while it's waiting for IO to complete (by using the OS select function).

And the best of all? While one function is paused, another may run and interleave with the delicate fabric, which is asyncio.

  • If more explanations are needed, don't hesitate to comment. By the way, I'm not entirely sure whether I should have written this as a blog article or an answer in stackoverflow. The question is a hard one to answer. (5)
  • This article has information on the backbone of Python's async I/O. Thanks for the kind explanation. (4)
  • The parts before the **Asyncio** section are probably the most critical, as they are the only thing the language actually does by itself. ``select`` may qualify as well, since that's how non-blocking I/O syscalls work on the OS. The actual ``asyncio`` constructs and the event loop are just application-level code built from these things. (3)
  • On an async socket, an attempt to send or receive data first checks the OS buffer. If you try to receive and there is no data in the buffer, the underlying receive function returns an error value, which propagates as an exception in Python. Same with send and a full buffer. When the exception is raised, Python in turn hands these sockets over to the select function, which suspends the process. But that's not how asyncio works, it's how select and sockets work, which is also highly OS-specific. (2)
  • @user8371915 Always here to help :-) Bear in mind that in order to understand Asyncio you *must* know how generators, generator communication and `yield from` work. However, I did note that it is skippable in case the reader already knows it :-) Anything else you think I should add? (2)
  • @Bharel this article itself is extremely helpful, thanks, but I get a bit lost with "Asyncio" once you start with sockets. Is the event loop itself a socket that is sending/receiving things, checking the future, and waiting if nothing happened? I saw the nice picture of the chained tasks, but how are multiple tasks scheduled within the event loop itself? Is there a FIFO/FILO/whatever stack that new tasks get thrown onto? When two tasks finish at the same time, how does the loop decide which finishes first? (2)
  • @zonk 1. The event loop checks futures, and if nothing happened, waits for data on sockets. 2. Multiple tasks are put into an internal list that acts as a fifo queue. Only tasks that are ready for execution are thrown there. 3. Each cycle, all ready tasks execute, and ready sockets wake up their relevant tasks. If a task waits on a socket, it is not added to the queue, and the event loop adds it back as soon as the socket is ready. 4. The event loop does not guarantee order. Besides, apart from 2 sockets, there cannot be 2 tasks that finish at the same time (single thread). (2)
  • @zonk Multiple tasks are handled like a fifo queue. Since it's single-threaded, only one can finish at a time, so the order is known and guaranteed. There is no guaranteed order for tasks resumed after their sockets receive data. (If 2 sockets return from select at the same time, the order is chosen by the OS.) (2)
  • @Lore I simplified a bit, take another look at the example. The yield basically goes into the other generator until that generator is done. (2)
  • @AlexPovel During `asyncio.sleep`, the main thread is not blocked and is free to work on other async tasks. For example, if you were to run multiple tasks with `asyncio.sleep(2)`, they would run concurrently and take 2 seconds in total. On the other hand, if you were to run multiple `time.sleep(2)` calls, they would not run in parallel but only one after another, and would take `2*tasks` seconds. (2)
  • Can I offer my own points as a "bounty" for this answer? I've already upvoted, but this is an amazing answer and I want to give more. :) (2)

Mis*_*agi 51

Talking about async/await and asyncio is not the same thing. The first is a fundamental, low-level construct (coroutines) while the latter is a library using these constructs. Conversely, there is no single ultimate answer.

The following is a general description of how async/await and asyncio-like libraries work. That is, there may be other tricks on top (there are...) but they are inconsequential unless you build them yourself. The difference should be negligible unless you already know enough to not have to ask such a question.

1. Coroutines versus subroutines in a nutshell

Just like subroutines (functions, procedures, ...), coroutines (generators, ...) are an abstraction of call stack and instruction pointer: there is a stack of executing code pieces, and each is at a specific instruction.

The distinction of def versus async def is merely for clarity. The actual difference is return versus yield. From this, await or yield from take the difference from individual calls to entire stacks.

1.1. Subroutines

A subroutine represents a new stack level to hold local variables, and a single traversal of its instructions to reach an end. Consider a subroutine like this:

def subfoo(bar):
     qux = 3
     return qux * bar

When you run it, that means

  1. allocate stack space for bar and qux
  2. recursively execute the first statement and jump to the next statement
  3. once at a return, push its value to the calling stack
  4. clear the stack (1.) and instruction pointer (2.)

Notably, 4. means that a subroutine always starts at the same state. Everything exclusive to the function itself is lost upon finishing. A function cannot be resumed, even if there are instructions after return.

root -\
  :    \- subfoo --\
  :/--<---return --/
  |
  V

1.2. Coroutines as persistent subroutines

A coroutine is like a subroutine, but can exit without destroying its state. Consider a coroutine like this:

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

When you run it, that means

  1. allocate stack space for bar and qux
  2. recursively execute the first statement and jump to the next statement
    1. once at a yield, push its value to the calling stack but store the stack and instruction pointer
    2. once calling into yield, restore the stack and instruction pointer and push arguments to qux
  3. once at a return, push its value to the calling stack
  4. clear the stack (1.) and instruction pointer (2.)

Note the addition of 2.1 and 2.2 - a coroutine can be suspended and resumed at predefined points. This is similar to how a subroutine is suspended during a call to another subroutine. The difference is that the active coroutine is not strictly bound to its calling stack. Instead, a suspended coroutine is part of a separate, isolated stack.

root -\
  :    \- cofoo --\
  :/--<+--yield --/
  |    :
  V    :

This means that suspended coroutines can be freely stored or moved between stacks. Any call stack that has access to a coroutine can decide to resume it.
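
Using the subfoo and cofoo definitions from above, the difference can be sketched directly (the value 7 sent in is arbitrary):

```python
def subfoo(bar):
    qux = 3
    return qux * bar

def cofoo(bar):
    qux = yield bar  # yield marks a break point
    return qux

# a subroutine runs to completion in one go, then its state is gone
assert subfoo(2) == 6

# a coroutine suspends at yield and keeps its frame alive
coro = cofoo(2)
assert next(coro) == 2      # run up to the yield, receiving bar back
try:
    coro.send(7)            # resume, binding 7 to qux
except StopIteration as stop:
    assert stop.value == 7  # the return value travels via StopIteration
```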

1.3. Traversing the call stack

So far, our coroutine only goes down the call stack with yield. A subroutine can go down and up the call stack with return and (). For completeness, coroutines also need a mechanism to go up the call stack. Consider a coroutine like this:

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

When you run it, that means it still allocates the stack and instruction pointer like a subroutine. When it suspends, that still is like storing a subroutine.

However, yield from does both. It suspends stack and instruction pointer of wrap and runs cofoo. Note that wrap stays suspended until cofoo finishes completely. Whenever cofoo suspends or something is sent, cofoo is directly connected to the calling stack.

1.4. Coroutines all the way down

As established, yield from allows to connect two scopes across another intermediate one. When applied recursively, that means the top of the stack can be connected to the bottom of the stack.

root -\
  :    \-> coro_a -yield-from-> coro_b --\
  :/ <-+------------------------yield ---/
  |    :
  :\ --+-- coro_a.send----------yield ---\
  :                             coro_b <-/

Note that root and coro_b do not know about each other. This makes coroutines much cleaner than callbacks: coroutines still built on a 1:1 relation like subroutines. Coroutines suspend and resume their entire existing execution stack up until a regular call point.

Notably, root could have an arbitrary number of coroutines to resume. Yet, it can never resume more than one at the same time. Coroutines of the same root are concurrent but not parallel!

1.5. Python's async and await

The explanation has so far explicitly used the yield and yield from vocabulary of generators - the underlying functionality is the same. The new Python3.5 syntax async and await exists mainly for clarity.

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

The async for and async with statements are needed because you would break the yield from/await chain with the bare for and with statements.

2. Anatomy of a simple event loop

By itself, a coroutine has no concept of yielding control to another coroutine. It can only yield control to the caller at the bottom of a coroutine stack. This caller can then switch to another coroutine and run it.

This root node of several coroutines is commonly an event loop: on suspension, a coroutine yields an event on which it wants resume. In turn, the event loop is capable of efficiently waiting for these events to occur. This allows it to decide which coroutine to run next, or how to wait before resuming.

Such a design implies that there is a set of pre-defined events that the loop understands. Several coroutines await each other, until finally an event is awaited. This event can communicate directly with the event loop by yielding control.

loop -\
  :    \-> coroutine --await--> event --\
  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  :\ --+-- send(reply) -------- yield --\
  :        coroutine <--yield-- event <-/

The key is that coroutine suspension allows the event loop and events to directly communicate. The intermediate coroutine stack does not require any knowledge about which loop is running it, nor how events work.

2.1.1. Events in time

The simplest event to handle is reaching a point in time. This is a fundamental block of threaded code as well: a thread repeatedly sleeps until a condition is true. However, a regular sleep blocks execution by itself - we want other coroutines to not be blocked. Instead, we want to tell the event loop when it should resume the current coroutine stack.

2.1.2. Defining an Event

An event is simply a value we can identify - be it via an enum, a type or other identity. We can define this with a simple class that stores our target time. In addition to storing the event information, we can allow to await a class directly.

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self

    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

This class only stores the event - it does not say how to actually handle it.

The only special feature is __await__ - it is what the await keyword looks for. Practically, it is an iterator but not available for the regular iteration machinery.
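
This can be checked by driving the __await__ iterator by hand (the class is re-stated here in the same shape as above so the snippet is self-contained):

```python
import time

class AsyncSleep:
    """Event to sleep until a point in time (same shape as above)"""
    def __init__(self, until):
        self.until = until

    def __await__(self):
        yield self

# ``await`` would drive this iterator for us; here we step it manually
event = AsyncSleep(time.time() + 1)
stepper = event.__await__()
assert next(stepper) is event  # the event itself is yielded to the loop
try:
    next(stepper)              # resuming past the yield ends the await
except StopIteration:
    print('await finished')
```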

2.2.1. Awaiting an event

Now that we have an event, how do coroutines react to it? We should be able to express the equivalent of sleep by awaiting our event. To better see what is going on, we wait twice for half the time:

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

We can directly instantiate and run this coroutine. Similar to a generator, using coroutine.send runs the coroutine until it yields a result.

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

This gives us two AsyncSleep events and then a StopIteration when the coroutine is done. Notice that the only delay is from time.sleep in the loop! Each AsyncSleep only stores an offset from the current time.

2.2.2. Event + Sleep

At this point, we have two separate mechanisms at our disposal:

  • AsyncSleep Events that can be yielded from inside a coroutine
  • time.sleep that can wait without impacting coroutines

Notably, these two are orthogonal: neither one affects or triggers the other. As a result, we can come up with our own strategy to sleep to meet the delay of an AsyncSleep.

2.3. A naive event loop

If we have several coroutines, each can tell us when it wants to be woken up. We can then wait until the first of them wants to be resumed, then for the one after, and so on. Notably, at each point we only care about which one is next.

This makes for a straightforward scheduling:

  1. sort coroutines by their desired wake up time
  2. pick the first that wants to wake up
  3. wait until this point in time
  4. run this coroutine
  5. repeat from 1.

A trivial implementation does not need any advanced concepts. A list allows to sort coroutines by date. Waiting is a regular time.sleep. Running coroutines works just like before with coroutine.send.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

Of course, this has ample room for improvement. We can use a heap for the wait queue or a dispatch table for events. We could also fetch return values from the StopIteration and assign them to the coroutine. However, the fundamental principle remains the same.

2.4. Cooperative Waiting

The AsyncSleep event and run event loop are a fully working implementation of timed events.

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

This cooperatively switches between each of the five coroutines, suspending each for 0.1 seconds. Even though the event loop is synchronous, it still executes the work in 0.5 seconds instead of 2.5 seconds. Each coroutine holds state and acts independently.

3. I/O event loop

An event loop that supports sleep is suitable for polling. However, waiting for I/O on a file handle can be done more efficiently: the operating system implements I/O and thus knows which handles are ready. Ideally, an event loop should support an explicit "ready for I/O" event.

3.1. The select call

Python already has an interface to query the OS for read I/O handles. When called with handles to read or write, it returns the handles ready to read or write:

readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)

For example, we can open a file for writing and wait for it to be ready:

write_target = open('/tmp/foo', 'wb')
readable, writeable, _ = select.select([], [write_target], [])

Once select returns, writeable contains our open file.

3.2. Basic I/O event

Similar to the AsyncSleep request, we need to define an event for I/O. With the underlying select logic, the event must refer to a readable object - say an open file. In addition, we store how much data to read.

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

As with AsyncSleep we mostly just store the data required for the underlying system call. This time, __await__ is capable of being resumed multiple times - until our desired amount has been read. In addition, we return the I/O result instead of just resuming.

3.3. Augmenting an event loop with read I/O

The basis for our event loop is still the run defined previously. First, we need to track the read requests. This is no longer a sorted schedule, we only map read requests to coroutines.

# new
waiting_read = {}  # type: Dict[file, coroutine]

Since select.select takes a timeout parameter, we can use it in place of time.sleep.

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(waiting_read), [], [])

This gives us all readable files - if there are any, we run the corresponding coroutine. If there are none, we have waited long enough for our current coroutine to run.

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

Finally, we have to actually listen for read requests.

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4. Putting it together

The above was a bit of a simplification. We need to do some switching to not starve sleeping coroutines if we can always read. We need to handle having nothing to read or nothing to wait for. However, the end result still fits into 30 LOC.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3.5. Cooperative I/O

The AsyncSleep, AsyncRead and run implementations are now fully functional to sleep and/or read. Same as for sleepy, we can define a helper to test reading:

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

Running this, we can see that our I/O is interleaved with the waiting task:

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4. Non-Blocking I/O

While I/O on files gets the concept across, it is not really suitable for a library like asyncio: the select call always returns for files, and both open and read may block indefinitely. This blocks all coroutines of an event loop - which is bad. Libraries like aiofiles use threads and synchronization to fake non-blocking I/O and events on file.

However, sockets do allow for non-blocking I/O - and their inherent latency makes it much more critical. When used in an event loop, waiting for data and retrying can be wrapped without blocking anything.

4.1. Non-Blocking I/O event

Similar to our AsyncRead, we can define a suspend-and-read event for sockets. Instead of taking a file, we take a socket - which must be non-blocking. Also, our __await__ uses socket.recv instead of file.read.

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

In contrast to AsyncRead, __await__ performs truly non-blocking I/O. When data is available, it always reads. When no data is available, it always suspends. That means the event loop is only blocked while we perform useful work.

4.2. Un-Blocking the event loop

As far as the event loop is concerned, nothing changes much. The event to listen for is still the same as for files - a file descriptor marked ready by select.

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

At this point, it should be obvious that AsyncRead and AsyncRecv are the same kind of event. We could easily refactor them to be one event with an exchangeable I/O component. In effect, the event loop, coroutines and events cleanly separate a scheduler, arbitrary intermediate code and the actual I/O.

4.3. The ugly side of non-blocking I/O

In principle, what you should do at this point is replicate the logic of read as a recv for AsyncRecv. However, this is much more ugly now - you have to handle early returns when functions block inside the kernel, but yield control to you. For example, opening a connection versus opening a file is much longer:

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

Long story short, what remains is a few dozen lines of Exception handling. The events and event loop already work at this point.

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

Addendum

Example code at github


use*_*342 8

Your coro desugaring is conceptually correct, but slightly incomplete.

await doesn't suspend unconditionally, but only if it encounters a blocking call. How does it know that a call is blocking? This is decided by the code being awaited. For example, an awaitable implementation of socket read could be desugared to:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

In real asyncio the equivalent code modifies the state of a Future instead of returning a magic value, but the concept is the same. When appropriately adapted to a generator-like object, the above code can be awaited.

On the caller side, when your coroutine contains:

data = await read(sock, 1024)

it gets desugared to something close to:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

People familiar with generators tend to describe the above in terms of yield from, which implements the suspension automatically.

The suspension chain continues all the way up to the event loop, which notices that the coroutine is suspended, removes it from the runnable set, and goes on to execute the coroutines that are runnable, if any. If no coroutines are runnable, the loop waits in select() until a file descriptor a coroutine is interested in becomes ready for IO. (The event loop maintains a file-descriptor-to-coroutine mapping.)

In the above example, once select() tells the event loop that sock is readable, it will re-add coro to the runnable set, so it will be continued from the point of suspension.

In other words:

  1. Everything happens in the same thread by default.

  2. The event loop is responsible for scheduling the coroutines and waking them up when whatever they were waiting for (typically an IO call that would normally block, or a timeout) becomes ready.

For insight into coroutine-driving event loops, I recommend this talk by Dave Beazley, where he demonstrates coding an event loop from scratch in front of a live audience.

  • @wvxvw I answered as much of the question you posted as I could, given that the last paragraph alone contains six questions. And so we continue - it's not that `wait_for` *doesn't do what it should* (it does, it's a coroutine you are supposed to await), it's that your expectations don't match the system's design and implementation. I think your problem could be matched to asyncio if the event loop ran in a separate thread, but I don't know the details of your use case and, honestly, your attitude isn't helping you. (8)
  • @wvxvw "My frustration with asyncio is partly due to the underlying concepts being very simple, e.g. Emacs Lisp had an implementation for ages, without using the buzzwords..." - Nothing stops you from implementing this simple concept without the buzzwords for Python, then :) Why would you use this ugly asyncio? Implement your own from scratch. For example, you can start by creating your own `async.wait_for()` function that does exactly what it should. (5)

Sab*_*bil 7

What is asyncio?

Asyncio stands for asynchronous input output and refers to a programming paradigm which achieves high concurrency using a single thread or event loop. Asynchronous programming is a type of parallel programming in which a unit of work is allowed to run separately from the primary application thread. When the work is complete, it notifies the main thread about completion or failure of the worker thread.

Let's have a look at the diagram below:

[async flow diagram]

Let's understand asyncio with an example:

To understand the concept behind asyncio, let's consider a restaurant with a single waiter. Suddenly, three customers, A, B and C show up. The three of them take a varying amount of time to decide what to eat once they receive the menu from the waiter.

Let's assume A takes 5 minutes, B 10 minutes and C 1 minute to decide. If the single waiter starts with B first and takes B's order in 10 minutes, next serves A and spends 5 minutes noting down his order, and finally spends 1 minute to find out what C wants to eat, then in total the waiter spends 10 + 5 + 1 = 16 minutes to take their orders. Notice, however, that in this sequence of events, C ends up waiting 15 minutes before the waiter gets to him, A waits 10 minutes and B waits 0 minutes.

Now consider if the waiter knew in advance how long each customer takes to decide. He could start with C first, then go to A, and finally to B. This way each customer would experience a 0 minute wait, creating the illusion of three waiters, one dedicated to each customer, even though there is only one.

Lastly, the total time it takes the waiter to take all three orders is 10 minutes, much less than the 16 minutes of the other scenario.

Let's go over another example:

Suppose chess master Magnus Carlsen hosts a chess exhibition in which he plays against multiple amateur players. He has two ways of conducting the exhibition: synchronously and asynchronously.

Assumptions:

  • 24 opponents
  • Magnus Carlsen makes each chess move in 5 seconds
  • Opponents each take 55 seconds to make a move
  • Games average 30 pair-moves (60 moves total)

Synchronously: Magnus Carlsen plays one game at a time, never two at the same time, until the game is complete. Each game takes (55 + 5) * 30 == 1800 seconds, or 30 minutes. The entire exhibition takes 24 * 30 == 720 minutes, or 12 hours.

Asynchronously: Magnus Carlsen moves from table to table, making one move at each table. He leaves the table and lets the opponent make their next move during the wait time. One move on all 24 games takes 24 * 5 == 120 seconds, or 2 minutes. The entire exhibition is now cut down to 120 * 30 == 3600 seconds, or just 1 hour.

There is only one Magnus Carlsen, who has only two hands and makes only one move at a time. But playing asynchronously cuts the exhibition time down from 12 hours to 1 hour.

Coding example:

Let's try to demonstrate synchronous and asynchronous execution times using code snippets.

Asynchronous - async_count.py

import asyncio  
import time  
  
  
async def count():  
    print("One", end=" ")  
    await asyncio.sleep(1)  
    print("Two", end=" ")  
    await asyncio.sleep(2)  
    print("Three", end=" ")  
  
  
async def main():  
    await asyncio.gather(count(), count(), count(), count(), count())  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    asyncio.run(main())  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

Asynchronous - Output

One One One One One Two Two Two Two Two Three Three Three Three Three 
Executing - async_count.py
Execution Starts: 18453.442160108
Executions Ends: 18456.444719712
Totals Execution Time:3.00 seconds.

Synchronous - sync_count.py

import time  
  
  
def count():  
    print("One", end=" ")  
    time.sleep(1)  
    print("Two", end=" ")  
    time.sleep(2)  
    print("Three", end=" ")  
  
  
def main():  
    for _ in range(5):  
        count()  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    main()  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

Synchronous - Output

One Two Three One Two Three One Two Three One Two Three One Two Three 
Executing - sync_count.py
Execution Starts: 18875.175965998
Executions Ends: 18890.189930292
Totals Execution Time:15.01 seconds.

Why use asyncio instead of multithreading in Python?

  • It's very difficult to write code that is thread safe. With asynchronous code, you know exactly where the code will shift from one task to the next, and race conditions are much harder to come by.
  • Threads consume a fair amount of data, since each thread needs to have its own stack. With async code, all the code shares the same stack, and the stack is kept small due to continuously unwinding the stack between tasks.
  • Threads are OS structures and therefore require more memory for the platform to support. There is no such problem with asynchronous tasks.

How does asyncio work?

Before going deep, let's recall Python Generators

Python Generators:

Functions containing a yield statement are compiled as generators. Using a yield expression in a function's body causes that function to be a generator. These functions return an object which supports the iteration protocol methods. The generator object created automatically receives a __next__() method. Going back to the example from the previous section, we can invoke __next__ directly on the generator object instead of using next():

def asynchronous():
    yield "Educative"


if __name__ == "__main__":
    gen = asynchronous()

    str = gen.__next__()
    print(str)

Remember the following about generators:

  • Generator functions allow you to procrastinate computing expensive values. You only compute the next value when it is required. This makes generators memory- and compute-efficient; they refrain from saving long sequences in memory or doing all the expensive computations upfront.
  • Generators, when suspended, retain the code location, which is the last yield statement executed, together with their entire local scope. This allows them to resume execution from where they left off.
  • Generator objects are nothing more than iterators.
  • Remember to make a distinction between a generator function and the associated generator object, which are often used interchangeably. A generator function, when invoked, returns a generator object, and next() is invoked on the generator object to run the code within the generator function.

States of a generator:

A generator goes through the following states:

  • GEN_CREATED when a generator object has been returned for the first time from a generator function and iteration hasn't started.
  • GEN_RUNNING when next has been invoked on the generator object and it is being executed by the Python interpreter.
  • GEN_SUSPENDED when the generator is suspended at a yield.
  • GEN_CLOSED when the generator has completed execution or has been closed.

(figure: generator state cycle)
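
These states can be observed directly with `inspect.getgeneratorstate` (a small illustrative sketch, not part of the original answer; note that GEN_RUNNING is only observable from inside the generator while it executes):

```python
import inspect

def simple_gen():
    yield 1

gen = simple_gen()
print(inspect.getgeneratorstate(gen))  # GEN_CREATED
next(gen)                              # advance to the first yield
print(inspect.getgeneratorstate(gen))  # GEN_SUSPENDED
gen.close()                            # finish the generator
print(inspect.getgeneratorstate(gen))  # GEN_CLOSED
```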

Methods of a generator object:

A generator object exposes different methods that can be invoked to manipulate the generator. These are:

  • throw()
  • send()
  • close()

Let's dive into these in more detail.
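
To make these methods concrete, here is a small sketch (the `averager` function is my own illustration, not from the original answer) exercising `send()`, `throw()` and `close()`:

```python
def averager():
    # Yields the running average of every value pushed in via send()
    total, count, average = 0.0, 0, None
    while True:
        value = yield average
        total += value
        count += 1
        average = total / count

avg = averager()
next(avg)            # "prime" the generator: run it up to the first yield
print(avg.send(10))  # 10.0
print(avg.send(30))  # 20.0

try:
    avg.throw(ValueError("bad value"))  # raised at the suspended yield
except ValueError:
    print("ValueError propagated back out")

avg2 = averager()
next(avg2)
avg2.close()         # raises GeneratorExit inside the generator, ending it
```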

Rules of asyncio:

  • The syntax async def introduces either a native coroutine or an asynchronous generator. The expressions async with and async for are also valid.
  • The keyword await passes function control back to the event loop. (It suspends the execution of the surrounding coroutine.) If Python encounters an await f() expression in the scope of g(), this is how await tells the event loop, "Suspend execution of g() until whatever I'm waiting on (the result of f()) is returned. In the meantime, go let something else run."

In code, that second bullet point looks roughly like this:

async def g():
    # Pause here and come back to g() when f() is ready
    r = await f()
    return r

There is also a strict set of rules around when and how you can and cannot use async/await. These can be handy whether you are still picking up the syntax or already have exposure to using async/await:

  • A function that you introduce with async def is a coroutine. It may use await, return, or yield, but all of these are optional. Declaring async def noop(): pass is valid:
    • Using await and/or return creates a coroutine function. To call a coroutine function, you must await it to get its results.
    • It is less common to use yield in an async def block. This creates an asynchronous generator, which you iterate over with async for. Forget about async generators for the time being and focus on getting down the syntax for coroutine functions, which use await and/or return.
    • Anything defined with async def may not use yield from, which will raise a SyntaxError.
  • Just like it is a SyntaxError to use yield outside of a def function, it is a SyntaxError to use await outside of an async def coroutine. You can only use await in the body of coroutines.

Here are some terse examples meant to summarize the above few rules:

async def f(x):
    y = await z(x)     # OK - `await` and `return` allowed in coroutines
    return y

async def g(x):
    yield x            # OK - this is an async generator

async def m(x):
    yield from gen(x)  # NO - SyntaxError

def m(x):
    y = await z(x)     # NO - SyntaxError (no `async def` here)
    return y

Generator-based coroutines

Python makes a distinction between Python generators and generators that are meant to be used as coroutines. These coroutines are called generator-based coroutines and require the decorator @asyncio.coroutine to be added to the function definition, though this isn't strictly enforced. (The decorator was deprecated in Python 3.8 and removed in 3.11; it is described here for completeness.)

Generator-based coroutines use yield from syntax instead of yield. A coroutine can:

  • yield from another coroutine
  • yield from a future
  • return an expression
  • raise an exception

Coroutines in Python make cooperative multitasking possible. Cooperative multitasking is the approach in which the running process voluntarily gives up the CPU to other processes. A process may do so when it is logically blocked, say while waiting for user input, or when it has initiated a network request and will be idle for a while. A coroutine can be defined as a special function that can give up control to its caller without losing its state.

So what's the difference between coroutines and generators?

Generators are essentially iterators, though they look like functions. The distinction between generators and coroutines, in general, is:

  • Generators yield back a value to the invoker, whereas a coroutine yields control to another coroutine and can resume execution from the point where control was given up.
  • A generator can't accept arguments once started, whereas a coroutine can.
  • Generators are primarily used to simplify writing iterators. They are a type of coroutine and are sometimes also called semicoroutines.
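A generator that has values pushed into it via send() after it has started is exactly what turns it into a coroutine (a "semicoroutine" in the terminology above). A minimal sketch of that idea (the `grep` name and the sample strings are my own):

```python
def grep(pattern, found):
    # A coroutine-style generator: values are pushed IN via send(),
    # and matching lines are collected into the `found` list
    while True:
        line = yield
        if pattern in line:
            found.append(line)

hits = []
searcher = grep("asyncio", hits)
next(searcher)  # advance to the first yield so send() can deliver values
searcher.send("threading is hard")
searcher.send("asyncio enables cooperative multitasking")
print(hits)     # ['asyncio enables cooperative multitasking']
```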

Generator-based coroutine example

The simplest generator-based coroutine we can write is as follows:

@asyncio.coroutine
def do_something_important():
    yield from asyncio.sleep(1)

The coroutine sleeps for one second. Note the decorator and the use of yield from.

Native coroutine example

By native it is meant that the language introduced syntax to specifically define coroutines, making them first-class citizens in the language. Native coroutines can be defined using the async/await syntax. The simplest native coroutine we can write is as follows:

async def do_something_important():
    await asyncio.sleep(1)

AsyncIO design patterns

AsyncIO comes with its own set of possible script designs, which we will discuss in this section.

1. Event loops

An event loop is a programming construct that waits for events to happen and then dispatches them to event handlers. An event can be a user clicking a UI button, or a process initiating a file download. At the core of asynchronous programming sits the event loop.

Example code:

import asyncio  
import random  
import time  
from threading import Thread  
from threading import current_thread  
  
# ANSI colors  
colors = (
    "\033[0m",   # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[34m",  # Blue
)
  
  
async def do_something_important(sleep_for):  
    print(colors[1] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])  
    await asyncio.sleep(sleep_for)  
  
  
def launch_event_loops():
    # get a new event loop
    loop = asyncio.new_event_loop()

    # set the event loop for the current thread
    asyncio.set_event_loop(loop)

    # run a coroutine on the event loop
    loop.run_until_complete(do_something_important(random.randint(1, 5)))

    # remember to close the loop
    loop.close()
  
  
if __name__ == "__main__":  
    thread_1 = Thread(target=launch_event_loops)  
    thread_2 = Thread(target=launch_event_loops)  
  
    start_time = time.perf_counter()  
    thread_1.start()  
    thread_2.start()  
  
    print(colors[2] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])  
  
    thread_1.join()  
    thread_2.join()  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[3] + f"Event Loop Start Time: {start_time}\nEvent Loop End Time: {end_time}\nEvent Loop Execution Time: {execution_time:0.2f} seconds." + colors[0])

Command to execute: python async_event_loop.py

Output:

(screenshot: async event loop output)

Try it out yourself and examine the output; you will notice that each spawned thread is running its own event loop.

Types of event loops

There are two types of event loops:

  • SelectorEventLoop: based on the selectors module; the default loop on Unix-like platforms (and on Windows before Python 3.8).
  • ProactorEventLoop: based on Windows' I/O Completion Ports; supported only on Windows, where it became the default in Python 3.8.
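
You can check which loop class you actually got at runtime. This is my own small sketch, not from the original answer:

```python
import asyncio

async def report_loop_type():
    loop = asyncio.get_running_loop()
    # On Linux/macOS this prints a SelectorEventLoop subclass
    # (e.g. _UnixSelectorEventLoop); on Windows 3.8+, ProactorEventLoop.
    print(type(loop).__name__)

asyncio.run(report_loop_type())
```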

2. Futures

A Future represents a computation that is either in progress or will get scheduled in the future. It is a special low-level awaitable object that represents an eventual result of an asynchronous operation. Don't confuse concurrent.futures.Future with asyncio.Future.

Example code:

import time  
import asyncio  
from asyncio import Future  
  
# ANSI colors  
colors = (
    "\033[0m",   # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[34m",  # Blue
)
  
  
async def bar(future):  
    print(colors[1] + "bar will sleep for 3 seconds" + colors[0])  
    await asyncio.sleep(3)  
    print(colors[1] + "bar resolving the future" + colors[0])  
    future.set_result("future is resolved")  
  
  
async def foo(future):
    print(colors[2] + "foo will await the future" + colors[0])
    await future
    print(colors[2] + "foo finds the future resolved" + colors[0])
  
  
async def main():  
    future = Future()  
    await asyncio.gather(foo(future), bar(future))  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    asyncio.run(main())  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

Command to execute: python async_futures.py

Output:

(screenshot: async futures output)

Both coroutines are passed a future. The foo() coroutine awaits for the future to get resolved, while the bar() coroutine resolves the future after three seconds.

3. Tasks

Tasks are like futures; in fact, Task is a subclass of Future, and tasks can be created using the following methods:

  • asyncio.create_task() accepts coroutines and wraps them as tasks.
  • loop.create_task() only accepts coroutines.
  • asyncio.ensure_future() accepts futures, coroutines, and any awaitable objects.

Tasks wrap coroutines and run them in event loops. If a coroutine awaits a Future, the Task suspends the execution of the coroutine and waits for the completion of the Future. When the Future is done, the execution of the wrapped coroutine resumes.

Example code:

import time  
import asyncio  
from asyncio import Future  
  
# ANSI colors  
colors = (
    "\033[0m",   # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[34m",  # Blue
)
  
  
async def bar(future):  
    print(colors[1] + "bar will sleep for 3 seconds" + colors[0])  
    await asyncio.sleep(3)  
    print(colors[1] + "bar resolving the future" + colors[0])  
    future.set_result("future is resolved")  
  
  
async def foo(future):
    print(colors[2] + "foo will await the future" + colors[0])
    await future
    print(colors[2] + "foo finds the future resolved" + colors[0])
  
  
async def main():  
    future = Future()  
  
    loop = asyncio.get_event_loop()  
    t1 = loop.create_task(bar(future))  
    t2 = loop.create_task(foo(future))  
  
    await asyncio.gather(t2, t1)  # await both tasks ("await t2, t1" would only await t2)
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    loop = asyncio.get_event_loop()  
    loop.run_until_complete(main())  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

Command to execute: python async_tasks.py

Output:

(screenshot: async tasks output)

4. Chaining coroutines:

A key feature of coroutines is that they can be chained together. A coroutine object is awaitable, so another coroutine can await it. This allows you to break programs into smaller, manageable, recyclable coroutines:

Example code:

import sys  
import asyncio  
import random  
import time  
  
# ANSI colors  
colors = (
    "\033[0m",   # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[36m",  # Cyan
    "\033[34m",  # Blue
)
  
  
async def function1(n: int) -> str:
    i = random.randint(0, 10)
    print(colors[1] + f"function1({n}) is sleeping for {i} seconds." + colors[0])
    await asyncio.sleep(i)
    result = f"result{n}-1"
    print(colors[1] + f"Returning function1({n}) == {result}." + colors[0])
    return result
  
  
async def function2(n: int, arg: str) -> str:
    i = random.randint(0, 10)
    print(colors[2] + f"function2{n, arg} is sleeping for {i} seconds." + colors[0])
    await asyncio.sleep(i)
    result = f"result{n}-2 derived from {arg}"
    print(colors[2] + f"Returning function2{n, arg} == {result}." + colors[0])
    return result
  
  
async def chain(n: int) -> None:  
    start = time.perf_counter()  
    p1 = await function1(n)  
    p2 = await function2(n, p1)  
    end = time.perf_counter() - start  
    print(colors[3] + f"--> Chained result{n} => {p2} (took {end:0.2f} seconds)." + colors[0])  
  
  
async def main(*args):  
    await asyncio.gather(*(chain(n) for n in args))  
  
  
if __name__ == "__main__":  
    random.seed(444)  
    args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])  
    start_time = time.perf_counter()  
    asyncio.run(main(*args))  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

Pay close attention to the output, where function1() sleeps for a variable amount of time, and function2() begins working with the results as they become available:

Command to execute: python async_chained.py 11 8 5

Output:

(screenshot: async chained output)

5. Using a queue:

In this design, there is no chaining of any individual consumer to a producer. The consumers don't know the number of producers, or even the cumulative number of items that will be added to the queue, in advance.

It takes an individual producer or consumer a variable amount of time to put items into and extract items from the queue, respectively. The queue serves as a throughput that can communicate with the producers and consumers without them talking to each other directly.

Example code:

import asyncio  
import argparse  
import itertools as it  
import os  
import random  
import time  
  
# ANSI colors  
colors = (
    "\033[0m",   # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[36m",  # Cyan
    "\033[34m",  # Blue
)
  
  
async def generate_item(size: int = 5) -> str:  
    return os.urandom(size).hex()  
  
  
async def random_sleep(caller=None) -> None:  
    i = random.randint(0, 10)  
    if caller:  
        print(colors[1] + f"{caller} sleeping for {i} seconds." + colors[0])  
    await asyncio.sleep(i)  
  
  
async def produce(name: int, producer_queue: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await random_sleep(caller=f"Producer {name}")
        i = await generate_item()
        t = time.perf_counter()
        await producer_queue.put((i, t))
        print(colors[2] + f"Producer {name} added <{i}> to queue." + colors[0])
  
  
async def consume(name: int, consumer_queue: asyncio.Queue) -> None:  
    while True:  
        await random_sleep(caller=f"Consumer {name}")  
        i, t = await consumer_queue.get()  
        now = time.perf_counter()  
        print(colors[3] + f"Consumer {name} got element <{i}>" f" in {now - t:0.5f} seconds." + colors[0])  
        consumer_queue.task_done()  
  
  
async def main(no_producer: int, no_consumer: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(no_producer)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(no_consumer)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for consumer in consumers:
        consumer.cancel()
  
  
if __name__ == "__main__":  
    random.seed(444)  
    parser = argparse.ArgumentParser()  
    parser.add_argument("-p", "--no_producer", type=int, default=10)  
    parser.add_argument("-c", "--no_consumer", type=int, default=15)  
    ns = parser.parse_args()  
    start_time = time.perf_counter()  
    asyncio.run(main(**ns.__dict__))  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

Command to execute: python async_queue.py -p 2 -c 4

Output:

(screenshot: async queue output)

Finally, let's look at an example of how asyncio cuts down on wait time: given a coroutine generate_random_int() that keeps producing random integers in the range [0, 10] until one of them exceeds a threshold, you want to let multiple invocations of this coroutine run without needing to wait for each other to complete in succession.

Example code:

import time  
import asyncio  
import random  
  
# ANSI colors  
colors = (
    "\033[0m",   # End of color
    "\033[31m",  # Red
    "\033[32m",  # Green
    "\033[36m",  # Cyan
    "\033[35m",  # Magenta
    "\033[34m",  # Blue
)
  
  
async def generate_random_int(indx: int, threshold: int = 5) -> int:  
    print(colors[indx + 1] + f"Initiated generate_random_int({indx}).")  
    i = random.randint(0, 10)  
    while i <= threshold:  
        print(colors[indx + 1] + f"generate_random_int({indx}) == {i} too low; retrying.")  
        await asyncio.sleep(indx + 1)  
        i = random.randint(0, 10)  
    print(colors[indx + 1] + f"---> Finished: generate_random_int({indx}) == {i}" + colors[0])  
    return i  
  
  
async def main():  
    res = await asyncio.gather(*(generate_random_int(i, 10 - i - 1) for i in range(3)))  
    return res  
  
  
if __name__ == "__main__":
    random.seed(444)
    start_time = time.perf_counter()
    asyncio.run(main())
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(colors[5] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])


Vin*_*ent 5

It all boils down to the two main challenges that asyncio is addressing:

  • How to perform multiple I/O operations in a single thread?
  • How to implement cooperative multitasking?

The answer to the first point has been around for a long while and is called a select loop. In Python, it is implemented in the selectors module.

The second question is related to the concept of a coroutine, i.e. a function that can stop its execution and be restored later on. In Python, coroutines are implemented using generators and the yield from statement. That's what is hiding behind the async/await syntax.
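
To make the first point concrete, here is a minimal selectors sketch (my own illustration; socket pairs stand in for real network connections): one thread waits on several file descriptors at once and only touches the one that is ready:

```python
import selectors
import socket

sel = selectors.DefaultSelector()
r1, w1 = socket.socketpair()
r2, w2 = socket.socketpair()
sel.register(r1, selectors.EVENT_READ, data="first")
sel.register(r2, selectors.EVENT_READ, data="second")

w2.send(b"hello")  # only the second pair has data pending

# select() blocks until at least one registered fd is readable
for key, _events in sel.select(timeout=1):
    print(key.data, key.fileobj.recv(100))  # second b'hello'

for s in (r1, w1, r2, w2):
    s.close()
sel.close()
```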

More resources in this answer.


Edit: Addressing your comment about goroutines:

The closest equivalent to a goroutine in asyncio is actually not a coroutine but a task (see the difference in the documentation). In Python, a coroutine (or a generator) knows nothing about the concepts of event loop or I/O. It is simply a function that can stop its execution using yield while keeping its current state, so it can be restored later on. The yield from syntax allows for chaining them in a transparent way.

Now, within an asyncio task, the coroutine at the very bottom of the chain always ends up yielding a future. This future then bubbles up to the event loop and gets integrated into the inner machinery. When the future is set to done by some other inner callback, the event loop can restore the task by sending the future back into the coroutine chain.
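
A minimal sketch of that mechanism (the names here are mine): the innermost await suspends the entire task, the future reaches the loop, and a loop callback resolving the future resumes the whole chain:

```python
import asyncio

async def inner(fut):
    return await fut           # suspends here; the future bubbles up to the loop

async def outer(fut):
    return await inner(fut)    # the suspension propagates through the chain

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    task = asyncio.ensure_future(outer(fut))    # wrap the chain in a Task
    loop.call_later(0.01, fut.set_result, 42)   # a loop callback resolves it
    result = await task                         # the whole chain resumes with 42
    print(result)
    return result

asyncio.run(main())
```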


Edit: Addressing some of the questions in your post:

How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter?

No, nothing happens in a separate thread. I/O is always managed by the event loop, mostly through file descriptors. However, the registration of those file descriptors is usually hidden by high-level coroutines, which do the dirty work for you.

What exactly is meant by I/O? If my Python procedure called the C open() procedure, and it in turn sent an interrupt to the kernel, relinquishing control to it, how does the Python interpreter know about this and is able to continue running some other code, while the kernel code does the actual I/O until it wakes up the Python procedure which sent the interrupt originally? How can the Python interpreter, in principle, be aware of this happening?

I/O is any blocking call. In asyncio, all the I/O operations should go through the event loop, because, as you said, the event loop has no way to know that a blocking call is being performed in some synchronous code. That means you shouldn't use a synchronous open within the context of a coroutine. Instead, use a dedicated library such as aiofiles, which provides an asynchronous version of open.
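
If you must call blocking code (like the built-in open) from a coroutine, the standard-library escape hatch is loop.run_in_executor, which pushes the call to a thread pool so the event loop keeps running. This is a sketch with names of my own choosing:

```python
import asyncio
import os
import tempfile

def blocking_read(path):
    # Plain synchronous I/O: would stall the event loop if called directly
    with open(path) as f:
        return f.read()

async def main():
    # Set up a temporary file just for the demo
    fd, path = tempfile.mkstemp()
    with os.fdopen(fd, "w") as f:
        f.write("read from a thread pool")

    loop = asyncio.get_running_loop()
    # Off-load the blocking call to the default ThreadPoolExecutor;
    # other coroutines can run while the worker thread blocks in open()/read()
    data = await loop.run_in_executor(None, blocking_read, path)
    os.remove(path)
    print(data)
    return data

asyncio.run(main())
```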

  • @wvxvw I disagree. These are not "buzzwords" but high-level concepts that have been implemented in many libraries. For example, an asyncio task, a gevent greenlet, and a goroutine all correspond to the same thing: a unit of execution that can run concurrently within a single thread. Also, I don't think C is needed to understand asyncio at all, unless you want to get into the inner workings of Python generators. (9 upvotes)