将 numba 与 np.concatenate 并行使用效率不高?

jea*_*nie 5 python parallel-processing performance numpy numba

I\xe2\x80\x99m 在高效并行化方面遇到一些麻烦np.concatenate。这是一个最小的工作示例。(我知道在这里我可以分别对ab求和,但我专注于并行连接操作,因为这是我在项目中需要做的事情。然后我将对连接数组进行进一步的操作,例如排序。)

\n

无论我在多少个核心上运行此程序,它似乎总是花费相同的时间(约 10 秒)。如果说有什么不同的话,那就是核心数越多,速度就越慢。我尝试在装饰器中使用 cc 来释放 GIL nogil=True,但没有成功。请注意,即使没有加速,所有核心在计算过程中显然都在使用。

\n

有谁能够帮助我?

\n

非常感谢

\n
from numba import prange, njit\nimport numpy as np\n\n\n@njit()\ndef cc():\n\n    r = np.random.rand(20)\n    a = r[r < 0.5]\n    b = r[r > 0.7]\n\n    c = np.concatenate((a, b))\n\n    return np.sum(c)\n\n\n@njit(parallel=True)\ndef cc_wrap():\n    n = 10 ** 7\n    result = np.empty(n)\n    for i in prange(n):\n        result[i] = cc()\n\n    return result\n\ncc_wrap()\n
Run Code Online (Sandbox Code Playgroud)\n

Jér*_*ard 10

主要问题来自分配器的争用

cc函数创建许多隐式的小型临时数组。例如np.random.rand这样做以及r < 0.5甚至a = r[condition],更不用说np.concatenate。临时数组通常需要使用给定的分配器在堆中分配。标准库提供的默认分配器不能保证使用多个线程可以很好地扩展。分配不能完美地扩展,因为分配数据的线程之间需要昂贵的隐式同步。例如,一个线程可以分配一个被另一个线程删除的数组。在最坏的情况下,分配器可以序列化分配/删除。由于与对已分配数据执行的工作相比,分配数据的成本巨大,因此同步的开销占主导地位,并且整体执行是串行化的。实际上,情况甚至更糟,因为顺序时间已经被开销所主导

请注意,积极优化的编译器可能会在堆栈上分配数组,因为它们不会转义函数。然而,不幸的是,Numba 并没有明显做到这一点。此外,假设 Numba 线程永远不会删除其他线程分配的数据(可能是这种情况,但我不完全确定),则可以使用每个线程池调整分配器以很好地扩展。尽管如此,分配的内存池仍然需要向操作系统请求,而操作系统通常也不能很好地扩展(尤其是 Windows AFAIK)。

最好的解决方案就是避免创建隐式临时数组。使用每个工作人员的本地临时数组与分区算法相结合可以实现这一点。请注意,可以通过为 Numba 指定类型来提前编译这些函数。

这是最终的实现:

import numba as nb
import numpy as np
import random

@nb.njit('float64(float64[::1])')
def cc(tempBuffer):
    assert tempBuffer.size >= 20

    # View to the temporary scratch buffer
    r = tempBuffer[0:20]

    # Generate 20 random numbers without any allocation
    for i in range(20):
        r[i] = random.random()

    j = 0

    # Partition the array so to put values smaller than
    # a condition in the beginning.
    # After the loop, r[0:split] contains the filtered values.
    for i in range(r.size):
        if r[i] < 0.5:
            r[i], r[j] = r[j], r[i]
            j += 1

    split = j

    # Partition the rest of the array.
    # After the loop, r[split:j] contains the other filtered values.
    for i in range(j, r.size):
        if r[i] > 0.7:
            r[i], r[j] = r[j], r[i]
            j += 1

    # Note that extracting contiguous views it cheap as 
    # it does not create a new temporary array
    # a = r[0:split]
    # b = r[split:j]
    c = r[0:j]

    return np.sum(c)

@nb.njit('float64[:]()', parallel=True)
def cc_wrap():
    n = 10 ** 7
    result = np.empty(n)

    # Preallocate some space once for all threads
    globalTempBuffer = np.empty((nb.get_num_threads(), 64), dtype=np.float64)

    for i in nb.prange(n):
        threadId = nb.np.ufunc.parallel._get_thread_id()
        threadLocalBuffer = globalTempBuffer[threadId]
        result[i] = cc(threadLocalBuffer)

    return result

cc_wrap()
Run Code Online (Sandbox Code Playgroud)

请注意,线程本地操作有点棘手,而且通常不需要。在这种情况下,仅使用分区算法来减少分配即可明显提高速度。然而,由于临时数组的大小和分配数量非常小,分配的开销仍然相当大。

另请注意,r此代码中并不严格要求数组,因为随机数可以就地求和。这可能不符合您实际用例的需求。这是一个(更简单的实现):

@nb.njit('float64()')
def cc():
    s = 0.0
    for i in range(20):
        e = random.random()
        if e < 0.5 or e > 0.7:
            s += e
    return s

@nb.njit('float64[:]()', parallel=True)
def cc_wrap():
    n = 10 ** 7
    result = np.empty(n)
    for i in nb.prange(n):
        result[i] = cc()
    return result

cc_wrap()
Run Code Online (Sandbox Code Playgroud)

以下是我的 6 核机器上的时序:

# Initial (sequential):      8.1 s
# Initial (parallel):        9.0 s
# Array-based (sequential):  2.50 s
# Array-based (parallel):    0.41 s
# In-place (sequential):     1.09 s
# In-place (parallel):       0.19 s
Run Code Online (Sandbox Code Playgroud)

最后,最快的并行版本比原始版本快 47 倍(并且扩展几乎完美)。