Python 3.4中的多处理与多线程与asyncio

use*_*650 68 python multithreading multiprocessing python-3.x python-asyncio

我发现在Python 3.4中,很少有用于多处理/线程的不同库:多处理线程asyncio.

但我不知道使用哪一个或是"推荐的".他们做同样的事情,还是不同?如果是这样,哪一个用于什么?我想编写一个在我的计算机中使用多核的程序.但我不知道我应该学习哪个图书馆.

use*_*253 54

它们旨在用于(略微)不同的目的和/或要求.CPython(典型的主线Python实现)仍然具有全局解释器锁,因此多线程应用程序(现在实现并行处理的标准方法)不是最理想的.这就是为什么multiprocessing 可能更喜欢的原因threading.但并非每个问题都可以有效地分解为[几乎独立的]部分,因此可能需要进行繁重的进程间通信.这就是为什么multiprocessing通常不会优先考虑的原因threading.

asyncio(这种技术不仅可以在Python中使用,其他语言和/或框架也可以使用它,例如Boost.ASIO)是一种从许多同时来源有效处理大量I/O操作的方法,无需并行代码执行.所以它只是一个特定任务的解决方案(确实是一个好的!),而不是一般的并行处理.

  • 注意到虽然所有三个都可能无法实现并行性,但它们都能够执行并发(非阻塞)任务. (5认同)

Sim*_*Art 43

许多答案都建议如何仅选择 1 个选项,但为什么不能使用所有 3 个选项呢?在这个答案中,我解释了如何使用asyncio来管理组合所有 3 种并发形式,以及如何在以后需要时轻松地在它们之间进行交换

简短的回答


许多初次接触 Python 并发的开发人员最终都会使用processing.Processthreading.Thread。然而,这些是低级 API,已由模块提供的高级 API 合并在一起concurrent.futures。此外,生成进程和线程会产生开销,例如需要更多内存,这一问题困扰着我下面展示的示例之一。在某种程度上,concurrent.futures它会为您管理这一点,这样您就不能轻松地执行诸如生成一千个进程之类的操作,并通过仅生成几个进程然后在每次完成时重新使用这些进程来使计算机崩溃。

这些高级 API 通过 提供,然后由和concurrent.futures.Executor实现。在大多数情况下,您应该使用它们而不是和,因为将来使用时更容易从一种更改为另一种,并且不必了解每种的详细差异。concurrent.futures.ProcessPoolExecutorconcurrent.futures.ThreadPoolExecutormultiprocessing.Processthreading.Threadconcurrent.futures

由于它们共享统一的接口,您还会发现代码使用multiprocessingthreading经常使用concurrent.futures. asyncio对此也不例外,并提供了通过以下代码使用它的方法:

import asyncio
from concurrent.futures import Executor
from functools import partial
from typing import Any, Callable, Optional, TypeVar

T = TypeVar("T")

async def run_in_executor(
    executor: Optional[Executor],
    func: Callable[..., T],
    /,
    *args: Any,
    **kwargs: Any,
) -> T:
    """
    Run `func(*args, **kwargs)` asynchronously, using an executor.

    If the executor is None, use the default ThreadPoolExecutor.
    """
    return await asyncio.get_running_loop().run_in_executor(
        executor,
        partial(func, *args, **kwargs),
    )

# Example usage for running `print` in a thread.
async def main():
    await run_in_executor(None, print, "O" * 100_000)

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

事实上,事实证明,使用threadingwithasyncio非常常见,以至于在 Python 3.9 中,他们添加asyncio.to_thread(func, *args, **kwargs)了默认的 来缩短它ThreadPoolExecutor

长答案


这种方法有什么缺点吗?

是的。对于asyncio,最大的缺点是异步函数与同步函数不同。如果您没有从一开始就考虑到编程,这可能会给asyncio很多新用户带来麻烦,并导致大量返工。asyncio

另一个缺点是您的代码的用户也将被迫使用asyncio. 所有这些必要的返工往往会让初次使用的asyncio用户感到非常酸涩。

这有什么非性能优势吗?

