it_*_*ure 6 python python-3.x python-multiprocessing
操作系统和Python信息:
uname -a
Linux debian 5.10.0-8-amd64 #1 SMP Debian 5.10.46-4 (2021-08-03) x86_64 GNU/Linux
python3 --version
Python 3.9.2
Run Code Online (Sandbox Code Playgroud)
这是一个可以启动多重处理的简单类。
from multiprocessing.pool import Pool
class my_mp(object):
def __init__(self):
self.process_num = 3
fh = open('test.txt', 'w')
def run_task(self,i):
print('process {} start'.format(str(i)))
time.sleep(2)
print('process {} end'.format(str(i)))
def run(self):
pool = Pool(processes = self.process_num)
for i in range(self.process_num):
pool.apply_async(self.run_task,args = (i,))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
初始化my_mp类,然后启动多进程。
ins = my_mp()
ins.run()
process 0 start
process 1 start
process 2 start
process 0 end
process 2 end
process 1 end
Run Code Online (Sandbox Code Playgroud)
现在替换fh = open('test.txt', 'w')为self.fh = open('test.txt', 'w')in my_mpclass 并重试。
ins = my_mp()
ins.run()
Run Code Online (Sandbox Code Playgroud)
没有输出!为什么进程没有启动?
>>> from multiprocessing.pool import Pool
>>>
>>> class my_mp(object):
... def __init__(self):
... self.process_num = 3
... fh = open('test.txt', 'w')
... def run_task(self,i):
... print('process {} start'.format(str(i)))
... time.sleep(2)
... print('process {} end'.format(str(i)))
... def run(self):
... pool = Pool(processes = self.process_num)
... for i in range(self.process_num):
... pool.apply_async(self.run_task,args = (i,))
... pool.close()
... pool.join()
...
>>> x = my_mp()
>>> x.run()
process 0 start
process 1 start
process 2 start
process 2 end
process 0 end
process 1 end
>>> class my_mp(object):
... def __init__(self):
... self.process_num = 3
... self.fh = open('test.txt', 'w')
... def run_task(self,i):
... print('process {} start'.format(str(i)))
... time.sleep(2)
... print('process {} end'.format(str(i)))
... def run(self):
... pool = Pool(processes = self.process_num)
... for i in range(self.process_num):
... pool.apply_async(self.run_task,args = (i,))
... pool.close()
... pool.join()
...
>>> x = my_mp()
>>> x.run()
>>> x.run()
>>> x = my_mp()
>>> class my_mp(object):
... def __init__(self):
... self.process_num = 3
... fh = open('test.txt', 'w')
... self.fh = fh
... def run_task(self,i):
... print('process {} start'.format(str(i)))
... time.sleep(2)
... print('process {} end'.format(str(i)))
... def run(self):
... pool = Pool(processes = self.process_num)
... for i in range(self.process_num):
... pool.apply_async(self.run_task,args = (i,))
... pool.close()
... pool.join()
...
>>> x = my_mp()
>>> x.run()
>>>
Run Code Online (Sandbox Code Playgroud)
self.fh为什么不能在方法中以 的形式添加文件处理程序__init__?我从未在__init__任何进程中调用过定义的文件处理程序。
Stdlib 多处理使用pickle来序列化对象。任何需要跨流程边界发送的内容都需要是可腌制的。
自定义类实例通常是可picklable的,只要它们的所有属性都是可picklable的——它的工作原理是在子流程中导入类型并取消picklable属性。
问题是返回的对象open()不可picklable。
>>> class A:
... pass
...
>>> import pickle
>>> pickle.dumps(A())
b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x01A\x94\x93\x94)\x81\x94.'
>>> class A:
... def __init__(self):
... self.fh = open("test.txt", "w")
...
>>> pickle.dumps(A())
TypeError: cannot pickle '_io.TextIOWrapper' object
Run Code Online (Sandbox Code Playgroud)
在第一种情况下,多处理池仍然可以工作,因为fh它只是一个局部变量,并且一旦超出范围(即方法__init__返回时)就会被删除。但是,一旦您使用 将此句柄保存到实例的命名空间中self.fh = open(...),就会保留一个引用,并且需要通过进程边界发送它。
self.run_task您可能认为,由于您只安排了在池中执行的方法,因此设置的状态__init__并不重要,但事实并非如此。还是有参考的:
>>> ins = my_mp()
>>> ins.run_task.__self__.__dict__
{'process_num': 3,
'fh': <_io.TextIOWrapper name='test.txt' mode='w' encoding='UTF-8'>}
Run Code Online (Sandbox Code Playgroud)
请注意,调用在主进程中ins = my_mp()运行该方法,并且是通过进程边界发送的对象。__init__ins.run_task
有一个第三方库,它提供了 stdlib 多处理池的直接替代 -pip install pathos并将多处理导入替换为:
from pathos.multiprocessing import Pool
Run Code Online (Sandbox Code Playgroud)
pathos使用dill,这是一个比 pickle 更强大的序列化库,因此它能够序列化open(). 您的代码应该可以再次运行,无需进行任何其他更改。但是,您应该注意,每个工作进程不会知道其他进程向 写入字节self.fh,因此无论哪个工作进程最后写入都可能会覆盖之前从其他进程写入的数据。