Inf*_*y77 5 c python gcc simd cython
我仍在与在密集、单精度、Fortran 排序的二维数组中查找非零条目的索引(和相应值)i的问题作斗争。我通过 Python 使用 Cython,中间有一些 C。j
我提前道歉,因为这篇文章将会非常长。
我必须处理数千个(有时数百万个)中型 2D 数组(有时 700 x 1,000,有时 6,000 x 7,000 等等),这些数组非常稀疏,但它们提供为密集的(密度可以低至 0.02% 和高达 1-2%)。这些矩阵有时具有某种结构,但通常这是不可预测的。
我尝试过 numpy.nonzero 和 Scipy稀疏的东西,但不幸的是它们比我的慢。
我问这个问题是为了看看我的(可能不正确的)代码的性能是否有可能改进- 即,使其更快 - 而且也可以从更有经验的人那里学习新东西。
我对 Cython 的熟练程度非常有限。我对 C 的了解很糟糕(实际上几乎为零)。我所知道的关于 SIMD 指令的一切都可以用大字写在邮票上。
我在 StackOverflow 上来回搜索并找到了类似问题的答案,其中许多问题都使用了非常先进的 SIMD 解决方案。但由于我对 SIMD 一无所知,因此我无法修改它们以满足我的需要。
我使用这些标志编译下面发布的 Cython 脚本:
-O3 -ffast-math -funroll-loops -ftree-vectorize
Run Code Online (Sandbox Code Playgroud)
我无法更改这些矩阵的生成方式及其数据类型,并且我必须重复查找这些矩阵中的非零元素,特别是这 4 个信息:
当然, len(row_indices) = len(column_indices) = len(x) = 非零数
在下面的脚本中我实现了 3 种方法:
i和j索引以及数组值存储在 3 个单独的、先前malloc编辑的数组中memcmp。如果memcmp返回非零值,则行k与行之间至少有一个非零元素k + 64我需要这 3 个数组(row_indices用于i索引、col_starts索引j和x矩阵中的非零浮点数)将它们传递到另一个库。
nnz_c_support.h(C 代码,由 Cython 脚本使用)
#include <stdlib.h>
#include <stdint.h>
#include <limits.h>
#include <stddef.h>
__attribute__ ((hot)) int dataisnull(const void *data, size_t length) {
/* assuming data was returned by malloc, thus is properly aligned */
size_t i = 0, i0, n = length / sizeof(size_t);
const size_t *pw = data;
const unsigned char *pb = data;
#define UNROLL_FACTOR 8
#if UNROLL_FACTOR == 8
size_t n1 = n - n % UNROLL_FACTOR;
for (; i < n1; i += UNROLL_FACTOR) {
size_t val = pw[i + 0] | pw[i + 1] | pw[i + 2] | pw[i + 3] |
pw[i + 4] | pw[i + 5] | pw[i + 6] | pw[i + 7];
if (val)
return i+1;
}
#endif
size_t val = 0;
// gcc and clang think they should autovectorize this cleanup loop
// using a single-byte accumulator convinces them at least not to unpack bytes to size_t before ORing
i0 = i+1;
i *= sizeof(size_t);
// i = n * sizeof(size_t)
unsigned char bval = 0;
for (; i < length; i++) {
bval |= pb[i];
}
return ((val|bval) != 0 ? i0 : 0);
}
Run Code Online (Sandbox Code Playgroud)
nonzero.pyx(具有 3 种不同方法的 Cython 代码)
#cython: language_level=3,boundscheck=False,wraparound=False,nonecheck=False,initializedcheck=False,cdivision=True
###############################################################################
import numpy as np
cimport numpy as np
import cython
from libc.stdlib cimport malloc, calloc, free
from libc.string cimport memcmp
cdef int MAXNNZ = 500000
DTYPE_float = np.float32
ctypedef np.float32_t DTYPE_float_t
cdef extern from "nnz_c_support.h" nogil:
cdef int dataisnull(const void *data, size_t length)
cdef int find_nnz_naive(DTYPE_float_t[::1, :] matrix) nogil:
cdef int n, m, i, j, k, nza, nelem, next_k
cdef float val1
m = matrix.shape[0]
n = matrix.shape[1]
nelem = min(m*n, MAXNNZ)
col_starts = <int*> malloc(sizeof(int)*nelem)
row_indices = <int*> malloc(sizeof(int)*nelem)
x = <double*> malloc(sizeof(double)*nelem)
nza = 0
for i in range(n):
for j in range(m):
val1 = matrix[j, i]
if val1 != 0.0:
nza = nza + 1
# Store i, j and val1 indices/values in 3 arrays
# x has to be an array of doubles
col_starts[nza] = i + 1
row_indices[nza] = j + 1
x[nza] = val1
free(x)
free(row_indices)
free(col_starts)
return nza
cdef int find_nnz_memcmp(DTYPE_float_t[::1, :] matrix) nogil:
cdef int n, m, i, j, k, nza, nelem, next_k
cdef int mcpi
cdef float val1
m = matrix.shape[0]
n = matrix.shape[1]
cdef int bytes_per_float = 4
cdef int n_floats_per_block = 64 if m > 512 else 32
cdef int comp_bytes = n_floats_per_block * bytes_per_float
test_block = <float*> calloc(n_floats_per_block, sizeof(float))
nelem = min(m*n, MAXNNZ)
col_starts = <int*> malloc(sizeof(int)*nelem)
row_indices = <int*> malloc(sizeof(int)*nelem)
x = <double*> malloc(sizeof(double)*nelem)
nza = 0
for i in range(n):
k = 0
while k < m:
mcpi = memcmp(&test_block[0], &matrix[k, i], comp_bytes)
next_k = min(m, k + n_floats_per_block)
if mcpi == 0:
k = next_k
continue
for j in range(k, next_k):
val1 = matrix[j, i]
if val1 != 0.0:
nza = nza + 1
# Store i, j and val1 indices/values in 3 arrays
# x has to be an array of doubles
col_starts[nza] = i + 1
row_indices[nza] = j + 1
x[nza] = val1
k = next_k
free(x)
free(row_indices)
free(col_starts)
free(test_block)
return nza
cdef int find_nnz_blocks(DTYPE_float_t[::1, :] matrix) nogil:
cdef int n, m, i, j, k, nza, nelem, next_k
cdef int mcpi, reminder, comp_bytes
cdef float val1
m = matrix.shape[0]
n = matrix.shape[1]
nelem = min(m*n, MAXNNZ)
col_starts = <int*> malloc(sizeof(int)*nelem)
row_indices = <int*> malloc(sizeof(int)*nelem)
x = <double*> malloc(sizeof(double)*nelem)
cdef int bytes_per_float = 4
cdef int n_floats_per_block = 64 if m > 512 else 32
cdef int comp_bytes_default = n_floats_per_block * bytes_per_float
reminder = 4*(m % n_floats_per_block)
nza = 0
for i in range(n):
k = 0
while k < m:
comp_bytes = comp_bytes_default
next_k = k + n_floats_per_block
if next_k >= m:
comp_bytes = reminder
next_k = m
mcpi = dataisnull(&matrix[k, i], comp_bytes)
if mcpi != 0:
for j in range(k + mcpi - 1, next_k):
val1 = matrix[j, i]
if val1 != 0.0:
nza = nza + 1
# Store i, j and val1 indices/values in 3 arrays
# x has to be an array of doubles
col_starts[nza] = i + 1
row_indices[nza] = j + 1
x[nza] = val1
k = next_k
free(x)
free(row_indices)
free(col_starts)
return nza
cpdef int find_nnz(DTYPE_float_t[::1, :] matrix, int method):
with nogil:
if method == 0:
return find_nnz_naive(matrix)
elif method == 1:
return find_nnz_memcmp(matrix)
elif method == 2:
return find_nnz_blocks(matrix)
Run Code Online (Sandbox Code Playgroud)
setup.py(将 Cython 编译成 pyd 的 Python 文件)
import os
from setuptools import setup
from setuptools import Extension
from Cython.Build import cythonize
from Cython.Distutils import build_ext
import numpy as np
# ==================================================================================================================== #
# C extensions
# ==================================================================================================================== #
EXTRA_COMPILE_ARGS = ['-O3', '-ffast-math', '-funroll-loops', '-flto', '-ftree-vectorize', '-DMS_WIN64']
EXTRA_LINK_ARGS = ['-flto', '-static'] + EXTRA_COMPILE_ARGS
class CustomBuildExt(build_ext):
def build_extensions(self):
# Override the compiler executables. Importantly, this
# removes the "default" compiler flags that would
# otherwise get passed on to to the compiler, i.e.,
# distutils.sysconfig.get_var("CFLAGS").
self.compiler.set_executable("compiler_so", "gcc -mdll -O -Wall -DMS_WIN64")
self.compiler.set_executable("compiler_cxx", "g++ -O -Wall -DMS_WIN64")
self.compiler.set_executable("linker_so", "gcc -shared -static")
self.compiler.dll_libraries = []
build_ext.build_extensions(self)
def Compile():
extension_kwargs = {'extra_compile_args': EXTRA_COMPILE_ARGS,
'extra_link_args' : EXTRA_LINK_ARGS,
'include_dirs' : [np.get_include()],
'define_macros' : [('WIN32', 1)]}
module_name = 'nonzero'
extension = Extension(module_name, [module_name + '.pyx'], **extension_kwargs)
# build the core extension(s)
setup_kwargs = {'cmdclass': {'build_ext': CustomBuildExt},
'ext_modules': cythonize(extension,
compiler_directives={'embedsignature' : False,
'boundscheck' : False,
'wraparound' : False,
'initializedcheck': False,
'cdivision' : True,
'nonecheck' : False,
'language_level' : '3str'},
force=True,
cache=False,
quiet=False)}
setup(**setup_kwargs)
if __name__ == '__main__':
Compile()
Run Code Online (Sandbox Code Playgroud)
main.py(Python 测试文件,用于运行计时和测试)
import numpy as np
import timeit
from nonzero import find_nnz
def create_matrix(m, n, density):
"""
Creates a `m` x `n` single-precision, fortran-ordered dense matrix
with a density of nonzero elements specified by `density` (as percentage)
"""
n_elem = m * n
# Number of non zero values
size = int(round(density / 100.0 * n_elem))
raveled_ind = np.random.choice(n_elem, size=size, replace=False)
i_ind, j_ind = np.unravel_index(raveled_ind, shape=(m, n))
matrix = np.zeros((m, n), dtype=np.float32, order='F')
matrix[i_ind, j_ind] = 1e3 * np.random.randn(size)
return matrix
N_TRIALS = 1000
NAIVE = 0
MEMCMP = 1
BLOCKS = 2
METHODS_STRINGS = ['Naive', 'Memcmp', 'Blocks']
METHODS_ID = [NAIVE, MEMCMP, BLOCKS]
def time_nonzero(m, n, density):
matrix = create_matrix(m, n, density)
assert matrix.dtype == np.float32, 'Single precision floats only'
assert np.isfortran(matrix), 'Matrix is not Fortran ordered'
nnz = np.count_nonzero(matrix)
print('%-6d %-6d %-6d %-8.4f' % (m, n, nnz, density), end=' ')
nnz_naive = find_nnz(matrix, NAIVE)
nnz_memcmp = find_nnz(matrix, MEMCMP)
nnz_blocks = find_nnz(matrix, BLOCKS)
assert nnz_naive == nnz_memcmp, 'NNZ (naive) = %d, NNZ (memcmp) = %d' % (nnz_naive, nnz_memcmp)
assert nnz_naive == nnz_blocks, 'NNZ (naive) = %d, NNZ (blocks) = %d' % (nnz_naive, nnz_blocks)
glb = {'constraint_matrix': matrix}
for method_id, method_name in zip(METHODS_ID, METHODS_STRINGS):
elapsed = timeit.timeit("find_nnz(constraint_matrix, %d)" % method_id,
globals=glb,
setup="from __main__ import find_nnz", number=N_TRIALS)
elapsed = elapsed * 1e3 / N_TRIALS
print('%-7.3f' % elapsed, end=' ')
print()
# ------------------------------------------- #
# Test cases with various m, n and densities
# M N Density (%)
TEST_CASES = [[1694, 2684, 0.1262],
[6295, 6955, 0.0281],
[4126, 5335, 0.0386],
[625 , 860 , 0.2366],
[491 , 667 , 0.2931],
[478 , 680 , 0.3295],
[780 , 1545, 0.2254],
[1012, 756 , 0.2189],
[1333, 1724, 0.1312],
[1699, 4021, 0.0883],
[3248, 3677, 0.0598],
[195 , 588 , 1.2245],
[1915, 3013, 0.0935],
[546 , 1475, 0.2297]]
# ------------------------------------------- #
def main():
methods_names = ''.join(['%-8s' % name for name in METHODS_STRINGS])
print('M N NNZ Density ' + methods_names)
for m, n, density in TEST_CASES:
time_nonzero(m, n, density)
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
编译 Cython 文件的命令:
python setup.py build_ext --inplace --compiler=mingw32 --verbose --force
Run Code Online (Sandbox Code Playgroud)
运行上面的基准测试(在我的机器上大约需要 2 分钟),我得到这个(时间以毫秒为单位,Skylake AVX512):
正如您所看到的,“Memcmp”方法比“Naive”方法快约 20%,而“Blocks”方法比“Naive”方法快 30% 以上(至少对于较大的矩阵而言)。
由于一些晦涩的(对我来说)原因,如果我在所有 3 种方法中删除/注释掉这 3 行:
col_starts[nza] = i + 1
row_indices[nza] = j + 1
x[nza] = val1
Run Code Online (Sandbox Code Playgroud)
但是留在 中nza = nza + 1,那么“Naive”方法是最快的(30% 或更多)(!!!)。
无论如何,我们非常欢迎您提出任何让我的不太好的代码运行得更快的建议。
这是一些(仍然几乎没有测试过)纯 C 代码,它将列主密集矩阵转换为 row/col-idx 和非零值。
这是一个改进版本,基于 @PeterCordes 的建议,主要使用 256 位向量,特别是对于早期分支。
非零压缩使用pext/ pdepnow,受到这个问题的启发:AVX2 基于掩码打包 left 的最有效方法是什么?-- 我用来vpmovmskb保存乘法。
#include <immintrin.h>
#include <inttypes.h>
size_t extractNonZeros(float *out, uint32_t * rowidx, __m256 x, uint32_t mask, uint32_t start_row)
{
// mask has 4 bits set for each valid element of x
// compress corresponding index-nibbles:
uint64_t indexes = _pext_u64(0x76543210, mask);
// expand nibbles to bytes:
indexes = _pdep_u64(indexes, 0x0f0f0f0f0f0f0f0f);
// expand bytes to 32bits:
__m256i perm = _mm256_cvtepu8_epi32(_mm_cvtsi64_si128(indexes));
// extract corresponding values from x vector
x = _mm256_permutevar8x32_ps(x, perm);
// store 8 elements -- usually also writes some garbage, which will be overwritten by the next extraction
_mm256_storeu_ps(out, x);
// add the current startindex to perm to get the inner indexes
perm = _mm256_add_epi32(perm, _mm256_set1_epi32(start_row));
_mm256_storeu_si256((__m256i_u*) rowidx, perm);
return _mm_popcnt_u32(mask)/4;
}
size_t dense_to_sparse(float* nonzeros, uint32_t *colidx, uint32_t *rowidx, const float *matrix, size_t rows, size_t cols)
{
if(rows == 0) return 0;
float *out = nonzeros;
const size_t first_rows = ((rows + 7) & 7) +1;
const uint32_t first_rows_mask = (1ULL << (4*first_rows)) - 1;
for(size_t c=0; c<cols; ++c) {
const float *colstart = matrix + rows*c;
float *out_start = out; // to count nnz of current column
{
__m256 x = _mm256_loadu_ps(colstart);
uint32_t mask = first_rows_mask & _mm256_movemask_epi8(_mm256_castps_si256(_mm256_cmp_ps(_mm256_setzero_ps(), x, _CMP_NEQ_UQ)));
if(mask != 0)
{
size_t NNZ = extractNonZeros(out, rowidx, x, mask, 0);
out += NNZ;
rowidx += NNZ;
}
}
for(size_t r=first_rows; r<rows; r+=8) {
__m256 x = _mm256_loadu_ps(colstart + r);
uint32_t mask = _mm256_movemask_epi8(_mm256_castps_si256(_mm256_cmp_ps(_mm256_setzero_ps(), x, _CMP_NEQ_UQ)));
if(mask == 0) continue;
size_t NNZ = extractNonZeros(out, rowidx, x, mask, r);
out += NNZ;
rowidx += NNZ;
}
// You can manually vectorize this to write blocks of 8 values
for(int r=0; r<(out - out_start); ++r)
{
*colidx++ = c;
}
}
return out - nonzeros; // total number of non-zeros
}
Run Code Online (Sandbox Code Playgroud)
该代码现在还可以处理奇数行大小。通过更多的努力,当前列的最后一行可以与下一列的第一行合并。如果您的输入只有几行,也许这值得优化。
调用此方法的方法需要确保
nonzeros、colidx、 和rowidx可以存储 NNZ+7 个元素(结束后最多会有 7 个无效输出)——但是rows * cols元素总是足够的(假设rows % 8 == 0)。
godbolt-link 与简单的测试用例:https ://godbolt.org/z/Po7YjjEqW
旧版本使用基于 LUT 的压缩处理 128 位向量以供参考: https: //godbolt.org/z/xW7EvEhzs