cython.parallel: how to initialize thread-local ndarray buffers?

ARF*_*ARF 7 multithreading numpy openmp cython

I am struggling to initialize thread-local ndarrays with cython.parallel:

Pseudocode:

cdef:
    ndarray buffer

with nogil, parallel():
    buffer = np.empty(...)

    for i in prange(n):
        with gil:
            print "Thread %d: data address: 0x%x" % (threadid(), <uintptr_t>buffer.data)

        some_func(buffer.data)  # use thread-local buffer

cdef void some_func(char * buffer_ptr) nogil:
    (... works on buffer contents...)

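My problem is that in all threads, buffer.data points to the same address, namely the address of the buffer allocated by the last thread.

Even though buffer is assigned inside the parallel() (or prange) block, Cython does not make buffer a private or thread-local variable; it keeps it as a shared variable.

As a result, buffer.data points to the same memory region in every thread, which wreaks havoc on my algorithm.

This is not a problem with ndarray objects only; it seemingly affects all objects whose type is defined as a cdef class.

How can I solve this?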
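To make the failure mode concrete, here is a pure-Python analogue using only the stdlib `threading` module (not Cython or OpenMP): a single variable shared by all threads, which each thread rebinds to "its own" freshly allocated buffer. The function name `last_writer_wins` is made up for illustration. Because every assignment happens before the barrier and every read happens after it, all threads end up reading the same, last-assigned buffer.

```python
import threading

def last_writer_wins(num_threads):
    """Mimic a `buffer` variable that is shared by all threads instead of private.

    Each thread binds the single shared slot to its own new bytearray, waits
    until every thread has done so, then reads the slot back.
    """
    shared = {"buffer": None}              # one slot visible to every thread
    barrier = threading.Barrier(num_threads)
    seen = [None] * num_threads            # what each thread reads afterwards

    def worker(tid):
        shared["buffer"] = bytearray(16)   # "my own" buffer, stored in a shared slot
        barrier.wait()                     # past this point all assignments are done
        seen[tid] = shared["buffer"]       # so every thread sees the last one assigned

    threads = [threading.Thread(target=worker, args=(t,)) for t in range(num_threads)]
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return seen

seen = last_writer_wins(4)
print(all(buf is seen[0] for buf in seen))  # True: every thread got the same buffer
```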

Dav*_*vid 5

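I think I've finally found a solution to this problem that I like. The short version: create an array with shape

(number_of_threads, ...<whatever shape you need in the thread>...)

then call openmp.omp_get_thread_num() and use the result to index the array, which gives each thread its own "thread-local" sub-array. This avoids allocating a separate array for every loop index (which could be huge) while still preventing threads from overwriting each other.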
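The same idea can be sketched in plain Python as an analogue (stdlib `threading` only, not the Cython/OpenMP code): allocate a (num_threads, row_len) scratch table, give each worker its thread index t, and let it touch only row t. The round-robin split `range(t, num_items, num_threads)` is an assumption meant to mimic `schedule='static', chunksize=1`; the names `static_parallel_for` and `count_and_sum` are hypothetical.

```python
import threading

def static_parallel_for(num_items, num_threads, row_len, body):
    """Run body(i, row) for every i in range(num_items) on num_threads threads.

    Thread t handles iterations t, t + num_threads, t + 2*num_threads, ...
    (round-robin, like OpenMP schedule(static, 1)) and only ever touches
    scratch[t], so no two threads write to the same row.
    """
    # One row per thread: the (num_threads, ...) shape described above.
    scratch = [[0] * row_len for _ in range(num_threads)]

    def worker(t):
        row = scratch[t]  # this thread's "thread-local" sub-array
        for i in range(t, num_items, num_threads):
            body(i, row)

    threads = [threading.Thread(target=worker, args=(t,)) for t in range(num_threads)]
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return scratch

def count_and_sum(i, row):
    row[0] += 1  # number of iterations handled by this thread
    row[1] += i  # sum of the loop indices it handled

scratch = static_parallel_for(num_items=10, num_threads=3, row_len=2, body=count_and_sum)
print(scratch)  # [[4, 18], [3, 12], [3, 15]]
```

If a single result is needed at the end, the per-thread rows can be reduced after the parallel loop has finished, when no thread is writing any more.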

Here is a rough version of what I did:

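import numpy as np
import multiprocessing

cimport numpy as c_numpy
cimport openmp  # Cython's bundled openmp.pxd; compile and link with -fopenmp
from cython.parallel cimport parallel
from cython.parallel import prange
from libc.stdlib cimport malloc, free

c_numpy.import_array()  # initialize the NumPy C API before using c_numpy.ndarray

...

cdef int num_items = ...
cdef int num_threads = multiprocessing.cpu_count()
cdef Py_ssize_t i
# One C-contiguous row per thread, so each thread writes to separate memory
result_array = np.zeros((num_threads, num_items), dtype=DTYPE)
cdef c_numpy.ndarray result_cn
cdef CDTYPE ** result_pointer_arr
# Table of raw row pointers, so the nogil loop never touches Python objects
result_pointer_arr = <CDTYPE **> malloc(num_threads * sizeof(CDTYPE *))
if result_pointer_arr == NULL:
    raise MemoryError()
for i in range(num_threads):
    result_cn = result_array[i]
    result_pointer_arr[i] = <CDTYPE*> result_cn.data

cdef int thread_number
for i in prange(num_items, nogil=True, chunksize=1, num_threads=num_threads, schedule='static'):
    # Assigned inside the loop body, so Cython makes thread_number thread-private
    thread_number = openmp.omp_get_thread_num()
    some_function(result_pointer_arr[thread_number])  # some_function must be declared nogil

free(result_pointer_arr)  # the pointer table itself is no longer needed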
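The pointer table in the code above works because a C-ordered (num_threads, num_items) array is one contiguous block in which row t starts exactly t * num_items elements after the base address. The sketch below checks that layout with the stdlib `ctypes` module instead of NumPy and Cython; it is an illustration under that assumption, the helper name `make_row_pointers` is hypothetical, and the "thread" that writes through its row pointer is simulated rather than run on a real thread.

```python
import ctypes

def make_row_pointers(num_threads, num_items, ctype=ctypes.c_double):
    """Allocate one C-contiguous (num_threads, num_items) block plus a table
    of per-row start addresses, mirroring result_pointer_arr above."""
    block = (ctype * (num_threads * num_items))()    # zero-initialized, like np.zeros
    base = ctypes.addressof(block)
    row_bytes = num_items * ctypes.sizeof(ctype)     # row stride of a C-ordered array
    pointers = [base + t * row_bytes for t in range(num_threads)]
    return block, pointers  # keep `block` alive while the pointers are in use

block, pointers = make_row_pointers(num_threads=4, num_items=5)

# Simulated "thread" 2 writes only through its own row pointer.
row2 = (ctypes.c_double * 5).from_address(pointers[2])
row2[0] = 7.0

print(block[2 * 5])  # 7.0: the write landed at the start of row 2
print(pointers[1] - pointers[0] == 5 * ctypes.sizeof(ctypes.c_double))  # True
```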