use*_*650 68 python multithreading multiprocessing python-3.x python-asyncio
我发现在Python 3.4中,很少有用于多处理/线程的不同库:多处理与线程和asyncio.
但我不知道使用哪一个或是"推荐的".他们做同样的事情,还是不同?如果是这样,哪一个用于什么?我想编写一个在我的计算机中使用多核的程序.但我不知道我应该学习哪个图书馆.
use*_*253 54
它们旨在用于(略微)不同的目的和/或要求.CPython(典型的主线Python实现)仍然具有全局解释器锁,因此多线程应用程序(现在实现并行处理的标准方法)不是最理想的.这就是为什么multiprocessing
可能更喜欢的原因threading
.但并非每个问题都可以有效地分解为[几乎独立的]部分,因此可能需要进行繁重的进程间通信.这就是为什么multiprocessing
通常不会优先考虑的原因threading
.
asyncio
(这种技术不仅可以在Python中使用,其他语言和/或框架也可以使用它,例如Boost.ASIO)是一种从许多同时来源有效处理大量I/O操作的方法,无需并行代码执行.所以它只是一个特定任务的解决方案(确实是一个好的!),而不是一般的并行处理.
Sim*_*Art 43
许多答案都建议如何仅选择 1 个选项,但为什么不能使用所有 3 个选项呢?在这个答案中,我解释了如何使用asyncio
来管理组合所有 3 种并发形式,以及如何在以后需要时轻松地在它们之间进行交换。
许多初次接触 Python 并发的开发人员最终都会使用processing.Process
和threading.Thread
。然而,这些是低级 API,已由模块提供的高级 API 合并在一起concurrent.futures
。此外,生成进程和线程会产生开销,例如需要更多内存,这一问题困扰着我下面展示的示例之一。在某种程度上,concurrent.futures
它会为您管理这一点,这样您就不能轻松地执行诸如生成一千个进程之类的操作,并通过仅生成几个进程然后在每次完成时重新使用这些进程来使计算机崩溃。
这些高级 API 通过 提供,然后由和concurrent.futures.Executor
实现。在大多数情况下,您应该使用它们而不是和,因为将来使用时更容易从一种更改为另一种,并且不必了解每种的详细差异。concurrent.futures.ProcessPoolExecutor
concurrent.futures.ThreadPoolExecutor
multiprocessing.Process
threading.Thread
concurrent.futures
由于它们共享统一的接口,您还会发现代码使用multiprocessing
或threading
经常使用concurrent.futures
. asyncio
对此也不例外,并提供了通过以下代码使用它的方法:
import asyncio
from concurrent.futures import Executor
from functools import partial
from typing import Any, Callable, Optional, TypeVar
T = TypeVar("T")
async def run_in_executor(
executor: Optional[Executor],
func: Callable[..., T],
/,
*args: Any,
**kwargs: Any,
) -> T:
"""
Run `func(*args, **kwargs)` asynchronously, using an executor.
If the executor is None, use the default ThreadPoolExecutor.
"""
return await asyncio.get_running_loop().run_in_executor(
executor,
partial(func, *args, **kwargs),
)
# Example usage for running `print` in a thread.
async def main():
await run_in_executor(None, print, "O" * 100_000)
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
事实上,事实证明,使用threading
withasyncio
非常常见,以至于在 Python 3.9 中,他们添加asyncio.to_thread(func, *args, **kwargs)
了默认的 来缩短它ThreadPoolExecutor
。
是的。对于asyncio
,最大的缺点是异步函数与同步函数不同。如果您没有从一开始就考虑到编程,这可能会给asyncio
很多新用户带来麻烦,并导致大量返工。asyncio
另一个缺点是您的代码的用户也将被迫使用asyncio
. 所有这些必要的返工往往会让初次使用的asyncio
用户感到非常酸涩。
是的。类似于 usingconcurrent.futures
优于threading.Thread
其统一接口的方式,这种方法可以被认为是从异步函数到异步函数的multiprocessing.Process
进一步抽象。Executor
您可以开始使用asyncio
,如果稍后您找到需要的部分threading
或multiprocessing
,则可以使用asyncio.to_thread
或run_in_executor
。同样,您稍后可能会发现您尝试使用线程运行的异步版本已经存在,因此您可以轻松地退出使用threading
并切换到asyncio
该版本。
是的……也不是。最终这取决于任务。在某些情况下,它可能没有帮助(尽管可能不会造成伤害),而在其他情况下,它可能会有很大帮助。这个答案的其余部分提供了一些关于为什么使用asyncio
运行 anExecutor
可能是有利的解释。
asyncio
本质上提供了对并发性的更多控制,但代价是您需要更多地控制并发性。如果您想使用 a来同时运行一些代码ThreadPoolExecutor
以及其他一些代码ProcessPoolExecutor
,那么使用同步代码来管理它并不那么容易,但是使用asyncio
.
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
async def with_processing():
with ProcessPoolExecutor() as executor:
tasks = [...]
for task in asyncio.as_completed(tasks):
result = await task
...
async def with_threading():
with ThreadPoolExecutor() as executor:
tasks = [...]
for task in asyncio.as_completed(tasks):
result = await task
...
async def main():
await asyncio.gather(with_processing(), with_threading())
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
这是如何运作的?本质上asyncio
是要求执行者运行他们的函数。然后,当执行器运行时,asyncio
将运行其他代码。例如,ProcessPoolExecutor
启动一堆进程,然后在等待这些进程完成时ThreadPoolExecutor
启动一堆线程。asyncio
然后将检查这些执行者并在完成后收集他们的结果。此外,如果您有其他代码使用asyncio
,您可以在等待进程和线程完成时运行它们。
在代码中拥有许多执行器的情况并不常见,但是当人们使用线程/进程时,我看到的一个常见问题是他们会将整个代码推入一个线程/进程中,期望它能够工作。例如,我曾经看到过以下代码(大约):
from concurrent.futures import ThreadPoolExecutor
import requests
def get_data(url):
return requests.get(url).json()["data"]
urls = [...]
with ThreadPoolExecutor() as executor:
for data in executor.map(get_data, urls):
print(data)
Run Code Online (Sandbox Code Playgroud)
这段代码的有趣之处在于,使用并发时它比不使用并发时要慢。为什么?因为结果json
很大,而且有很多线程消耗大量内存是灾难性的。幸运的是,解决方案很简单:
from concurrent.futures import ThreadPoolExecutor
import requests
urls = [...]
with ThreadPoolExecutor() as executor:
for response in executor.map(requests.get, urls):
print(response.json()["data"])
Run Code Online (Sandbox Code Playgroud)
json
现在一次只有一个被卸载到内存中,一切都很好。
教训在这里?
您不应该尝试将所有代码放入线程/进程中,而应该关注代码的哪些部分实际上需要并发。
但get_data
如果函数不像本例那么简单怎么办?如果我们必须在函数中间深处的某个地方应用执行器怎么办?这就是asyncio
出现的地方:
import asyncio
import requests
async def get_data(url):
# A lot of code.
...
# The specific part that needs threading.
response = await asyncio.to_thread(requests.get, url, some_other_params)
# A lot of code.
...
return data
urls = [...]
async def main():
tasks = [get_data(url) for url in urls]
for task in asyncio.as_completed(tasks):
data = await task
print(data)
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
尝试同样的做法concurrent.futures
绝不是一件好事。您可以使用回调、队列等,但它比基本代码更难管理asyncio
。
Ben*_*ari 36
我们已经了解了最流行的并发形式.但问题仍然存在 - 何时应该选择哪一个?这实际上取决于用例.根据我的经验(和阅读),我倾向于遵循这个伪代码:
if io_bound:
if io_very_slow:
print("Use Asyncio")
else:
print("Use Threads")
else:
print("Multi Processing")
Run Code Online (Sandbox Code Playgroud)
- CPU Bound =>多处理
- I/O绑定,快速I/O,有限的连接数=>多线程
- I/O绑定,慢速I/O,许多连接=> Asyncio
[ 注意 ]:
asyncio
或asyncio
或asyncio
方法(协程法),用一个单独的线程并发性的作品.asyncio
适用于Python3.asyncio
事件循环(uvloop使得asyncio
2-4x更快).Tom*_*iak 24
在多处理中,您可以利用多个 CPU 来分配您的计算。由于每个 CPU 并行运行,因此您可以有效地同时运行多个任务。您可能希望对CPU 密集型任务使用多处理。一个例子是试图计算一个巨大列表的所有元素的总和。如果您的机器有 8 个内核,您可以将列表“切割”成 8 个较小的列表,并在单独的内核上分别计算每个列表的总和,然后将这些数字相加。通过这样做,您将获得约 8 倍的加速。
在(多)线程中您不需要多个 CPU。想象一个向 Web 发送大量 HTTP 请求的程序。如果您使用单线程程序,它会在每个请求处停止执行(阻塞),等待响应,然后在收到响应后继续。这里的问题是您的 CPU 在等待某些外部服务器完成工作时并没有真正工作;在此期间,它实际上可以做一些有用的工作!解决方法是使用线程——您可以创建许多线程,每个线程负责从网络请求一些内容。线程的好处在于,即使它们在一个 CPU 上运行,CPU 也会不时“冻结”一个线程的执行并跳转到另一个线程的执行(这称为上下文切换,它在不确定的情况下不断发生)间隔)。 - 使用线程。
asyncio本质上是线程处理,而不是 CPU 而是您作为程序员(或实际上您的应用程序)决定上下文切换发生的位置和时间。在 Python 中,您使用await
关键字来暂停协程的执行(使用async
关键字定义)。
Far*_*eed 22
这是基本思想:
是IO绑定吗?---------> 使用
asyncio
它是CPU- HEAVY 吗?-----> 使用
multiprocessing
别的 ?--------------> 使用
threading
所以基本上坚持线程,除非你有 IO/CPU 问题。
fjs*_*fjs 10
我\xe2\x80\x99m不是专业的Python用户,但作为计算机体系结构的学生,我想我可以分享一下我在多处理和多线程之间进行选择时的一些考虑因素。此外,其他一些答案(即使是那些得票较高的答案)也滥用了技术术语,所以我认为\xe2\x80\x99s也有必要对这些问题做出一些澄清,我\xe2\x80\x99ll会这样做首先。
\n多处理和多线程之间的根本区别在于它们是否共享相同的内存空间。线程共享对同一虚拟内存空间的访问,因此线程可以高效且轻松地交换计算结果(零复制,完全在用户空间执行)。
\n另一方面,进程具有单独的虚拟内存空间。它们无法直接读取或写入其他进程\xe2\x80\x99内存空间,就像一个人在不与另一个人交谈的情况下无法读取或改变另一个人的想法一样。(允许这样做会违反内存保护并破坏使用虚拟内存的目的。)为了在进程之间交换数据,它们必须依赖操作系统\xe2\x80\x99s设施(例如消息传递),并且更多这是比线程使用的 \xe2\x80\x9c 共享内存\xe2\x80\x9d 方案成本更高的原因之一。原因一是调用OS\xe2\x80\x99消息传递机制需要进行系统调用,将代码执行从用户态切换到内核态,比较耗时;另一个原因可能是操作系统消息传递方案必须将数据字节从发送者\xe2\x80\x99内存空间复制到接收者\xe2\x80\x99内存空间,因此复制成本非零。
\n说多线程程序只能使用一个CPU是不正确的。很多人之所以这么说,是因为 CPython 实现的一个神器:全局解释器锁(GIL)。由于 GIL,CPython 进程中的线程是序列化的。结果,看起来多线程python程序只使用了一个CPU。
\n但多线程计算机程序一般并不局限于一个核心,而对于 Python 来说,不使用 GIL 的实现确实可以并行运行多个线程,即同时在多个 CPU 上运行。(参见https://wiki.python.org/moin/GlobalInterpreterLock)。
\n鉴于 CPython 是 Python 的主要实现,因此可以理解为什么多线程 Python 程序通常等同于绑定到单核。
\n对于带有 GIL 的 Python,释放多核威力的唯一方法是使用多处理(也有例外,如下所述)。但是你的问题最好能够轻松地划分为相互通信最少的并行子问题,否则将不得不发生大量进程间通信,如上所述,使用 OS\xe2\x80\x99 消息的开销传递机制的成本将会很高,有时成本如此之高完全抵消了并行处理的好处。如果您的问题的本质需要并发例程之间的频繁通信,那么多线程是自然的选择。不幸的是,对于 CPython,由于 GIL,真正有效的并行多线程是不可能的。在这种情况下,您应该意识到 Python 并不是您项目的最佳工具,并考虑使用其他语言。
\n有\xe2\x80\x99s一种替代解决方案,即在用C(或其他语言)编写的外部库中实现并发处理例程,并将该模块导入Python。CPython GIL 不会阻塞该外部库生成的线程。
\n那么,有了 GIL 的负担,CPython 中的多线程有什么好处吗?不过,正如其他答案所提到的,如果您\xe2\x80\x99正在进行 IO 或网络通信,它仍然提供好处。在这些情况下,相关计算不是由您的 CPU 完成,而是由其他设备完成(在 IO 的情况下,磁盘控制器和 DMA(直接内存访问)控制器将以最少的 CPU 参与传输数据;在网络的情况下, NIC(网络接口卡)和 DMA 将在没有 CPU\xe2\x80\x99s 参与的情况下处理大部分任务,因此一旦线程将此类任务委托给 NIC 或磁盘控制器,操作系统就可以将该线程分配给睡眠状态并切换到同一程序的其他线程来做有用的工作。
\n在我的理解中,asyncio模块本质上是IO操作多线程的一个特例。
\n因此:\nCPU 密集型程序可以轻松分区以在通信有限的多个进程上运行:如果 GIL 不存在(例如 Jython),则使用多线程,或者如果 GIL 存在(例如 CPython),则使用多进程。
\nCPU 密集型程序,需要并发例程之间进行密集通信:如果 GIL 不存在,则使用多线程,或者使用其他编程语言。
\nIO 批次\xe2\x80\x99s:asyncio
\n已经有很多好的答案了。无法详细说明何时使用每一种。这是两者的更有趣的组合。多处理+异步:https://pypi.org/project/aiomultiprocess/。
它设计的用例是 highio,但仍然利用尽可能多的可用内核。Facebook 使用这个库编写了某种基于 python 的文件服务器。Asyncio 允许 IO 绑定流量,但多处理允许多个事件循环和多核上的线程。
存储库中的 Ex 代码:
import asyncio
from aiohttp import request
from aiomultiprocess import Pool
async def get(url):
async with request("GET", url) as response:
return await response.text("utf-8")
async def main():
urls = ["https://jreese.sh", ...]
async with Pool() as pool:
async for result in pool.map(get, urls):
... # process result
if __name__ == '__main__':
# Python 3.7
asyncio.run(main())
# Python 3.6
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
只是这里的补充,在 jupyter 笔记本中不能很好地工作,因为笔记本已经运行了 asyncio 循环。只是提醒您不要拔掉头发。
小智 8
多处理可以并行运行。
多线程和asyncio不能并行运行。
使用Intel(R) Core(TM) i7-8700K CPU @ 3.70GHz和32.0 GB RAM ,我计算了2 个进程、2 个线程和2 个异步任务2
之间100000
的素数数量,如下所示。*这是CPU密集型计算:
多重处理 | 多线程 | 异步 |
---|---|---|
23.87秒 | 45.24秒 | 44.77秒 |
因为多处理可以并行运行,所以多处理比多线程和异步快一倍,如上所示。
我使用了下面3组代码:
# "process_test.py"
from multiprocessing import Process
import time
start_time = time.time()
def test():
num = 100000
primes = 0
for i in range(2, num + 1):
for j in range(2, i):
if i % j == 0:
break
else:
primes += 1
print(primes)
if __name__ == "__main__": # This is needed to run processes on Windows
process_list = []
for _ in range(0, 2): # 2 processes
process = Process(target=test)
process_list.append(process)
for process in process_list:
process.start()
for process in process_list:
process.join()
print(round((time.time() - start_time), 2), "seconds") # 23.87 seconds
Run Code Online (Sandbox Code Playgroud)
结果:
...
9592
9592
23.87 seconds
Run Code Online (Sandbox Code Playgroud)
# "thread_test.py"
from threading import Thread
import time
start_time = time.time()
def test():
num = 100000
primes = 0
for i in range(2, num + 1):
for j in range(2, i):
if i % j == 0:
break
else:
primes += 1
print(primes)
thread_list = []
for _ in range(0, 2): # 2 threads
thread = Thread(target=test)
thread_list.append(thread)
for thread in thread_list:
thread.start()
for thread in thread_list:
thread.join()
print(round((time.time() - start_time), 2), "seconds") # 45.24 seconds
Run Code Online (Sandbox Code Playgroud)
结果:
...
9592
9592
45.24 seconds
Run Code Online (Sandbox Code Playgroud)
# "asyncio_test.py"
import asyncio
import time
start_time = time.time()
async def test():
num = 100000
primes = 0
for i in range(2, num + 1):
for j in range(2, i):
if i % j == 0:
break
else:
primes += 1
print(primes)
async def call_tests():
tasks = []
for _ in range(0, 2): # 2 asyncio tasks
tasks.append(test())
await asyncio.gather(*tasks)
asyncio.run(call_tests())
print(round((time.time() - start_time), 2), "seconds") # 44.77 seconds
Run Code Online (Sandbox Code Playgroud)
结果:
...
9592
9592
44.77 seconds
Run Code Online (Sandbox Code Playgroud)