Man*_*jan 3 python multitasking
我有一个 python 程序,它 1) 从磁盘中读取一个非常大的文件(约 95% 的时间),然后 2)处理并提供相对较小的输出(约 5% 的时间)。该程序将在 TB 级的文件上运行。
现在我希望通过利用多处理和多线程来优化这个程序。我正在运行的平台是一个虚拟机,在一个虚拟机上有 4 个处理器。
我计划有一个调度程序进程,它将执行 4 个进程(与处理器相同),然后每个进程应该有一些线程,因为大部分是 I/O 。每个线程将处理 1 个文件并将结果报告给主线程,主线程又将通过 IPC 将其报告回调度程序进程。调度程序可以将它们排队并最终以有序的方式将它们写入磁盘
所以想知道如何决定为这种情况创建的进程和线程的数量?有没有一种数学方法可以找出最佳组合。
谢谢
我想我会安排它与你正在做的相反。也就是说,我会创建一个特定大小的线程池来负责产生结果。提交到此池的任务将作为参数传递给处理器池,工作线程可以使用该处理器池来提交受 CPU 限制的工作部分。换句话说,线程池工作人员将主要执行所有与磁盘相关的操作,并将任何 CPU 密集型工作交给处理器池。
处理器池的大小应该只是您环境中的处理器数量。很难给出线程池的精确大小;这取决于在收益递减规律发挥作用之前它可以处理多少并发磁盘操作。它还取决于您的内存:池越大,占用的内存资源就越大,尤其是在必须将整个文件读入内存进行处理的情况下。因此,您可能必须尝试使用此值。下面的代码概述了这些想法。您从线程池中获得的 I/O 操作重叠比您仅使用小型处理器池所实现的要大:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
import os
def cpu_bound_function(arg1, arg2):
...
return some_result
def io_bound_function(process_pool_executor, file_name):
with open(file_name, 'r') as f:
# Do disk related operations:
. . . # code omitted
# Now we have to do a CPU-intensive operation:
future = process_pool_executor.submit(cpu_bound_function, arg1, arg2)
result = future.result() # get result
return result
file_list = [file_1, file_2, file_n]
N_FILES = len(file_list)
MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threds than required
N_PROCESSES = os.cpu_count() # use the number of processors you have
with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
results = thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list)
Run Code Online (Sandbox Code Playgroud)
重要说明:
另一种更简单的方法是只有一个处理器池,其大小大于您拥有的 CPU 处理器的数量,例如,25。工作进程将执行 I/O 和 CPU 操作。即使您的进程数多于 CPU,许多进程仍将处于等待状态,等待 I/O 完成,从而允许运行 CPU 密集型工作。
这种方法的缺点是创建N个进程的开销远远大于创建N个线程+少量进程的开销。然而,随着提交到池的任务的运行时间变得越来越大,这种增加的开销在总运行时间中所占的百分比越来越小。因此,如果您的任务不是微不足道的,这可能是一种性能合理的简化。
更新:两种方法的基准
我对这两种处理 24 个文件的方法做了一些基准测试,这些文件的大小大约为 10,000KB(实际上,这些只是 3 个不同的文件,每个文件处理了 8 次,因此可能已经完成了一些缓存):
方法一(线程池+处理器池)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
import os
from math import sqrt
import timeit
def cpu_bound_function(b):
sum = 0.0
for x in b:
sum += sqrt(float(x))
return sum
def io_bound_function(process_pool_executor, file_name):
with open(file_name, 'rb') as f:
b = f.read()
future = process_pool_executor.submit(cpu_bound_function, b)
result = future.result() # get result
return result
def main():
file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
N_FILES = len(file_list)
MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threds than required
N_PROCESSES = os.cpu_count() # use the number of processors you have
with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
results = list(thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list))
print(results)
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=1, globals=globals()))
Run Code Online (Sandbox Code Playgroud)
方法 2(仅处理器池)
from concurrent.futures import ProcessPoolExecutor
from math import sqrt
import timeit
def cpu_bound_function(b):
sum = 0.0
for x in b:
sum += sqrt(float(x))
return sum
def io_bound_function(file_name):
with open(file_name, 'rb') as f:
b = f.read()
result = cpu_bound_function(b)
return result
def main():
file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
N_FILES = len(file_list)
MAX_PROCESSES = 50 # depends on your configuration on how well the I/O can be overlapped
N_PROCESSES = min(N_FILES, MAX_PROCESSES) # no point in creating more threds than required
with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
results = list(process_pool_executor.map(io_bound_function, file_list))
print(results)
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=1, globals=globals()))
Run Code Online (Sandbox Code Playgroud)
结果:
(我有8核)
线程池 + 处理器池:13.5 秒单独的处理器池:13.3 秒
结论:我会首先尝试更简单的方法,即对所有内容都使用处理器池。现在棘手的一点是决定要创建的最大进程数是多少,这是您原始问题的一部分,并且在它所做的只是 CPU 密集型计算时有一个简单的答案。如果您正在阅读的文件数量不是太多,那么这一点没有意义;每个文件可以有一个进程。但是,如果您有数百个文件,您将不希望池中有数百个进程(您可以创建的进程数也有上限,而且还有那些讨厌的内存限制)。我没有办法给你一个确切的数字。如果您确实有大量文件,请从较小的池大小开始并不断增加,直到您无法获得更多好处(当然,
| 归档时间: |
|
| 查看次数: |
527 次 |
| 最近记录: |