use*_*156 11 python multiprocessing python-multiprocessing
在使用Python multiprocessing模块时,我几乎没有基本问题:
class Someparallelworkerclass(object) :
def __init__(self):
self.num_workers = 4
self.work_queue = multiprocessing.JoinableQueue()
self.result_queue = multiprocessing.JoinableQueue()
def someparallellazymethod(self):
p = multiprocessing.Process(target=self.worktobedone).start()
def worktobedone(self):
# get data from work_queue
# put back result in result queue
Run Code Online (Sandbox Code Playgroud)
是否有必要通过work_queue,并result_queue作为args对Process?答案取决于操作系统吗?更基本的问题是:子进程是否从父进程获得复制(COW)地址空间,因此知道类/类方法的定义?如果是的话,它是如何知道的队列是IPC要共享,而且它不应该做的副本work_queue,并result_queue在子进程?我尝试在线搜索,但我发现的大部分文档都很模糊,并没有详细说明底层究竟发生了什么.
args在这种情况下,无论您使用什么平台,都没有必要在参数中包含队列.原因是即使你看起来并没有明确地将这两个JoinableQueue实例传递给孩子,你实际上是 - 通过self.由于self 被明确地传递给孩子,两个队列的一部分self,他们最终一起给孩子传递.
在Linux上,这发生在os.fork(),这意味着由内部用于进程间通信的multiprocessing.connection.Connection对象使用的文件描述符由子Queue进程继承(未复制).其他部分Queue变成了copy-on-write,但没关系; multiprocessing.Queue的设计使得任何需要复制的部分都不需要在两个进程之间保持同步.实际上,许多内部属性在fork发生后会重置:
def _after_fork(self):
debug('Queue._after_fork()')
self._notempty = threading.Condition(threading.Lock())
self._buffer = collections.deque()
self._thread = None
self._jointhread = None
self._joincancelled = False
self._closed = False
self._close = None
self._send = self._writer.send # _writer is a
self._recv = self._reader.recv
self._poll = self._reader.poll
Run Code Online (Sandbox Code Playgroud)
所以这涵盖了Linux.Windows怎么样?Windows没有fork,所以它需要pickle self将它发送给孩子,包括腌制我们的Queues.现在,通常如果你试图挑选一个multiprocessing.Queue,它就会失败:
>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> import pickle
>>> pickle.dumps(q)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
Pickler(file, protocol).dump(obj)
File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/local/lib/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/usr/local/lib/python2.7/copy_reg.py", line 84, in _reduce_ex
dict = getstate()
File "/usr/local/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
assert_spawning(self)
File "/usr/local/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance
Run Code Online (Sandbox Code Playgroud)
但这实际上是一种人为限制.multiprocessing.Queue对象可以是怎么回事,他们可能会在Windows发送到子进程-在某些情况下腌制?事实上,如果我们看一下实现,我们可以看到:
def __getstate__(self):
assert_spawning(self)
return (self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
def __setstate__(self, state):
(self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._after_fork()
Run Code Online (Sandbox Code Playgroud)
__getstate__,当腌制一个实例时assert_spawning调用,它有一个调用,这确保我们实际上在尝试pickle*时产生一个进程.__setstate__,在unpickling时调用,负责调用_after_fork.
那么Connection当我们必须腌制时,队列使用的对象如何保持?事实证明,有一个multiprocessing子模块可以做到这一点 - multiprocessing.reduction.该模块顶部的评论非常明确地说明了这一点:
#
# Module to allow connection and socket objects to be transferred
# between processes
#
Run Code Online (Sandbox Code Playgroud)
在Windows上,模块最终使用Windows提供的DuplicateHandle API来创建子进程Connection对象可以使用的重复句柄.因此,虽然每个进程都有自己的句柄,但它们都是完全相同的 - 在一个进程上执行的任何操作都反映在另一个进程上:
重复句柄引用与原始句柄相同的对象.因此,对象的任何更改都通过两个句柄反映出来.例如,如果复制文件句柄,则两个句柄的当前文件位置始终相同.
*有关详细信息,请参阅此答案assert_spawning
Mat*_*att -1
子进程没有获得复制的地址空间。子进程是一个完全独立的 python 进程,没有任何共享。是的,您必须将队列传递给孩子。当您这样做时,多处理会自动通过 IPC 处理共享。请参阅https://docs.python.org/2/library/multiprocessing.html#exchang-objects- Between-processes 。