是的。类似于 usingconcurrent.futures优于threading.Thread其统一接口的方式,这种方法可以被认为是从异步函数到异步函数的multiprocessing.Process进一步抽象。Executor您可以开始使用asyncio,如果稍后您找到需要的部分threadingmultiprocessing,则可以使用asyncio.to_threadrun_in_executor。同样,您稍后可能会发现您尝试使用线程运行的异步版本已经存在,因此您可以轻松地退出使用threading并切换到asyncio该版本。

这有什么性能优势吗?

是的……也不是。最终这取决于任务。在某些情况下,它可能没有帮助(尽管可能不会造成伤害),而在其他情况下,它可能会有很大帮助。这个答案的其余部分提供了一些关于为什么使用asyncio运行 anExecutor可能是有利的解释。

- 组合多个执行器和其他异步代码

asyncio本质上提供了对并发性的更多控制,但代价是您需要更多地控制并发性。如果您想使用 a来同时运行一些代码ThreadPoolExecutor以及其他一些代码ProcessPoolExecutor,那么使用同步代码来管理它并不那么容易,但是使用asyncio.

import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

async def with_processing():
    with ProcessPoolExecutor() as executor:
        tasks = [...]
        for task in asyncio.as_completed(tasks):
            result = await task
            ...

async def with_threading():
    with ThreadPoolExecutor() as executor:
        tasks = [...]
        for task in asyncio.as_completed(tasks):
            result = await task
            ...

async def main():
    await asyncio.gather(with_processing(), with_threading())

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

这是如何运作的?本质上asyncio是要求执行者运行他们的函数。然后,当执行器运行时,asyncio将运行其他代码。例如,ProcessPoolExecutor启动一堆进程,然后在等待这些进程完成时ThreadPoolExecutor启动一堆线程。asyncio然后将检查这些执行者并在完成后收集他们的结果。此外,如果您有其他代码使用asyncio,您可以在等待进程和线程完成时运行它们。

- 缩小代码的哪些部分需要执行器

在代码中拥有许多执行器的情况并不常见,但是当人们使用线程/进程时,我看到的一个常见问题是他们会将整个代码推入一个线程/进程中,期望它能够工作。例如,我曾经看到过以下代码(大约):

from concurrent.futures import ThreadPoolExecutor
import requests

def get_data(url):
    return requests.get(url).json()["data"]

urls = [...]

with ThreadPoolExecutor() as executor:
    for data in executor.map(get_data, urls):
        print(data)
Run Code Online (Sandbox Code Playgroud)

这段代码的有趣之处在于,使用并发时它比不使用并发时要慢。为什么?因为结果json很大,而且有很多线程消耗大量内存是灾难性的。幸运的是,解决方案很简单:

from concurrent.futures import ThreadPoolExecutor
import requests

urls = [...]

with ThreadPoolExecutor() as executor:
    for response in executor.map(requests.get, urls):
        print(response.json()["data"])
Run Code Online (Sandbox Code Playgroud)

json现在一次只有一个被卸载到内存中,一切都很好。

教训在这里?

您不应该尝试将所有代码放入线程/进程中,而应该关注代码的哪些部分实际上需要并发。

get_data如果函数不像本例那么简单怎么办?如果我们必须在函数中间深处的某个地方应用执行器怎么办?这就是asyncio出现的地方:

import asyncio
import requests

async def get_data(url):
    # A lot of code.
    ...
    # The specific part that needs threading.
    response = await asyncio.to_thread(requests.get, url, some_other_params)
    # A lot of code.
    ...
    return data

urls = [...]

async def main():
    tasks = [get_data(url) for url in urls]
    for task in asyncio.as_completed(tasks):
        data = await task
        print(data)

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

尝试同样的做法concurrent.futures绝不是一件好事。您可以使用回调、队列等,但它比基本代码更难管理asyncio

  • (续)在后一种情况下,这可能会导致立即加载大量内存(“.json()”的大小乘以线程数),这对性能可能是灾难性的。使用 `asyncio`,您可以轻松地挑选哪些代码使用 `threading` 运行,哪些代码不运行,从而允许您选择不使用 `threading` 运行 `.json()["data"]`,而是使用 `threading` 运行`.json()["data"]`一次只加载一个。 (3认同)
  • @ZacWrangler 这里的过程有两个重要的组成部分:`requests.get(...)` 和 `.json()["data"]`。一个执行 API 请求,另一个将所需数据加载到内存中。将“线程”应用于 API 请求可能会显着提高性能,因为您的计算机没有为其执行任何工作,它只是在等待下载内容。将 `threading` 应用于 `.json()["data"]` 可能(并且可能会)导致多个 `.json()` 同时启动,并且*最终*后跟 `[" data"]`,也许是在所有 `.json()` 运行之后。 (2认同)

Ben*_*ari 36

[快速回答]

TL; DR


做出正确的选择:

我们已经了解了最流行的并发形式.但问题仍然存在 - 何时应该选择哪一个?这实际上取决于用例.根据我的经验(和阅读),我倾向于遵循这个伪代码:

if io_bound:
    if io_very_slow:
        print("Use Asyncio")
    else:
        print("Use Threads")
else:
    print("Multi Processing")
Run Code Online (Sandbox Code Playgroud)
  • CPU Bound =>多处理
  • I/O绑定,快速I/O,有限的连接数=>多线程
  • I/O绑定,慢速I/O,许多连接=> Asyncio

参考


[ 注意 ]:

  • 如果你有一个长调用方法(即与睡眠时间包含的方法),最好的选择是asyncioasyncioasyncio方法(协程法),用一个单独的线程并发性的作品.
  • asyncio适用于Python3.
  • uvloop是超快asyncio事件循环(uvloop使得asyncio2-4x更快).

  • @mingchau,是的,但请记住,当您使用可等待函数时,您可以使用“asyncio”,“request”库不是可等待方法,而是可以使用 [`aiohttp` 库]( https://pypi.org/project/aiohttp-requests/) 或 [async-request](https://pypi.org/project/requests-async/) 等。 (5认同)
  • 请扩展slowIO和fastIO以进行多线程或异步>? (5认同)
  • 请您告知 io_very_slow 到底是什么 (5认同)
  • @variable I/O 绑定意味着您的程序大部分时间都花在与慢速设备通信上,例如网络连接、硬盘驱动器、打印机或具有睡眠时间的事件循环。因此,在阻塞模式下,您可以在线程或异步之间进行选择,如果您的边界部分非常慢,则协作多任务处理(异步)是更好的选择(即避免资源匮乏、死锁和竞争条件) (4认同)
  • 因此,如果我有要请求的 url 列表,最好使用 *Asyncio*? (3认同)

Tom*_*iak 24

多处理中,您可以利用多个 CPU 来分配您的计算。由于每个 CPU 并行运行,因此您可以有效地同时运行多个任务。您可能希望对CPU 密集型任务使用多处理。一个例子是试图计算一个巨大列表的所有元素的总和。如果您的机器有 8 个内核,您可以将列表“切割”成 8 个较小的列表,并在单独的内核上分别计算每个列表的总和,然后将这些数字相加。通过这样做,您将获得约 8 倍的加速。

在(多)线程中您不需要多个 CPU。想象一个向 Web 发送大量 HTTP 请求的程序。如果您使用单线程程序,它会在每个请求处停止执行(阻塞),等待响应,然后在收到响应后继续。这里的问题是您的 CPU 在等待某些外部服务器完成工作时并没有真正工作;在此期间,它实际上可以做一些有用的工作!解决方法是使用线程——您可以创建许多线程,每个线程负责从网络请求一些内容。线程的好处在于,即使它们在一个 CPU 上运行,CPU 也会不时“冻结”一个线程的执行并跳转到另一个线程的执行(这称为上下文切换,它在不确定的情况下不断发生)间隔)。 - 使用线程。

asyncio本质上是线程处理,而不是 CPU 而是您作为程序员(或实际上您的应用程序)决定上下文切换发生的位置和时间。在 Python 中,您使用await关键字来暂停协程的执行(使用async关键字定义)。

  • 我喜欢你提到开发人员在“异步”中控制上下文切换,但操作系统在“线程”中控制它 (21认同)

