use*_*964 6 python parallel-processing performance multiprocessing parallelism-amdahl
让我们定义:
from multiprocessing import Pool
import numpy as np
def func(x):
for i in range(1000):
i**2
return 1
Run Code Online (Sandbox Code Playgroud)
请注意,func()它做了一些事情,它总是返回一个小数字1。
然后,我比较了一个 8 核并行Pool.map()v/sa 串行,python 内置,map()
n=10**3
a=np.random.random(n).tolist()
with Pool(8) as p:
%timeit -r1 -n2 p.map(func,a)
%timeit -r1 -n2 list(map(func,a))
Run Code Online (Sandbox Code Playgroud)
这给出了:
38.4 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
200 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
Run Code Online (Sandbox Code Playgroud)
这显示了非常好的并行缩放。因为我使用 8 个内核,38.3 [ms]大约是 1/8200[s]
然后让我们尝试Pool.map()一些更大的东西的列表,为简单起见,我以这种方式使用列表列表:
n=10**3
m=10**4
a=np.random.random((n,m)).tolist()
with Pool(8) as p:
%timeit -r1 -n2 p.map(func,a)
%timeit -r1 -n2 list(map(func,a))
Run Code Online (Sandbox Code Playgroud)
这使 :
292 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
209 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
Run Code Online (Sandbox Code Playgroud)
你看,并行缩放已经消失了!1s ~ 1.76s
我们可以让它变得更糟,尝试使每个子列表传递更大:
n=10**3
m=10**5
a=np.random.random((n,m)).tolist()
with Pool(8) as p:
%timeit -r1 -n2 p.map(func,a)
%timeit -r1 -n2 list(map(func,a))
Run Code Online (Sandbox Code Playgroud)
这给出了:
3.29 s ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
179 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
Run Code Online (Sandbox Code Playgroud)
哇,对于更大的子列表,计时结果完全相反。我们使用 8 个内核来获得慢 20 倍的计时!!
您还可以注意到 serialmap()的时间与子列表大小无关。因此,一个合理的解释是,Pool.map()真正将那些大子列表的内容传递给导致额外复制的进程?
我不确定。但是如果是这样,为什么它不传递子列表的地址呢?毕竟,子列表已经在内存中了,在实践中func()我使用的保证不会更改/修改子列表。
那么,在python中,在大型事物列表上映射某些操作时保持并行缩放的正确方法是什么?
您的工作职能过早结束:
In [2]: %timeit func(1)
335 µs ± 12.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
Run Code Online (Sandbox Code Playgroud)
所以你基本上是在测量多处理的开销。
更改您的工作功能以完成更多工作,例如循环1000 * 1000时间而不是1000时间,您会看到它再次缩放,我的 mac 上的1000000循环成本大致如此0.4s,与开销相比已经足够高了。
下面是n我的 mac 上不同的测试结果,我使用的Pool(4)是因为我有 4 个内核,测试只运行一次而不是多次%timeit,因为差异微不足道:
您可以看到加速比与 成比例增加n,多处理的开销由每个工作函数调用共享。
背后的数学,假设每次调用的开销是相等的:
如果我们想要ratio > 1:
大致相等:
这意味着,如果与每次调用的开销相比,工作函数运行得太快,multiprocessing则不会扩展。
在我们开始
并深入研究任何对纳秒的探索之前(没错,它很快就会开始,因为每一个[ns]都很重要,因为缩放打开了问题的整个潘多拉魔盒),让我们就缩放达成一致——最容易而且通常是“便宜”的过早一旦问题规模发展到现实规模,技巧可能并且经常会破坏您的梦想 - 成千上万(在上面两个迭代器中看到)对于具有数据获取的缓存计算的行为方式与< 0.5 [ns]曾经超过 L1 的情况有所不同/ L2 / L3-缓存尺寸秤上面1E+5, 1E+6, 1E+9,以上[GB]S,其中每个未对齐抓取是WAY更昂贵,比数100 [ns]
问:“...因为我有 8 个内核,所以我想用它们使速度提高 8 倍”
我希望你能,确实如此。然而,很抱歉说实话,世界不是这样运作的。
请参阅此交互式工具,它将向您展示加速限制及其对初始问题实际缩放的实际生产成本的主要依赖,因为它是从琐碎的规模增长而来的,并且这些规模的 组合效应只需单击-it 和玩滑块以实时查看它:
问:(是)
Pool.map()真的将那些大子列表的内容传递给导致额外复制的进程?
是的,
它必须这样做,按设计
加上它通过“通过”另一个“昂贵的” SER/DES 处理“传递”所有数据
来做到这一点,以便使其发生在“那里”传递。
反之亦然,每当您试图返回“返回”一些乳齿象大小的结果时,反之亦然,而您没有,在上面。
Q :但是如果是这样,为什么不传递子列表的地址呢?
因为远程(参数接收)进程是另一个完全自主的进程,拥有自己的、独立的和受保护的地址空间,我们不能仅仅将地址引用“传入”,我们希望它成为一个完全独立的、自主的工作 python 进程(由于愿意使用这个技巧来逃避GIL-lock 跳舞),不是吗?我们确实做到了 - 这是我们摆脱 GIL-Wars 的核心步骤(为了更好地理解 GIL-lock 的优缺点,可能会喜欢这个和这个(第 15+ 页 CPU 绑定处理)。
0.1 ns - NOP
0.3 ns - XOR, ADD, SUB
0.5 ns - CPU L1 dCACHE reference (1st introduced in late 80-ies )
0.9 ns - JMP SHORT
1 ns - speed-of-light (a photon) travel a 1 ft (30.5cm) distance -- will stay, throughout any foreseeable future :o)
?~~~~~~~~~~~ 1 ns - MUL ( i**2 = MUL i, i )~~~~~~~~~ doing this 1,000 x is 1 [us]; 1,000,000 x is 1 [ms]; 1,000,000,000 x is 1 [s] ~~~~~~~~~~~~~~~~~~~~~~~~~
3~4 ns - CPU L2 CACHE reference (2020/Q1)
5 ns - CPU L1 iCACHE Branch mispredict
7 ns - CPU L2 CACHE reference
10 ns - DIV
19 ns - CPU L3 CACHE reference (2020/Q1 considered slow on 28c Skylake)
71 ns - CPU cross-QPI/NUMA best case on XEON E5-46*
100 ns - MUTEX lock/unlock
100 ns - own DDR MEMORY reference
135 ns - CPU cross-QPI/NUMA best case on XEON E7-*
202 ns - CPU cross-QPI/NUMA worst case on XEON E7-*
325 ns - CPU cross-QPI/NUMA worst case on XEON E5-46*
10,000 ns - Compress 1K bytes with a Zippy PROCESS
20,000 ns - Send 2K bytes over 1 Gbps NETWORK
250,000 ns - Read 1 MB sequentially from MEMORY
500,000 ns - Round trip within a same DataCenter
?~~~ 2,500,000 ns - Read 10 MB sequentially from MEMORY~~(about an empty python process to copy on spawn)~~~~ x ( 1 + nProcesses ) on spawned process instantiation(s), yet an empty python interpreter is indeed not a real-world, production-grade use-case, is it?
10,000,000 ns - DISK seek
10,000,000 ns - Read 1 MB sequentially from NETWORK
?~~ 25,000,000 ns - Read 100 MB sequentially from MEMORY~~(somewhat light python process to copy on spawn)~~~~ x ( 1 + nProcesses ) on spawned process instantiation(s)
30,000,000 ns - Read 1 MB sequentially from a DISK
?~~ 36,000,000 ns - Pickle.dump() SER a 10 MB object for IPC-transfer and remote DES in spawned process~~~~~~~~ x ( 2 ) for a single 10MB parameter-payload SER/DES + add an IPC-transport costs thereof or NETWORK-grade transport costs, if going into [distributed-computing] model Cluster ecosystem
150,000,000 ns - Send a NETWORK packet CA -> Netherlands
| | | |
| | | ns|
| | us|
| ms|
Run Code Online (Sandbox Code Playgroud)
问:“在大型事物列表上并行映射某些操作时,保持并行缩放的正确方法是什么?”
A )
了解避免或至少减少费用的方法:
了解您必须支付和将支付的所有类型的费用:
花费低的过程实例的成本尽可能地(而不是昂贵的)最好的,因为一次性的成本只
在 macOS 上,
spawnstart 方法现在是默认方法。该fork启动方法应该被认为是不安全的,因为它可以导致子进程崩溃。见bpo-33725。
花费尽可能少的参数传递成本(是的,最好避免重复传递那些“大东西”作为参数)
len( os.sched_getaffinity( 0 ) )- 任何超过这个的进程只会等待它的下一个 CPU 核心插槽,并且只会驱逐其他缓存效率高的进程,因此重新支付所有已经支付的获取成本以再次重新获取所有数据,以便将它们放回缓存中以便很快再次被逐出缓存中计算,而到目前为止以这种方式工作的那些进程是因天真地使用尽可能多的multiprocessing.cpu_count()-reported 进程而被驱逐(有什么好处?),在初始Pool-creation 中产生如此昂贵的费用)gc,如果不避免,它可能会阻塞,或者Pool.map()哪个阻塞B )
了解提高效率的方法:
了解所有提高效率的技巧,即使以代码复杂性为代价(一些 SLOC-s 很容易在教科书中展示,但同时牺牲了效率和性能 - 尽管在整个扩展过程中(无论是问题规模还是迭代深度,或者同时增长它们),这两者都是您争取可持续性能的主要敌人。
来自A ) 的某些类别的实际成本极大地改变了理论上可实现的加速限制,从进入某种形式的[PARALLEL]流程编排(这里,使代码执行的某些部分在衍生的子程序中执行)流程),其最初的观点最早由 Gene Amdahl 博士在 60 多年前首次制定(为此,最近添加了两个主要扩展,与流程实例化相关的设置+终止增加了成本(非常在 py2 always & py3.5+ for MacOS 和 Windows 中很重要 ) 和atomicity-of-work,这将在下面讨论。
S = speedup which can be achieved with N processors
s = a proportion of a calculation, which is [SERIAL]
1-s = a parallelizable portion, that may run [PAR]
N = a number of processors ( CPU-cores ) actively participating on [PAR] processing
1
S = __________________________; where s, ( 1 - s ), N were defined above
( 1 - s ) pSO:= [PAR]-Setup-Overhead add-on cost/latency
s + pSO + _________ + pTO pTO:= [PAR]-Terminate-Overhead add-on cost/latency
N
Run Code Online (Sandbox Code Playgroud)
1 where s, ( 1 - s ), N
S = ______________________________________________ ; pSO, pTO
| ( 1 - s ) | were defined above
s + pSO + max| _________ , atomicP | + pTO atomicP:= a unit of work,
| N | further indivisible,
a duration of an
atomic-process-block
Run Code Online (Sandbox Code Playgroud)
1E+6任何简化的模型示例都会以某种方式扭曲您对实际工作负载如何在体内执行的期望。低估的 RAM 分配,在小规模上看不到,以后可能会在规模上出人意料,有时甚至会使操作系统陷入缓慢状态、交换和颠簸。一些更智能的工具 ( numba.jit()) 甚至可以分析代码并缩短一些永远不会被访问或不会产生任何结果的代码段落,因此请注意简化的示例可能会导致令人惊讶的观察结果。
from multiprocessing import Pool
import numpy as np
import os
SCALE = int( 1E9 )
STEP = int( 1E1 )
aLIST = np.random.random( ( 10**3, 10**4 ) ).tolist()
#######################################################################################
# func() does some SCALE'd amount of work, yet
# passes almost zero bytes as parameters
# allocates nothing, but iterator
# returns one byte,
# invariant to any expensive inputs
def func( x ):
for i in range( SCALE ):
i**2
return 1
Run Code Online (Sandbox Code Playgroud)
关于使扩展更少开销成本的策略变得昂贵的一些提示:
#####################################################################################
# more_work_en_block() wraps some SCALE'd amount of work, sub-list specified
def more_work_en_block( en_block = [ None, ] ):
return [ func( nth_item ) for nth_item in en_block ]
Run Code Online (Sandbox Code Playgroud)
如果确实必须通过一个大列表,最好通过更大的块,远程迭代它的部分(而不是为每个项目支付更多次的传输成本,而不是使用sub_blocks(参数得到 SER/DES 处理(~的成本pickle.dumps()+ pickle.loads())[每每个呼叫],再次,在一个附加的成本,即降低所得效率和恶化的延长,架空严格Amdahl定律的开销部分)
#####################################################################################
# some_work_en_block() wraps some SCALE'd amount of work, tuple-specified
def some_work_en_block( sub_block = ( [ None, ], 0, 1 ) ):
return more_work_en_block( en_block = sub_block[0][sub_block[1]:sub_block[2]] )
Run Code Online (Sandbox Code Playgroud)
aMaxNumOfProcessesThatMakesSenseToSPAWN = len( os.sched_getaffinity( 0 ) ) # never more
with Pool( aMaxNumOfProcessesThatMakesSenseToSPAWN ) as p:
p.imap_unordered( more_work_en_block, [ ( aLIST,
start,
start + STEP
)
for start in range( 0, len( aLIST ), STEP ) ] )
Run Code Online (Sandbox Code Playgroud)
最后但并非最不重要的一点是,预计numpy智能矢量化代码的智能使用会带来巨大的性能提升,最好不要重复传递静态、预复制(在进程实例化过程中,因此作为合理缩放的、这里不可避免的成本)其中)BLOB,在代码中使用,而不通过参数传递以矢量化(CPU 非常高效)的方式作为只读数据传递相同的数据。在一个怎样的一些例子可以~ +500 x加速一个可以读取这里或在这里,约但~ +400 x加速比或约只是一个的情况下~ +100 x加速,一些问题隔离的一些例子测试场景。
无论如何,模型代码与您的实际工作负载越接近,基准测试就越有意义(在规模和生产中)。
祝你在探索世界时好运,
如果它有所不同,则
不是梦想,也不是希望它有所不同或我们希望它成为
:o)
事实和科学很重要 - 两者 + 一起
证据记录是核心步骤是实现尽可能高的性能,
不是任何产品营销,
不是任何传福音氏族战争,
不是任何博客帖子的喋喋不休
至少不要说你没有被警告
:o)