Python multiprocessing - cannot join current thread

Jzl*_*325 5 python multithreading multiprocessing

I am slicing up large ctypes arrays and processing the slices in parallel. I get the error below and believe it occurs because one slice of the array finishes processing before another. I tried using process.join() to wait for the first set of processes, but that isn't working. Ideas?

Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored

Method used:

    ....

    with closing(multiprocessing.Pool(initializer=init(array))) as p:
        del array #Since the array is now stored in a shared array destroy the array ref for memory reasons

        step = y // cores
        if step != 0:
            jobs = []
            for i in range(0, y, step):
                process = p.Process(target=stretch, args=(shared_arr, slice(i, i+step)), kwargs=options)
                jobs.append(process)
                process.start()

            for j in jobs:
                j.join()

    del jobs
    del process

Update:

    #Create a ctypes array
    array = ArrayConvert.SharedMemArray(array)
    #Create a global of options
    init_options(options) #options is a dict
    with closing(multiprocessing.Pool(initializer=init(array))) as p:
        del array #Since the array is now stored in a shared array destroy the array ref for memory reasons

        step = y // cores
        if step != 0:
            for i in range(0, y, step):
                #Package all the options into a global dictionary
                p.map_async(stretch, [slice(i, i+step)])
                #p.apply_async(stretch, args=(shared_arr, slice(i, i+step)), kwargs=options)

    p.join()

def init_options(options_):
    global kwoptions
    kwoptions = options_

The function I'm passing to map_async lives in another module, so I'm having a hard time getting the global kwoptions through to it. Passing globals around between modules like this seems wrong (unpythonic). Is this really the way to pass kwargs through map_async?

Should I rework the multiprocessing using a different method (apply or Process)?

jfs*_*jfs 1

The initializer argument of Pool() accepts a function; initializer=init(array) calls init(array) immediately in the parent process and passes its return value (None) as the initializer. Replace it with initializer=init, initargs=(array,).

To pass keyword arguments to a function f() used with the pool.*map* family, you can create a wrapper mp_f():

#!/usr/bin/env python
import logging
import multiprocessing as mp
from contextlib import closing

def init(shared_array_):
    # globals that should be available in worker processes should be
    # initialized here
    global shared_array
    shared_array = shared_array_

def f(interval, a=None, b=None):
    mp.get_logger().info("interval=%r, a=%r, b=%r" % (interval, a, b))
    shared_array[interval] = [a + interval.start]*b # fake computations

def mp_f(arg_kwargs):
    try:
        arg, kwargs = arg_kwargs
        return f(arg, **kwargs) # pass keyword args to f()
    except Exception:
        mp.get_logger().error("f%r failed" % (arg_kwargs,))

def main():
    mp.log_to_stderr().setLevel(logging.INFO)

    N = 10**6
    array = mp.RawArray('i', N) # create shared array

    # create workers pool; use all available CPU cores
    with closing(mp.Pool(initializer=init, initargs=(array,))) as p:
        options = dict(a=5, b=N//4) # dummy options
        step = options['b']
        args = ((slice(i, i+step), options) for i in range(0, N, step))
        for _ in p.imap_unordered(mp_f, args): # submit jobs
            pass
    p.join()
    mp.get_logger().info(array[::step])

if __name__=="__main__":
    mp.freeze_support() # for py2exe and the-like on Windows
    main()