Far*_*eed 22

这是基本思想:

IO绑定吗?---------> 使用asyncio

它是CPU- HEAVY 吗?-----> 使用multiprocessing

别的 ?--------------> 使用threading

所以基本上坚持线程,除非你有 IO/CPU 问题。

  • 您可能遇到的第三个问题是什么? (14认同)
  • @EralpB 不受 io 或 CPU 限制,就像线程工作线程执行简单计算或在本地或从快速本地数据库读取数据块一样。或者只是睡觉和看东西。基本上,大多数问题都属于这个标准,除非您有网络应用程序或繁重的计算。 (3认同)

fjs*_*fjs 10

我\xe2\x80\x99m不是专业的Python用户,但作为计算机体系结构的学生,我想我可以分享一下我在多处理和多线程之间进行选择时的一些考虑因素。此外,其他一些答案(即使是那些得票较高的答案)也滥用了技术术语,所以我认为\xe2\x80\x99s也有必要对这些问题做出一些澄清,我\xe2\x80\x99ll会这样做首先。

\n

多处理和多线程之间的根本区别在于它们是否共享相同的内存空间。线程共享对同一虚拟内存空间的访问,因此线程可以高效且轻松地交换计算结果(零复制,完全在用户空间执行)。

\n

另一方面,进程具有单独的虚拟内存空间。它们无法直接读取或写入其他进程\xe2\x80\x99内存空间,就像一个人在不与另一个人交谈的情况下无法读取或改变另一个人的想法一样。(允许这样做会违反内存保护并破坏使用虚拟内存的目的。)为了在进程之间交换数据,它们必须依赖操作系统\xe2\x80\x99s设施(例如消息传递),并且更多这是比线程使用的 \xe2\x80\x9c 共享内存\xe2\x80\x9d 方案成本更高的原因之一。原因一是调用OS\xe2\x80\x99消息传递机制需要进行系统调用,将代码执行从用户态切换到内核态,比较耗时;另一个原因可能是操作系统消息传递方案必须将数据字节从发送者\xe2\x80\x99内存空间复制到接收者\xe2\x80\x99内存空间,因此复制成本非零。

\n

说多线程程序只能使用一个CPU是不正确的。很多人之所以这么说,是因为 CPython 实现的一个神器:全局解释器锁(GIL)。由于 GIL,CPython 进程中的线程是序列化的。结果,看起来多线程python程序只使用了一个CPU。

\n

