Boo*_*boo 6 python queue multiprocessing
A related question came up in Why I can't use multiprocessing.Queue with ProcessPoolExecutor?. I provided a partial answer along with a workaround, but conceded that the question raises another question: why *can* a multiprocessing.Queue instance be passed as an argument to a multiprocessing.Process worker function?
For example, the following code fails on platforms that use either the spawn or fork method for creating new processes:
from multiprocessing import Pool, Queue

def worker(q):
    print(q.get())

with Pool(1) as pool:
    q = Queue()
    q.put(7)
    pool.apply(worker, args=(q,))
The above raises:
RuntimeError: Queue objects should only be shared between processes through inheritance
But the following program runs without a problem:
from multiprocessing import Process, Queue

def worker(q):
    print(q.get())

q = Queue()
q.put(7)
p = Process(target=worker, args=(q,))
p.start()
p.join()
It appears that the arguments to a multiprocessing pool worker function ultimately get placed on the pool's input queue, which is implemented as a multiprocessing.SimpleQueue, and you cannot put a multiprocessing.Queue instance into a multiprocessing.SimpleQueue instance, which serializes using a ForkingPickler.
So how is a multiprocessing.Queue serialized when it is passed as an argument to a multiprocessing.Process, allowing it to be used in that way?
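That failure mode can be checked in isolation: pickling a multiprocessing.Queue outside of process spawning raises the same RuntimeError, no matter which pickler wraps it (a minimal sketch):

```python
import pickle
from multiprocessing import Queue

q = Queue()
try:
    pickle.dumps(q)  # Queue.__getstate__ runs and refuses to serialize
    err = None
except RuntimeError as e:
    err = str(e)

print(err)  # Queue objects should only be shared between processes through inheritance
```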
I wanted to expand on the accepted answer, so I am adding my own, which also details a way to make queues, locks, etc. picklable and able to be sent through a pool.
Basically, it's not that Queues cannot be serialized; it's that multiprocessing is only equipped to serialize them when it knows sufficient information about the target process they will be sent to (whether that is the current process or something else). This is why it works when you are spawning a process yourself (using the Process class), but not when you are simply putting the queue on a queue (such as when using a Pool).
Look over the source code of multiprocessing.queues.Queue (or other connection objects such as Condition). You'll find that in their __getstate__ method (the method called when a Queue instance is being pickled) there is a call to the function multiprocessing.context.assert_spawning. This "assertion" only passes if the current thread is spawning a process. If that is not the case, multiprocessing raises the error you saw and exits.
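You can poke at that gate directly. The sketch below uses get_spawning_popen and assert_spawning from the private multiprocessing.context module, so their exact location may change between CPython versions:

```python
from multiprocessing.context import get_spawning_popen, assert_spawning

# Outside of Process.start(), no Popen object is stored in thread-local state.
print(get_spawning_popen())  # None

try:
    assert_spawning(object())  # the same check Queue.__getstate__ performs
    err = None
except RuntimeError as e:
    err = str(e)

print(err)  # object objects should only be shared between processes through inheritance
```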
Now, the reason multiprocessing does not even bother to pickle the queue when the assertion fails is that it has no access to the Popen object created when a thread spawns a child process (for Windows, you can find it in multiprocessing.popen_spawn_win32.Popen). This object stores data about the target process, including its pid and process handle. Multiprocessing requires this information because a queue contains mutexes, and to successfully pickle those mutexes and later rebuild them, multiprocessing must call DuplicateHandle through winapi using the information from the Popen object. If this object is not present, multiprocessing does not know what to do and raises an error. So this is where our problem lies, but it is solvable if we can teach multiprocessing a different approach to steal the duplicate handle from within the target process itself, without requiring its information in advance.
Pay close attention to the class multiprocessing.synchronize.SemLock. It's the base class for all multiprocessing locks, so its objects are subsequently present in queues, pipes, etc. The way it is currently pickled is the one I described above: it needs the target process's handle to create a duplicate handle. However, we can instead define a __reduce__ method for SemLock in which we create a duplicate handle using the current process's handle, and then, from the target process, duplicate the previously created handle, which will now be valid in the target process's context. It's quite a mouthful, but a similar approach is in fact also used to pickle PipeConnection objects; instead of a __reduce__ method, it uses a dispatch table to do so.
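The __reduce__/dispatch-table machinery itself can be illustrated with a toy class. The Handle class and its rebuild function below are hypothetical, purely to show the shape of what SemLock needs; they do not touch real OS handles:

```python
import copyreg
import pickle

class Handle:
    """Toy stand-in for an OS-handle wrapper (not the real SemLock)."""
    def __init__(self, value):
        self.value = value

def rebuild_handle(value):
    # Runs on the unpickling side; this is the spot where a real
    # implementation would duplicate the handle into the current process.
    return Handle(value)

def reduce_handle(h):
    # Runs on the pickling side; returns (callable, args) for reconstruction.
    return rebuild_handle, (h.value,)

# Register via a dispatch table, as PipeConnection does; assigning
# Handle.__reduce__ = reduce_handle would achieve the same effect,
# which is the route taken for SemLock below.
copyreg.pickle(Handle, reduce_handle)

h2 = pickle.loads(pickle.dumps(Handle(42)))
print(h2.value)  # 42
```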
Once this is done, we can subclass Queue and remove the call to assert_spawning, since it is no longer required. This way, we are now able to successfully pickle locks, queues, pipes, etc. Here's the code with examples:
import os, pickle
from multiprocessing import Pool, Lock, synchronize, get_context
import multiprocessing.queues
import _winapi


def work(q):
    print("Worker: Main says", q.get())
    q.put('haha')


class DupSemLockHandle(object):
    """
    Picklable wrapper for a handle. Attempts to mirror how PipeConnection objects are pickled using appropriate api
    """

    def __init__(self, handle, pid=None):
        if pid is None:
            # We just duplicate the handle in the current process and
            # let the receiving process steal the handle.
            pid = os.getpid()
        proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
        try:
            self._handle = _winapi.DuplicateHandle(
                _winapi.GetCurrentProcess(),
                handle, proc, 0, False, _winapi.DUPLICATE_SAME_ACCESS)
        finally:
            _winapi.CloseHandle(proc)
        self._pid = pid

    def detach(self):
        """
        Get the handle, typically from another process
        """
        # retrieve handle from process which currently owns it
        if self._pid == os.getpid():
            # The handle has already been duplicated for this process.
            return self._handle
        # We must steal the handle from the process whose pid is self._pid.
        proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                   self._pid)
        try:
            return _winapi.DuplicateHandle(
                proc, self._handle, _winapi.GetCurrentProcess(),
                0, False, _winapi.DUPLICATE_CLOSE_SOURCE | _winapi.DUPLICATE_SAME_ACCESS)
        finally:
            _winapi.CloseHandle(proc)


def reduce_lock_connection(self):
    sl = self._semlock
    dh = DupSemLockHandle(sl.handle)
    return rebuild_lock_connection, (dh, type(self), (sl.kind, sl.maxvalue, sl.name))


def rebuild_lock_connection(dh, t, state):
    handle = dh.detach()  # Duplicated handle valid in current process's context
    # Create a new instance without calling __init__ because we'll supply the state ourselves
    lck = t.__new__(t)
    lck.__setstate__((handle,) + state)
    return lck


# Add our own reduce function to pickle SemLock and its child classes
synchronize.SemLock.__reduce__ = reduce_lock_connection


class PicklableQueue(multiprocessing.queues.Queue):
    """
    A picklable Queue that skips the call to context.assert_spawning because it's no longer needed
    """

    def __init__(self, *args, **kwargs):
        ctx = get_context()
        super().__init__(*args, **kwargs, ctx=ctx)

    def __getstate__(self):
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)


def is_locked(l):
    """
    Returns whether the given lock is acquired or not.
    """
    locked = l.acquire(block=False)
    if locked is False:
        return True
    else:
        l.release()
        return False


if __name__ == '__main__':
    # Example that shows that you can now pickle/unpickle locks and they'll still point towards the same object
    l1 = Lock()
    p = pickle.dumps(l1)
    l2 = pickle.loads(p)
    print('before acquiring, l1 locked:', is_locked(l1), 'l2 locked', is_locked(l2))
    l2.acquire()
    print('after acquiring l1 locked:', is_locked(l1), 'l2 locked', is_locked(l2))

    # Example that shows how you can pass a queue to Pool and it will work
    with Pool() as pool:
        q = PicklableQueue()
        q.put('laugh')
        pool.map(work, (q,))
        print("Main: Worker says", q.get())
Output:
before acquiring, l1 locked: False l2 locked False
after acquiring l1 locked: True l2 locked True
Worker: Main says laugh
Main: Worker says haha
Disclaimer: The above code will only work on Windows. If you are on UNIX, you may try using @Booboo's modified code below (reported working but not adequately tested; full code link here):
import os, pickle
from multiprocessing import Pool, Lock, synchronize, get_context, Process
import multiprocessing.queues
import sys

_is_windows = sys.platform == 'win32'
if _is_windows:
    import _winapi

.
.
.

class DupSemLockHandle(object):
    """
    Picklable wrapper for a handle. Attempts to mirror how PipeConnection objects are pickled using appropriate api
    """

    def __init__(self, handle, pid=None):
        if pid is None:
            # We just duplicate the handle in the current process and
            # let the receiving process steal the handle.
            pid = os.getpid()
        if _is_windows:
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
            try:
                self._handle = _winapi.DuplicateHandle(
                    _winapi.GetCurrentProcess(),
                    handle, proc, 0, False, _winapi.DUPLICATE_SAME_ACCESS)
            finally:
                _winapi.CloseHandle(proc)
        else:
            self._handle = handle
        self._pid = pid

    def detach(self):
        """
        Get the handle, typically from another process
        """
        # retrieve handle from process which currently owns it
        if self._pid == os.getpid():
            # The handle has already been duplicated for this process.
            return self._handle
        if not _is_windows:
            return self._handle
        # We must steal the handle from the process whose pid is self._pid.
        proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                   self._pid)
        try:
            return _winapi.DuplicateHandle(
                proc, self._handle, _winapi.GetCurrentProcess(),
                0, False, _winapi.DUPLICATE_CLOSE_SOURCE | _winapi.DUPLICATE_SAME_ACCESS)
        finally:
            _winapi.CloseHandle(proc)
Archived:
Views: 1387
Last recorded: