python multithreading multiprocessing
I am splitting up a large ctypes array and processing the pieces in parallel. I get the error below and believe it is because one slice of the array finishes processing before another. I tried using process.join() to wait for the first set of processes, but that did not work. Ideas?
Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored
Called using:
....
with closing(multiprocessing.Pool(initializer=init(array))) as p:
    del array  # Since the array is now stored in a shared array, destroy the array ref for memory reasons
    step = y // cores
    if step != 0:
        jobs = []
        for i in range(0, y, step):
            process = p.Process(target=stretch, args=(shared_arr, slice(i, i+step)), kwargs=options)
            jobs.append(process)
            process.start()
        for j in jobs:
            j.join()
        del jobs
        del process
Update:
# Create a ctypes array
array = ArrayConvert.SharedMemArray(array)
# Create a global of the options
init_options(options)  # options is a dict
with closing(multiprocessing.Pool(initializer=init(array))) as p:
    del array  # Since the array is now stored in a shared array, destroy the array ref for memory reasons
    step = y // cores
    if step != 0:
        for i in range(0, y, step):
            # Package all the options into a global dictionary
            p.map_async(stretch, [slice(i, i+step)])
            #p.apply_async(stretch, args=(shared_arr, slice(i, i+step)), kwargs=options)
    p.join()

def init_options(options_):
    global kwoptions
    kwoptions = options_
The function I am passing to map_async is stored in a different module, so I am struggling to get the global kwoptions through to that function. Passing globals around between modules like this does not seem right (unpythonic). Is this the way to pass kwargs through map_async?
Should I rework the multiprocessing using something different (apply or Process)?
The initializer argument for Pool() accepts a function; replace initializer=init(array) with initializer=init, initargs=(array,).
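For illustration, a minimal sketch of that corrected setup; init() and array stand in for the question's own objects, and the array size here is arbitrary:

# Minimal sketch of the corrected Pool setup (illustrative stand-ins for the
# question's objects).
import multiprocessing
from contextlib import closing

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # expose the shared array to each worker process

array = multiprocessing.RawArray('i', 100)  # stand-in for the real ctypes array

# Wrong: init(array) is called immediately in the parent process, and its
# return value (None) is what Pool receives as the initializer:
#     p = multiprocessing.Pool(initializer=init(array))

# Right: every worker process calls init(array) once when it starts:
with closing(multiprocessing.Pool(initializer=init, initargs=(array,))) as p:
    pass  # submit work here
p.join()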
To pass keyword arguments to a function f() used with the pool.*map* family, you can create a wrapper mp_f():
#!/usr/bin/env python
import logging
import multiprocessing as mp
from contextlib import closing

def init(shared_array_):
    # globals that should be available in worker processes should be
    # initialized here
    global shared_array
    shared_array = shared_array_

def f(interval, a=None, b=None):
    mp.get_logger().info("interval=%r, a=%r, b=%r" % (interval, a, b))
    shared_array[interval] = [a + interval.start]*b  # fake computations

def mp_f(arg_kwargs):
    try:
        arg, kwargs = arg_kwargs
        return f(arg, **kwargs)  # pass keyword args to f()
    except Exception:
        mp.get_logger().error("f%r failed" % (arg_kwargs,))

def main():
    mp.log_to_stderr().setLevel(logging.INFO)

    N = 10**6
    array = mp.RawArray('i', N)  # create shared array

    # create workers pool; use all available CPU cores
    with closing(mp.Pool(initializer=init, initargs=(array,))) as p:
        options = dict(a=5, b=N//4)  # dummy options
        step = options['b']
        args = ((slice(i, i+step), options) for i in range(0, N, step))
        for _ in p.imap_unordered(mp_f, args):  # submit jobs
            pass
    p.join()
    mp.get_logger().info(array[::step])

if __name__ == "__main__":
    mp.freeze_support()  # for py2exe and the like on Windows
    main()
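As a side note (not part of the original answer): if the keyword arguments are identical for every slice, another option on Python 3, where functools.partial objects are picklable, is to bind them once with functools.partial instead of packing an (arg, kwargs) tuple per job. A rough sketch:

# Rough alternative sketch, assuming the options are the same for all slices
# and a Python 3 interpreter (functools.partial objects pickle there).
import functools
import multiprocessing as mp
from contextlib import closing

def init(shared_array_):
    global shared_array
    shared_array = shared_array_

def f(interval, a=None, b=None):
    shared_array[interval] = [a + interval.start] * b  # same fake computation

def main():
    N = 10**6
    array = mp.RawArray('i', N)
    step = N // 4
    worker = functools.partial(f, a=5, b=step)  # keyword args bound up front
    with closing(mp.Pool(initializer=init, initargs=(array,))) as p:
        for _ in p.imap_unordered(worker, (slice(i, i + step) for i in range(0, N, step))):
            pass
    p.join()

if __name__ == "__main__":
    main()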