但多线程计算机程序一般并不局限于一个核心,而对于 Python 来说,不使用 GIL 的实现确实可以并行运行多个线程,即同时在多个 CPU 上运行。(参见https://wiki.python.org/moin/GlobalInterpreterLock)。

\n

鉴于 CPython 是 Python 的主要实现,因此可以理解为什么多线程 Python 程序通常等同于绑定到单核。

\n

对于带有 GIL 的 Python,释放多核威力的唯一方法是使用多处理(也有例外,如下所述)。但是你的问题最好能够轻松地划分为相互通信最少的并行子问题,否则将不得不发生大量进程间通信,如上所述,使用 OS\xe2\x80\x99 消息的开销传递机制的成本将会很高,有时成本如此之高完全抵消了并行处理的好处。如果您的问题的本质需要并发例程之间的频繁通信,那么多线程是自然的选择。不幸的是,对于 CPython,由于 GIL,真正有效的并行多线程是不可能的。在这种情况下,您应该意识到 Python 并不是您项目的最佳工具,并考虑使用其他语言。

\n

有\xe2\x80\x99s一种替代解决方案,即在用C(或其他语言)编写的外部库中实现并发处理例程,并将该模块导入Python。CPython GIL 不会阻塞该外部库生成的线程。

\n

那么,有了 GIL 的负担,CPython 中的多线程有什么好处吗?不过,正如其他答案所提到的,如果您\xe2\x80\x99正在进行 IO 或网络通信,它仍然提供好处。在这些情况下,相关计算不是由您的 CPU 完成,而是由其他设备完成(在 IO 的情况下,磁盘控制器和 DMA(直接内存访问)控制器将以最少的 CPU 参与传输数据;在网络的情况下, NIC(网络接口卡)和 DMA 将在没有 CPU\xe2\x80\x99s 参与的情况下处理大部分任务,因此一旦线程将此类任务委托给 NIC 或磁盘控制器,操作系统就可以将该线程分配给睡眠状态并切换到同一程序的其他线程来做有用的工作。

\n

在我的理解中,asyncio模块本质上是IO操作多线程的一个特例。

\n

因此:\nCPU 密集型程序可以轻松分区以在通信有限的多个进程上运行:如果 GIL 不存在(例如 Jython),则使用多线程,或者如果 GIL 存在(例如 CPython),则使用多进程。

\n

CPU 密集型程序,需要并发例程之间进行密集通信:如果 GIL 不存在,则使用多线程,或者使用其他编程语言。

\n

IO 批次\xe2\x80\x99s:asyncio

\n


Chr*_*sen 9

已经有很多好的答案了。无法详细说明何时使用每一种。这是两者的更有趣的组合。多处理+异步:https://pypi.org/project/aiomultiprocess/

它设计的用例是 highio,但仍然利用尽可能多的可用内核。Facebook 使用这个库编写了某种基于 python 的文件服务器。Asyncio 允许 IO 绑定流量,但多处理允许多个事件循环和多核上的线程。

存储库中的 Ex 代码:

import asyncio
from aiohttp import request
from aiomultiprocess import Pool

async def get(url):
    async with request("GET", url) as response:
        return await response.text("utf-8")

async def main():
    urls = ["https://jreese.sh", ...]
    async with Pool() as pool:
        async for result in pool.map(get, urls):
            ...  # process result
            
if __name__ == '__main__':
    # Python 3.7
    asyncio.run(main())
    
    # Python 3.6
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

只是这里的补充,在 jupyter 笔记本中不能很好地工作,因为笔记本已经运行了 asyncio 循环。只是提醒您不要拔掉头发。


小智 8

  • 多处理可以并行运行。

  • 多线程asyncio不能并行运行。

使用Intel(R) Core(TM) i7-8700K CPU @ 3.70GHz32.0 GB RAM ,我计算了2 个进程2 个线程2 个异步任务2之间100000的素数数量,如下所示。*这是CPU密集型计算

多重处理 多线程 异步
23.87秒 45.24秒 44.77秒

因为多处理可以并行运行,所以多处理比多线程异步快一倍,如上所示。

我使用了下面3组代码:

多重处理:

# "process_test.py"

from multiprocessing import Process
import time
start_time = time.time()

def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)

if __name__ == "__main__": # This is needed to run processes on Windows
    process_list = []

    for _ in range(0, 2): # 2 processes
        process = Process(target=test)
        process_list.append(process)

    for process in process_list:
        process.start()

    for process in process_list:
        process.join()

    print(round((time.time() - start_time), 2), "seconds") # 23.87 seconds
Run Code Online (Sandbox Code Playgroud)

结果:

...
9592
9592
23.87 seconds
Run Code Online (Sandbox Code Playgroud)

多线程:

# "thread_test.py"

from threading import Thread
import time
start_time = time.time()

def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)

thread_list = []

for _ in range(0, 2): # 2 threads
    thread = Thread(target=test)
    thread_list.append(thread)
    
for thread in thread_list:
    thread.start()

for thread in thread_list:
    thread.join()

print(round((time.time() - start_time), 2), "seconds") # 45.24 seconds
Run Code Online (Sandbox Code Playgroud)

结果:

...
9592
9592
45.24 seconds
Run Code Online (Sandbox Code Playgroud)

异步:

# "asyncio_test.py"

import asyncio
import time
start_time = time.time()

async def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)

async def call_tests():
    tasks = []

    for _ in range(0, 2): # 2 asyncio tasks
        tasks.append(test())

    await asyncio.gather(*tasks)

asyncio.run(call_tests())

print(round((time.time() - start_time), 2), "seconds") # 44.77 seconds
Run Code Online (Sandbox Code Playgroud)

结果:

...
9592
9592
44.77 seconds
Run Code Online (Sandbox Code Playgroud)