luc*_*one 115 python parallel-processing multithreading process multiprocessing
我正在学习如何使用Python中threading的multiprocessing模块和并行运行某些操作并加快我的代码.
我发现这很难(可能因为我没有任何理论背景)来理解一个threading.Thread()对象和一个对象之间的区别multiprocessing.Process().
此外,我并不完全清楚如何实例化一个作业队列,并且只有4个(例如)它们并行运行,而另一个则在执行之前等待资源释放.
我发现文档中的示例清晰,但不是很详尽; 一旦我尝试使事情复杂化,我就会收到许多奇怪的错误(比如一种无法腌制的方法,等等).
那么,我什么时候应该使用threading和multiprocessing模块?
您能否将我链接到一些资源,解释这两个模块背后的概念以及如何正确使用它们来完成复杂的任务?
aba*_*ert 236
Giulio Franco所说的对于多线程与一般的多处理是一样的.
但是,Python *还有一个问题:有一个全局解释器锁,可以防止同一进程中的两个线程同时运行Python代码.这意味着如果你有8个内核,并且你的代码改为使用8个线程,它将无法使用800%的CPU并且运行速度提高8倍; 它将使用相同的100%CPU并以相同的速度运行.(实际上,它会运行得慢一些,因为即使你没有任何共享数据,线程也会产生额外的开销,但是暂时忽略它.)
这有例外.如果您的代码的繁重计算实际上并未在Python中发生,但在某些库中使用自定义C代码执行正确的GIL处理(如numpy应用程序),您将从线程获得预期的性能优势.如果繁重的计算是由您运行并等待的某个子进程完成的,那么情况也是如此.
更重要的是,有些情况并不重要.例如,网络服务器花费大部分时间从网络读取数据包,GUI应用程序花费大部分时间等待用户事件.在网络服务器或GUI应用程序中使用线程的一个原因是允许您在不停止主线程继续服务网络数据包或GUI事件的情况下执行长时间运行的"后台任务".这对Python线程来说效果很好.(从技术角度来说,这意味着Python线程可以为您提供并发性,即使它们没有为您提供核心并行性.)
但是,如果您使用纯Python编写CPU绑定程序,则使用更多线程通常没有帮助.
使用单独的进程对GIL没有这样的问题,因为每个进程都有自己独立的GIL.当然,在线程和进程之间仍然存在与任何其他语言相同的权衡 - 在进程之间共享数据比在线程之间共享数据更困难,更昂贵,运行大量进程或创建和销毁可能代价高昂它们经常出现等等.但是GIL对流程的平衡很重要,对C或Java来说并非如此.因此,您会发现自己在Python中使用多处理比在C或Java中使用多处理.
与此同时,Python的"包含电池"理念带来了一些好消息:编写可以在线程和进程之间来回切换的代码非常容易.
如果您根据自包含的"作业"设计代码,这些作业不与输入和输出之外的其他作业(或主程序)共享任何内容,您可以使用concurrent.futures库在线程池中编写代码,如下所示:
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
executor.submit(job, argument)
executor.map(some_function, collection_of_independent_things)
# ...
Run Code Online (Sandbox Code Playgroud)
您甚至可以获得这些工作的结果并将其传递给更多工作,等待执行顺序或完成顺序等等; 阅读有关Future对象的部分以获取详细信息
现在,如果事实证明你的程序经常使用100%的CPU,并且添加更多线程只会让它变慢,那么你就会遇到GIL问题,所以你需要切换到进程.您所要做的就是改变第一行:
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
Run Code Online (Sandbox Code Playgroud)
唯一真正需要注意的是,您的作业参数和返回值必须是可选择的(并且不需要花费太多时间或内存来腌制)才能使用跨进程.通常这不是问题,但有时它是.
但是如果你的工作不能自给自足呢?如果您可以根据将消息从一个消息传递到另一个消息的作业来设计代码,那么它仍然非常简单.您可能必须使用threading.Thread或multiprocessing.Process不依赖于池.而且你必须明确地创建queue.Queue或multiprocessing.Queue对象.(还有很多其他选项 - 管道,套接字,带有鸡群的文件......但重点是,如果Executor的自动魔力不足,你必须手动执行某些操作.)
但是,如果你甚至不能依赖消息传递呢?如果您需要两个工作来改变相同的结构,并看到彼此的变化,该怎么办?在这种情况下,您需要进行手动同步(锁,信号量,条件等),如果要使用进程,则需要显式共享内存对象进行引导.这是多线程(或多处理)变得困难的时候.如果你可以避免它,那很好; 如果你不能,你需要阅读的内容超过了某人可以提出的答案.
从评论中,您想知道Python中的线程和进程之间的区别.真的,如果你阅读Giulio Franco的回答以及我和我的所有链接,那应该涵盖所有内容......但总结肯定会有用,所以这里有:
ctypes类型.threading模块没有该模块的某些功能multiprocessing.(您可以使用multiprocessing.dummy在线程之上获取大部分缺少的API,或者您可以使用更高级别的模块concurrent.futures,而不用担心它.)*实际上并不是具有此问题的Python语言,而是CPython,即该语言的"标准"实现.其他一些实现没有GIL,比如Jython.
**如果您正在使用fork start方法进行多处理 - 您可以在大多数非Windows平台上使用 - 每个子进程都会获取父级在启动子进程时拥有的任何资源,这可能是将数据传递给子级的另一种方法.
Giu*_*nco 29
多个线程可以存在于单个进程中.属于同一进程的线程共享相同的内存区域(可以读取和写入相同的变量,并且可以相互干扰).相反,不同的进程存在于不同的存储区域中,并且每个进程都有自己的变量.为了进行通信,进程必须使用其他通道(文件,管道或套接字).
如果要并行化计算,则可能需要多线程,因为您可能希望线程在同一内存上进行协作.
谈到性能,线程比进程创建和管理更快(因为操作系统不需要分配全新的虚拟内存区域),并且线程间通信通常比进程间通信更快.但线程更难编程.线程可以相互干扰,并且可以写入彼此的内存,但这种情况发生的方式并不总是很明显(由于几个因素,主要是指令重新排序和内存缓存),因此您将需要同步原语来控制访问你的变量.
Python 文档引用
我在以下位置突出了有关进程与线程和 GIL 的关键 Python 文档引用:什么是 CPython 中的全局解释器锁 (GIL)?
进程 vs 线程实验
为了更具体地显示差异,我做了一些基准测试。
在基准测试中,我对8 个超线程CPU上的各种线程数量的 CPU 和 IO 绑定工作进行了计时。每个线程提供的工作总是相同的,因此更多的线程意味着提供更多的总工作。
结果是:
绘图数据。
结论:
对于 CPU 密集型工作,多处理总是更快,大概是由于 GIL
用于 IO 绑定工作。两者的速度完全相同
由于我在 8 超线程机器上,线程只能扩展到大约 4 倍而不是预期的 8 倍。
与达到预期 8 倍加速比的 C POSIX CPU 密集型工作相比:“真实”、“用户”和“系统”在 time(1) 的输出中意味着什么?
TODO:我不知道这是什么原因,肯定还有其他 Python 效率低下的问题。
测试代码:
#!/usr/bin/env python3
import multiprocessing
import threading
import time
import sys
def cpu_func(result, niters):
'''
A useless CPU bound function.
'''
for i in range(niters):
result = (result * result * i + 2 * result * i * i + 3) % 10000000
return result
class CpuThread(threading.Thread):
def __init__(self, niters):
super().__init__()
self.niters = niters
self.result = 1
def run(self):
self.result = cpu_func(self.result, self.niters)
class CpuProcess(multiprocessing.Process):
def __init__(self, niters):
super().__init__()
self.niters = niters
self.result = 1
def run(self):
self.result = cpu_func(self.result, self.niters)
class IoThread(threading.Thread):
def __init__(self, sleep):
super().__init__()
self.sleep = sleep
self.result = self.sleep
def run(self):
time.sleep(self.sleep)
class IoProcess(multiprocessing.Process):
def __init__(self, sleep):
super().__init__()
self.sleep = sleep
self.result = self.sleep
def run(self):
time.sleep(self.sleep)
if __name__ == '__main__':
cpu_n_iters = int(sys.argv[1])
sleep = 1
cpu_count = multiprocessing.cpu_count()
input_params = [
(CpuThread, cpu_n_iters),
(CpuProcess, cpu_n_iters),
(IoThread, sleep),
(IoProcess, sleep),
]
header = ['nthreads']
for thread_class, _ in input_params:
header.append(thread_class.__name__)
print(' '.join(header))
for nthreads in range(1, 2 * cpu_count):
results = [nthreads]
for thread_class, work_size in input_params:
start_time = time.time()
threads = []
for i in range(nthreads):
thread = thread_class(work_size)
threads.append(thread)
thread.start()
for i, thread in enumerate(threads):
thread.join()
results.append(time.time() - start_time)
print(' '.join('{:.6e}'.format(result) for result in results))
Run Code Online (Sandbox Code Playgroud)
在 Ubuntu 18.10、Python 3.6.7、联想 ThinkPad P51 笔记本电脑上测试,CPU:Intel Core i7-7820HQ CPU(4 核/8 线程),RAM:2x Samsung M471A2K43BB1-CRC(2x 16GiB),SSD:Samsung MZVLB512HAJ 000L7(3,000 MB/秒)。
可视化在给定时间运行哪些线程
这篇文章https://rohanvarma.me/GIL/告诉我,每当一个线程被调度与您可以运行一个回调target=的参数threading.Thread和相同的multiprocessing.Process。
这使我们能够准确地查看每次运行的线程。完成后,我们会看到类似的内容(我制作了这个特殊的图表):
+--------------------------------------+
+ Active threads / processes +
+-----------+--------------------------------------+
|Thread 1 |******** ************ |
| 2 | ***** *************|
+-----------+--------------------------------------+
|Process 1 |*** ************** ****** **** |
| 2 |** **** ****** ** ********* **********|
+-----------+--------------------------------------+
+ Time --> +
+--------------------------------------+
Run Code Online (Sandbox Code Playgroud)
这将表明: