Mat*_*erg 12 python pipe python-2.6 multiprocessing
我正在尝试编写一个使用多个进程计算校验和的类,从而利用多个核心.我有一个非常简单的类,它在执行一个简单的案例时很有用.但每当我创建两个或更多类的实例时,工作者永远不会退出.好像从来没有得到父管道关闭管道的消息.
所有代码都可以在下面找到.我首先分别计算md5和sha1校验和,这是有效的,然后我尝试并行执行计算,然后程序在关闭管道时锁定.
这里发生了什么?管道为什么不像我预期的那样工作?我想我可以通过在队列上发送"停止"消息并让孩子退出这种方式来解决这个问题,但我真的很想知道为什么它不能正常工作.
import multiprocessing
import hashlib
class ChecksumPipe(multiprocessing.Process):
def __init__(self, csname):
multiprocessing.Process.__init__(self, name = csname)
self.summer = eval("hashlib.%s()" % csname)
self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
self.result_queue = multiprocessing.Queue(1)
self.daemon = True
self.start()
self.child_conn.close() # This is the parent. Close the unused end.
def run(self):
self.parent_conn.close() # This is the child. Close unused end.
while True:
try:
print "Waiting for more data...", self
block = self.child_conn.recv_bytes()
print "Got some data...", self
except EOFError:
print "Finished work", self
break
self.summer.update(block)
self.result_queue.put(self.summer.hexdigest())
self.result_queue.close()
self.child_conn.close()
def update(self, block):
self.parent_conn.send_bytes(block)
def hexdigest(self):
self.parent_conn.close()
return self.result_queue.get()
def main():
# Calculating the first checksum works
md5 = ChecksumPipe("md5")
md5.update("hello")
print "md5 is", md5.hexdigest()
# Calculating the second checksum works
sha1 = ChecksumPipe("sha1")
sha1.update("hello")
print "sha1 is", sha1.hexdigest()
# Calculating both checksums in parallel causes a lockup!
md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
md5.update("hello")
sha1.update("hello")
print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!
main()
Run Code Online (Sandbox Code Playgroud)
PS.这个问题已经解决如果有人感兴趣,这是上述代码的工作版本:
import multiprocessing
import hashlib
class ChecksumPipe(multiprocessing.Process):
all_open_parent_conns = []
def __init__(self, csname):
multiprocessing.Process.__init__(self, name = csname)
self.summer = eval("hashlib.%s()" % csname)
self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
self.result_queue = multiprocessing.Queue(1)
self.daemon = True
self.start()
self.child_conn.close() # This is the parent. Close the unused end.
def run(self):
for conn in ChecksumPipe.all_open_parent_conns:
conn.close() # This is the child. Close unused ends.
while True:
try:
print "Waiting for more data...", self
block = self.child_conn.recv_bytes()
print "Got some data...", self
except EOFError:
print "Finished work", self
break
self.summer.update(block)
self.result_queue.put(self.summer.hexdigest())
self.result_queue.close()
self.child_conn.close()
def update(self, block):
self.parent_conn.send_bytes(block)
def hexdigest(self):
self.parent_conn.close()
return self.result_queue.get()
def main():
# Calculating the first checksum works
md5 = ChecksumPipe("md5")
md5.update("hello")
print "md5 is", md5.hexdigest()
# Calculating the second checksum works
sha1 = ChecksumPipe("sha1")
sha1.update("hello")
print "sha1 is", sha1.hexdigest()
# Calculating both checksums also works fine now
md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
md5.update("hello")
sha1.update("hello")
print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()
main()
Run Code Online (Sandbox Code Playgroud)
是的,这确实是令人惊讶的行为.
但是,如果查看lsof两个并行子进程的输出,则很容易注意到第二个子进程打开了更多文件描述符.
发生的情况是,当两个并行子进程启动时,第二个子进程继承父进程的管道,这样当父进程调用self.parent_conn.close()第二个子进程时仍然打开该管道文件描述符,这样管道文件描述就不会被关闭内核(引用计数大于0),其效果是self.child_conn.recv_bytes()在第一个并行子进程中从不read()s EOF并且EOFError从不抛出.
您可能需要发送显式关闭消息,而不是关闭文件描述符,因为似乎很少控制哪些文件描述符在哪些进程之间共享(没有close-on-fork文件描述符标志).
| 归档时间: |
|
| 查看次数: |
9977 次 |
| 最近记录: |