如何使用python的asyncio模块正确创建和运行并发任务?

son*_*olo 65 python concurrency task python-3.4 python-asyncio

我正在尝试Task使用Python 3的相对较新的asyncio模块正确理解和实现两个并发运行的对象.

简而言之,asyncio似乎旨在Task通过事件循环处理异步进程和并发执行.它促进了await(在异步函数中应用)作为无回调方式的使用,等待和使用结果,而不会阻塞事件循环.(期货和回调仍然是一个可行的选择.)

它还提供了asyncio.Task()类,一个专门Future 用于包装协同程序的子类.优选地通过使用该asyncio.ensure_future()方法来调用.asyncio任务的预期用途是允许独立运行的任务与同一事件循环中的其他任务"同时"运行.我的理解是Tasks连接到事件循环,然后自动保持在await语句之间驱动协程.

我喜欢能够使用并发任务而不需要使用其中一个Executor类的想法,但我没有找到很多关于实现的详细说明.

这就是我目前正在做的事情:

import asyncio

print('running async test')

async def say_boo():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...boo {0}'.format(i))
        i += 1

async def say_baa():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...baa {0}'.format(i))
        i += 1

# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

在尝试同时运行两个循环任务的情况下,我注意到除非Task有一个内部await表达式,否则它会卡在while循环中,有效地阻止其他任务运行(很像正常while循环).然而,一旦任务必须(a)等待,它们似乎同时运行而没有问题.

因此,这些await语句似乎为事件循环提供了在任务之间来回切换的立足点,从而产生了并发效果.

内部示例输出await:

running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2
Run Code Online (Sandbox Code Playgroud)

没有内部的示例输出await:

...boo 0
...boo 1
...boo 2
...boo 3
...boo 4
Run Code Online (Sandbox Code Playgroud)

问题

此实现是否通过"并行"循环任务的"正确"示例asyncio

它是否正确,这是唯一的方法是为了Task提供一个阻塞点(await表达式),以便事件循环兼顾多个任务?

dan*_*ano 73

是的,任何在你的事件循环中运行的协程都会阻止其他协同程序和任务运行,除非它

  1. 使用yield from或调用另一个协程await(如果使用Python 3.5+).
  2. 回报.

这是因为asyncio是单线程的; 事件循环运行的唯一方法是没有其他协同程序正在执行.临时使用yield from/ await暂停协程,为事件循环提供工作机会.

您的示例代码很好,但在许多情况下,您可能不希望长时间运行的代码没有在事件循环中运行异步I/O开始.在这些情况下,使用BaseEventLoop.run_in_executor在后台线程或进程中运行代码通常更有意义.ProcessPoolExecutor如果您的任务受CPU限制,ThreadPoolExecutor那么将是更好的选择,如果您需要执行一些asyncio不友好的I/O,则会使用它.

例如,您的两个循环完全受CPU限制,并且不共享任何状态,因此最佳性能来自于使用ProcessPoolExecutor跨CPU并行运行每个循环:

import asyncio
from concurrent.futures import ProcessPoolExecutor

print('running async test')

def say_boo():
    i = 0
    while True:
        print('...boo {0}'.format(i))
        i += 1


def say_baa():
    i = 0
    while True:
        print('...baa {0}'.format(i))
        i += 1

if __name__ == "__main__":
    executor = ProcessPoolExecutor(2)
    loop = asyncio.get_event_loop()
    boo = asyncio.ensure_future(loop.run_in_executor(executor, say_boo))
    baa = asyncio.ensure_future(loop.run_in_executor(executor, say_baa))

    loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

  • @shongololo对不起,修好了.应该使用`asyncio.async`,而不是直接使用`asyncio.Task`构造函数.我们不希望`say_boo`和`say_baa`是协同程序,它们应该只是在事件循环之外运行的普通函数,所以你不应该在它们中添加`yield from`调用或将它们包装在`asyncio中.Task`. (2认同)

zky*_*ony 19

函数asyncio.ensure_futureasyncio.get_event_loop在 Python 3.10 中已弃用。

您可以通过以下方式同时say_boo运行两个协程:say_baaasyncio.create_task

async def main():
    boo = asyncio.create_task(say_boo())
    baa = asyncio.create_task(say_baa())
    await boo
    await baa

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

您还可以使用asyncio.gather

async def main():
    await asyncio.gather(say_boo(), say_baa())

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

  • 为了获得对此问题的新观点,考虑到 API 的最新更改,我将此问题标记为正确。 (2认同)

Jas*_*ohi 13

您不一定需要yield from x控制事件循环.

在你的例子中,我认为正确的方法是做一个yield None或等效的简单yield,而不是yield from asyncio.sleep(0.001):

import asyncio

@asyncio.coroutine
def say_boo():
  i = 0
  while True:
    yield None
    print("...boo {0}".format(i))
    i += 1

@asyncio.coroutine
def say_baa():
  i = 0
  while True:
    yield
    print("...baa {0}".format(i))
    i += 1

boo_task = asyncio.async(say_boo())
baa_task = asyncio.async(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

协同程序只是普通的Python生成器.在内部,asyncio事件循环保留这些生成器的记录,并gen.send()在永不停止的循环中逐个调用它们.无论何时yield,gen.send()完成调用和循环都可以继续.(我正在简化它;看看https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265获取实际代码)

也就是说,run_in_executor如果你需要在不共享数据的情况下进行CPU密集型计算,我仍会继续这条路线.

  • 从Python 3.5开始,*right*方法是使用`asyncio.sleep(0)`.[见这个讨论.](https://github.com/python/asyncio/issues/284) (19认同)