MSe*_*ert 6 python parallel-processing numba
在另一个Q + A(我可以在pandas中执行动态cumsum?)我对使用prange这个代码的正确性做了评论(这个答案):
from numba import njit, prange
@njit
def dynamic_cumsum(seq, index, max_value):
cumsum = []
running = 0
for i in prange(len(seq)):
if running > max_value:
cumsum.append([index[i], running])
running = 0
running += seq[i]
cumsum.append([index[-1], running])
return cumsum
Run Code Online (Sandbox Code Playgroud)
评论是:
我不建议并行化一个不纯的循环.在这种情况下,
running变量使其不纯.有4种可能的结果:(1)numba决定它不能并行处理它只是处理循环cumsum而不是prange(2)它可以将变量提升到循环之外并在余数上使用并行化(3)numba错误地插入并行执行和结果之间的同步可能是虚假的(4)numba在运行时插入必要的同步,这可能会比通过并行化首先获得更多的开销
而后来的补充:
当然,
running和cumsum变量都使循环"不纯",而不仅仅是前面评论中所述的运行变量
然后我被问到:
这可能听起来像一个愚蠢的问题,但我怎么能弄清楚它做了哪4件事并改进了呢?我真的想用numba变得更好!
鉴于它可能对未来的读者有用,我决定在这里创建一个自我回答的Q + A. 掠夺者:我无法真正回答4个结果中的哪一个产生的问题(或者如果numba产生完全不同的结果),所以我非常鼓励其他答案.
TL;DR:首先:prange与 相同range,除非您添加平行于jit,例如njit(parallel=True)。如果你尝试这样做,你会看到一个关于“不受支持的归约”的例外——那是因为 Numba 将范围限制prange为“纯”循环和“不纯循环”与 numba 支持的归约,并负责确保它落入用户的这些类别中的任何一个。
这在numbas prange(version 0.42)的文档中有明确说明:
1.10.2. 显式并行循环
此代码转换过程的另一个功能是支持显式并行循环。可以使用 Numba's
prange而不是range指定循环可以并行化。用户需要确保循环除了支持的归约之外没有交叉迭代依赖性。
在该文档中,评论所指的“不纯”被称为“交叉迭代依赖”。这种“交叉迭代依赖”是一个在循环之间变化的变量。一个简单的例子是:
def func(n):
a = 0
for i in range(n):
a += 1
return a
Run Code Online (Sandbox Code Playgroud)
这里的变量a取决于它在循环开始之前的值以及循环执行了多少次迭代。这就是“交叉迭代依赖”或“不纯”循环的含义。
显式并行化此类循环时的问题在于迭代是并行执行的,但每次迭代都需要知道其他迭代在做什么。不这样做会导致错误的结果。
让我们一会儿认为prange会催生4名工人,我们通过4为n给函数。一个完全幼稚的实现会做什么?
def func(n):
a = 0
for i in range(n):
a += 1
return a
Run Code Online (Sandbox Code Playgroud)
不同 worker 读取、执行和写入的顺序a可以是任意的,这只是一个例子。它也可以(偶然地)产生正确的结果!这通常称为Race condition。
prange认识到存在这种交叉迭代依赖性的更复杂的做法是什么?
共有三个选项:
鉴于我对 numba 文档的理解(再次重复):
用户需要确保循环除了支持的归约之外没有交叉迭代依赖性。
Numba 确实:
不幸的是,目前尚不清楚“支持的减少”是什么。但是文档提示它是对循环体中的前一个值进行操作的二元运算符:
如果二元函数/运算符使用其在循环体中的先前值更新变量,则会自动推断出减少。减少的初始值是为
+=和*=运算符自动推断的。对于其他函数/运算符,reduction 变量应该在进入prange循环之前保存标识值。标量和任意维度的数组支持以这种方式减少。
OP 中的代码使用列表作为交叉迭代依赖项并list.append在循环体中调用。就我个人而言,我不会称之为list.append减少,并且它没有使用二元运算符,所以我的假设是它很可能不受支持。至于其他交叉迭代依赖running:它对上一次迭代的结果使用加法(这很好),但如果超过阈值(这可能不好),也会有条件地将其重置为零。
Numba 提供了检查中间代码(LLVM 和 ASM)代码的方法:
dynamic_cumsum.inspect_types()
dynamic_cumsum.inspect_llvm()
dynamic_cumsum.inspect_asm()
Run Code Online (Sandbox Code Playgroud)
但是,即使我对结果有必要的理解以对发出的代码的正确性做出任何声明 - 一般来说,“证明”多线程/进程代码正常工作是非常重要的。鉴于我什至缺乏 LLVM 和 ASM 知识,甚至无法查看它是否试图并行化它,我实际上无法回答您的具体问题,它会产生哪种结果。
回到代码,如前所述,如果我使用它会抛出异常(不支持的减少)parallel=True,所以我假设 numba 不会并行化示例中的任何内容:
from numba import njit, prange
@njit(parallel=True)
def dynamic_cumsum(seq, index, max_value):
cumsum = []
running = 0
for i in prange(len(seq)):
if running > max_value:
cumsum.append([index[i], running])
running = 0
running += seq[i]
cumsum.append([index[-1], running])
return cumsum
dynamic_cumsum(np.ones(100), np.arange(100), 10)
Run Code Online (Sandbox Code Playgroud)
Run Code Online (Sandbox Code Playgroud)AssertionError: Invalid reduction format During handling of the above exception, another exception occurred: LoweringError: Failed in nopython mode pipeline (step: nopython mode backend) Invalid reduction format File "<>", line 7: def dynamic_cumsum(seq, index, max_value): <source elided> running = 0 for i in prange(len(seq)): ^ [1] During: lowering "id=2[LoopNest(index_variable = parfor_index.192, range = (0, seq_size0.189, 1))]{56: <ir.Block at <> (10)>, 24: <ir.Block at <> (7)>, 34: <ir.Block at <> (8)>}Var(parfor_index.192, <> (7))" at <> (7)
所以剩下要说的是:在这种情况下,与正常情况prange相比没有提供任何速度优势(因为它不是并行执行的)。因此,在这种情况下,我不会“冒险”潜在问题和/或让读者感到困惑——因为根据 numba 文档不支持它。range
from numba import njit, prange
@njit
def p_dynamic_cumsum(seq, index, max_value):
cumsum = []
running = 0
for i in prange(len(seq)):
if running > max_value:
cumsum.append([index[i], running])
running = 0
running += seq[i]
cumsum.append([index[-1], running])
return cumsum
@njit
def dynamic_cumsum(seq, index, max_value):
cumsum = []
running = 0
for i in range(len(seq)): # <-- here is the only change
if running > max_value:
cumsum.append([index[i], running])
running = 0
running += seq[i]
cumsum.append([index[-1], running])
return cumsum
Run Code Online (Sandbox Code Playgroud)
只是一个快速的时间,支持我之前所做的“不快于”声明:
import numpy as np
seq = np.random.randint(0, 100, 10_000_000)
index = np.arange(10_000_000)
max_ = 500
# Correctness and warm-up
assert p_dynamic_cumsum(seq, index, max_) == dynamic_cumsum(seq, index, max_)
%timeit p_dynamic_cumsum(seq, index, max_)
# 468 ms ± 12.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit dynamic_cumsum(seq, index, max_)
# 470 ms ± 9.49 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
361 次 |
| 最近记录: |