How can I use a Python multiprocessing queue to access the GPU (via PyOpenCL)?

joh*_*_be 4 python queue multiprocessing pyopencl

My code takes a long time to run, so I have been looking into Python's multiprocessing library to speed things up. My code also includes a few steps that use the GPU via PyOpenCL. The problem is that if I set multiple processes to run simultaneously, they all end up trying to use the GPU at the same time, and that often results in one or more of the processes throwing an exception and quitting.

To work around this, I stagger the start of each process so that they are less likely to collide with each other:

import multiprocessing
import time

process_list = []
num_procs = 4

# Break the data into chunks so each process gets its own chunk of the data
data_chunks = chunks(data, num_procs)
for chunk in data_chunks:
    if len(chunk) == 0:
        continue
    # Instantiate the process, giving it its chunk of the data
    p = multiprocessing.Process(target=test, args=(chunk, arg2))
    # Keep the process in a list so that it remains accessible
    process_list.append(p)

# Start the processes, staggered by 5 seconds
for j, process in enumerate(process_list, 1):
    print('\nStarting process %i' % j)
    process.start()
    time.sleep(5)

for process in process_list:
    process.join()
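The `chunks` helper used above is not shown in the question; a minimal sketch of one plausible implementation (splitting a sequence into `n` roughly equal pieces) could be:

```python
def chunks(seq, n):
    """Split seq into n roughly equal-sized chunks (a hypothetical
    reconstruction of the question's helper, not the original)."""
    k, m = divmod(len(seq), n)
    return [seq[i * k + min(i, m):(i + 1) * k + min(i + 1, m)]
            for i in range(n)]
```

Note that when there are fewer items than `n`, some chunks come back empty, which is why the loop guards with `if len(chunk) == 0: continue`.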

I also wrapped the function that calls the GPU in a try/except retry loop, so that if two processes do try to access it at the same time, the one that fails to get access waits a few seconds and then tries again:

wait = 2
n = 0
while True:
    try:
        gpu_out = GPU_Obj.GPU_fn(params)
    except Exception:
        time.sleep(wait)
        print('\nWaiting for GPU memory...')
        n += 1
        if n == 5:
            raise Exception('Tried and failed %i times to allocate memory for the OpenCL kernel.' % n)
        continue
    break
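The retry pattern above can be factored into a small helper. This is only a sketch: `retry`, `attempts`, `wait`, and `exc` are names invented here, and `GPU_Obj.GPU_fn` is the question's function:

```python
import time

def retry(fn, attempts=5, wait=2.0, exc=Exception):
    # Call fn() until it succeeds, sleeping between failed attempts;
    # re-raise the last exception once the attempts are exhausted.
    for n in range(1, attempts + 1):
        try:
            return fn()
        except exc:
            if n == attempts:
                raise
            print('\nWaiting for GPU memory...')
            time.sleep(wait)

# Usage: gpu_out = retry(lambda: GPU_Obj.GPU_fn(params))
```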

This workaround is very clunky, and even though it works most of the time, processes still occasionally throw exceptions. I feel there should be a more efficient/elegant solution using multiprocessing.Queue or something similar, but I am not sure how to integrate that with PyOpenCL for GPU access.

dan*_*ano 6

It sounds like you could use a multiprocessing.Lock to synchronize access to the GPU:

data_chunks = chunks(data, num_procs)
lock = multiprocessing.Lock()
for chunk in data_chunks:
    if len(chunk) == 0:
        continue
    # Instantiate the process, passing it the shared lock
    p = multiprocessing.Process(target=test, args=(arg1, arg2, lock))
    ...
    ...

Then, inside test, wherever you access the GPU:

with lock:  # Only one process will be allowed in this block at a time.
    gpu_out = GPU_Obj.GPU_fn(params)
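As a self-contained illustration of this pattern (`worker`, `run_demo`, and the shared results list are invented for this sketch; the `with lock:` block stands in for the `GPU_Obj.GPU_fn(params)` call):

```python
import multiprocessing

def worker(lock, results, idx):
    # The lock serializes this critical section across processes;
    # in the real code it would wrap the GPU_Obj.GPU_fn(params) call.
    with lock:
        results.append(idx)

def run_demo(num_procs=4):
    lock = multiprocessing.Lock()
    manager = multiprocessing.Manager()
    results = manager.list()  # shared list for collecting worker output
    procs = [multiprocessing.Process(target=worker, args=(lock, results, i))
             for i in range(num_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return sorted(results)

if __name__ == '__main__':
    print(run_demo())  # [0, 1, 2, 3]
```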

Edit:

To do this with a multiprocessing.Pool, you would need to do something like the following, because an ordinary multiprocessing.Lock cannot be pickled into apply's args; it has to be inherited by the workers via the pool's initializer:

# At global scope
lock = None

def init(_lock):
    global lock
    lock = _lock

data_chunks = chunks(data, num_procs)
lock = multiprocessing.Lock()
for chunk in data_chunks:
    if len(chunk) == 0:
        continue
    # Create a pool whose workers inherit the lock via the initializer
    p = multiprocessing.Pool(initializer=init, initargs=(lock,))
    p.apply(test, args=(arg1, arg2))
    ...
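A runnable sketch of the initializer approach (`square` and `run_pool` are names invented here): the lock is handed to each worker exactly once at startup, and each worker stores it in a module-level global.

```python
import multiprocessing

_lock = None

def _init(lock):
    # Runs once in every worker process; stores the inherited lock.
    global _lock
    _lock = lock

def square(x):
    with _lock:  # only one worker may hold the "GPU" at a time
        return x * x

def run_pool(values):
    lock = multiprocessing.Lock()
    with multiprocessing.Pool(2, initializer=_init, initargs=(lock,)) as pool:
        return pool.map(square, values)

if __name__ == '__main__':
    print(run_pool([1, 2, 3]))  # [1, 4, 9]
```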

Or:

data_chunks = chunks(data, num_procs)
m = multiprocessing.Manager()
lock = m.Lock()
for chunk in data_chunks:
    if len(chunk) == 0:
        continue
    # A Manager lock is a picklable proxy, so it can go in args directly
    p = multiprocessing.Pool()
    p.apply(test, args=(arg1, arg2, lock))
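The Manager variant works because a manager's Lock is a picklable proxy object, so it can travel through apply's args like any other argument. A minimal sketch (`square_locked` and `run_with_manager` are invented names):

```python
import multiprocessing

def square_locked(x, lock):
    # The proxy lock still provides mutual exclusion across workers.
    with lock:
        return x * x

def run_with_manager(values):
    m = multiprocessing.Manager()
    lock = m.Lock()
    with multiprocessing.Pool(2) as pool:
        return [pool.apply(square_locked, args=(v, lock)) for v in values]

if __name__ == '__main__':
    print(run_with_manager([1, 2, 3]))  # [1, 4, 9]
```

The trade-off is that every acquire/release becomes a round trip to the manager process, so this is a bit slower than the inherited-lock approach.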