如何在 Python 线程池中使用初始化器

Ale*_*ten 5 python multithreading convolution fftw

我正在尝试使用 PyFFTW 进行线程卷积,以便同时计算大量的 2D 卷积。(不需要单独的进程,因为 GIL 是为 Numpy 操作而释放的)。现在这是这样做的规范模型: http ://code.activestate.com/recipes/577187-python-thread-pool/

(Py)FFTW 之所以如此快,是因为它重用了计划。必须为每个线程单独设置这些以避免访问冲突错误,如下所示:

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True

        # Make separate fftw plans for each thread.
        flag_for_fftw='patient'      
        self.inputa = np.zeros(someshape, dtype='float32')
        self.outputa = np.zeros(someshape_semi, dtype='complex64')

        # create a forward plan.
        self.fft = fftw3.Plan(self.inputa,self.outputa, direction='forward', flags=[flag_for_fftw],nthreads=1)         

        # Initialize the arrays for the inverse fft.
        self.inputb = np.zeros(someshape_semi, dtype='complex64')
        self.outputb = np.zeros(someshape, dtype='float32')

        # Create the backward plan.
        self.ifft = fftw3.Plan(self.inputb,self.outputb, direction='backward', flags=[flag_for_fftw],nthreads=1)               
        self.start() 
Run Code Online (Sandbox Code Playgroud)

通过这种方式,我们可以将参数self.inputa, self.outputa, self.fft, self.inputb, self.outputb,传递self.ifft给 Worker 类中 run 方法中的实际卷积器。

这一切都很好,但我们不妨导入 ThreadPool 类:

from multiprocessing.pool import ThreadPool
Run Code Online (Sandbox Code Playgroud)

但是我应该如何在 ThreadPool 中定义初始值设定项才能获得相同的结果?根据文档 http://docs.python.org/library/multiprocessing.html “每个工作进程在启动时都会调用initializer(*initargs)”。您可以在 Python 源代码中轻松检查这一点。

但是,当您设置线程池时,例如使用 2 个线程:

po = ThreadPool(2,initializer=tobedetermined)
Run Code Online (Sandbox Code Playgroud)

然后你运行它,也许在某个循环中

po.apply_async(convolver,(some_input,))
Run Code Online (Sandbox Code Playgroud)

如何让卷积器由初始化器设置?如何让它在每个线程中使用单独的 FFTW 计划,而不需要为每个卷积重新计算 FFTW 计划?

干杯,亚历克斯。

Mih*_*tan 1

您可以使用一个函数来包装卷积器调用,该函数使用线程本地存储(threading.local())来初始化 PyFFTW 并记住结果