在Python中,是否存在与多处理或concurrent.futures等效的异步?

sta*_*tti 33 python asynchronous

基本上,我正在寻找一些提供并行映射的东西,使用python3协同程序作为后端而不是线程或进程.我相信在执行高度并行的IO工作时应该减少开销.

当然,类似的东西已经存在,无论是标准库还是一些广泛使用的包?

Yar*_*min 34

免责声明 PEP 0492仅定义协同程序的语法和用法.它们需要运行事件循环,这很可能asyncio是事件循环.

异步映射

我不知道map基于协同程序的任何实现.但是,map使用asyncio.gather()以下方法实现基本功能是微不足道的:

def async_map(coroutine_func, iterable):
    loop = asyncio.get_event_loop()
    future = asyncio.gather(*(coroutine_func(param) for param in iterable))
    return loop.run_until_complete(future)
Run Code Online (Sandbox Code Playgroud)

这个实现非常简单.它为每个项目创建一个协程iterable,将它们连接成单个协程并在事件循环上执行连接协程.

提供的实施涵盖了部分案例.但它有一个问题.使用long iterable,您可能希望限制并行运行的协同程序的数量.我无法想出简单的实现,这是有效的并且同时保留了顺序,因此我将其留作读者的练习.

性能

你声称:

我相信在执行高度并行的IO工作时应该减少开销.

它需要证明,所以这里是一个比较multiprocessing实施,gevent由实施AP和我实现了基于协同程序.所有测试都在Python 3.5上进行.

实施使用multiprocessing:

from multiprocessing import Pool
import time


def async_map(f, iterable):
    with Pool(len(iterable)) as p:  # run one process per item to measure overhead only
        return p.map(f, iterable)

def func(val):
    time.sleep(1)
    return val * val
Run Code Online (Sandbox Code Playgroud)

实施使用gevent:

import gevent
from gevent.pool import Group


def async_map(f, iterable):
    group = Group()
    return group.map(f, iterable)

def func(val):
    gevent.sleep(1)
    return val * val
Run Code Online (Sandbox Code Playgroud)

实施使用asyncio:

import asyncio


def async_map(f, iterable):
    loop = asyncio.get_event_loop()
    future = asyncio.gather(*(f(param) for param in iterable))
    return loop.run_until_complete(future)

async def func(val):
    await asyncio.sleep(1)
    return val * val
Run Code Online (Sandbox Code Playgroud)

测试程序通常是timeit:

$ python3 -m timeit -s 'from perf.map_mp import async_map, func' -n 1 'async_map(func, list(range(10)))'
Run Code Online (Sandbox Code Playgroud)

结果:

  1. 可重复的10项目:

    • multiprocessing - 1.05秒
    • gevent - 1秒
    • asyncio - 1秒
  2. 可重复的100项目:

    • multiprocessing - 1.16秒
    • gevent - 1.01秒
    • asyncio - 1.01秒
  3. 可重复的500项目:

    • multiprocessing - 2.31秒
    • gevent - 1.02秒
    • asyncio - 1.03秒
  4. 可重复的5000项目:

    • multiprocessing- 失败(产生5k进程并不是一个好主意!)
    • gevent - 1.12秒
    • asyncio - 1.22秒
  5. 可重复的50000项目:

    • gevent - 2.2秒
    • asyncio - 3.25秒

结论

当程序主要执行I/O而不是计算时,基于事件循环的并发工作更快.请记住,当I/O较少且涉及的计算量较多时,差异会更小.

产生进程引入的开销比基于事件循环的并发引入的开销要大得多.这意味着您的假设是正确的.

比较asyncio而且gevent我们可以说,它的asyncio开销要大33-45%.这意味着创建greenlet比创建协同程序更便宜.

作为最终结论:gevent具有更好的性能,但是asyncio是标准库的一部分.性能差异(绝对数字)不是很显着.gevent是一个相当成熟的图书馆,虽然asyncio相对较新,但它进展很快.

  • 您可以使用asyncio.Semaphore来限制并发请求的数量:https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html (2认同)