Kev*_*vin 2 c python numpy python-c-api
我为 numpy 库编写了一个 c 扩展,用于计算特定类型的 bincount。由于缺少更好的名称,我们调用它fast_compiled并将方法签名放在numpy/core/src/multiarray/multiarraymodule.c里面array_module_methods:
{"fast_compiled", (PyCFunction)arr_fast_compiled,
METH_VARARGS | METH_KEYWORDS, NULL},
Run Code Online (Sandbox Code Playgroud)
以及numpy/core/src/multiarray/compiled_base.c(和compiled_base.h)中的实际实现:
NPY_NO_EXPORT PyObject *
arr_fast_compiled(PyObject *NPY_UNUSED(self), PyObject *args, PyObject *kwds)
{
PyObject *list_obj = NULL, *strides_obj = Py_None;
PyArrayObject *list_arr = NULL, *ans = NULL, *strides_arr = NULL;
npy_intp len, ans_size, total_size;
npy_intp i, j, k;
double *dans, *weights;
npy_intp* strides;
static char *kwlist[] = {"weights", "strides", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|O",
kwlist, &list_obj, &strides_obj)) {
goto fail;
}
list_arr = (PyArrayObject *)PyArray_ContiguousFromAny(list_obj, NPY_DOUBLE, 2, 2);
if (list_arr == NULL) {
goto fail;
}
len = PyArray_DIM(list_arr, 0);
weights = (double *)PyArray_DATA(list_arr);
ans_size = 2*len-1;
ans = (PyArrayObject *)PyArray_ZEROS(1, &ans_size, NPY_DOUBLE, 0);
if (ans == NULL) {
goto fail;
}
dans = (double *)PyArray_DATA(ans);
NPY_BEGIN_ALLOW_THREADS;
if (strides_obj == Py_None) {
for (i = 0; i < len; ++i) {
k = i * len;
for (j = i; j < i + len; ++j, ++k) {
dans[j] += weights[k];
}
}
Py_DECREF(list_arr);
}
else {
total_size = len*len;
strides_arr = (PyArrayObject *)PyArray_ContiguousFromAny(
strides_obj, NPY_INTP, 1, 1);
strides = (npy_intp *)PyArray_DATA(strides_arr);
for (i = 0; i < total_size; ++i) {
dans[strides[i]] += weights[i];
}
Py_DECREF(list_arr);
Py_DECREF(strides_arr);
}
NPY_END_ALLOW_THREADS;
return (PyObject *)ans;
fail:
Py_XDECREF(list_arr);
Py_XDECREF(strides_arr);
Py_XDECREF(ans);
return NULL;
}
Run Code Online (Sandbox Code Playgroud)
该方法采用一个必需的位置参数weights和一个可选的关键字参数strides。取决于如果strides指定的,它将使用不同(等效)的方式来计算答案。
我很好奇为什么预先计算步幅并将其指定为关键字参数比计算嵌套 for 循环中的步幅要慢。即为什么是这样:
for (i = 0; i < total_size; ++i) {
dans[strides[i]] += weights[i];
}
Run Code Online (Sandbox Code Playgroud)
比这慢:
for (i = 0; i < len; ++i) {
k = i * len;
for (j = i; j < i + len; ++j, ++k) {
dans[j] += weights[k];
}
}
Run Code Online (Sandbox Code Playgroud)
这是我计算基准的方法:
import numpy as np
import perfplot
def fast_compiled(args):
A, _ = args
return np.fast_compiled(A)
def fast_compiled_strides(args):
A, strides = args
return np.fast_compiled(A, strides=strides)
def setup(n):
A = np.random.normal(size=(n, n))
strides = np.arange(n*n)
strides = np.lib.stride_tricks.sliding_window_view(strides, (n,))
strides = strides[:n]
strides = strides.flatten() # make sure it is continous
return A, strides
perfplot.show(
setup=setup,
kernels=[fast_compiled, fast_compiled_strides],
n_range=[2 ** k for k in range(3, 15)],
xlabel='n',
relative_to=0,
)
Run Code Online (Sandbox Code Playgroud)
fast_compiled比fast_compiled_strides因为它在编译时已知的连续数据上工作更快,从而使编译器能够使用SIMD 指令(例如,通常是类似 x86 的平台上的 SSE 或 ARM 平台上的 NEON)。它也应该更快,因为从 L1 缓存中检索的数据缓存更少(由于间接需要更多的提取)。
事实上,dans[j] += weights[k]可以通过加载被矢量化m的项目dans和m项weights添加m使用一个指令项目和储存m项目回来dans。该解决方案高效且缓存友好。
dans[strides[i]] += weights[i]无法在大多数主流硬件上有效矢量化。由于间接性,处理器需要从内存层次结构中执行代价高昂的收集,然后进行求和,然后执行同样代价高昂的分散存储。即使strides包含连续索引,指令通常也比从内存加载连续数据块要昂贵得多。此外,编译器通常无法对代码进行矢量化,或者只是发现在这种情况下不值得使用 SIMD 指令。因此,生成的代码可能是效率较低的标量代码。
实际上,在具有良好编译标志的现代处理器上,这两种代码之间的性能差异应该更大。我怀疑您在这里只在 x86 处理器上使用 SSE,因此理论上速度接近 2,因为可以连续计算 2 个双精度浮点数。但是,使用 AVX/AVX-2 理论上会导致接近 4 的速度(因为可以连续计算 4 个数字)。最新的英特尔处理器甚至可以连续计算 8 个双精度浮点数。请注意,计算简单精度浮点数也可以使理论上的速度提高 2 倍。这同样适用于其他架构,如带有 NEON 和 SVE 指令集的 ARM 或 POWER。由于未来的处理器可能会使用更宽的 SIMD 寄存器(因为它们的效率),因此编写 SIMD 友好代码非常重要。
| 归档时间: |
|
| 查看次数: |
77 次 |
| 最近记录: |