Using Python multiprocessing pipes

Mat*_*erg · 12 · python, pipe, python-2.6, multiprocessing

I am trying to write a class that computes checksums using multiple processes, thereby taking advantage of multiple cores. I have a fairly simple class for this, and it works fine when running a simple case. But whenever I create two or more instances of the class, the workers never exit. It seems like they never get the message that the parent has closed its end of the pipe.

All the code can be found below. I first compute the md5 and sha1 checksums separately, which works, and then I try to perform the computations in parallel; the program then locks up when it is time to close the pipes.

What is going on here? Why aren't the pipes working as I expect? I guess I could work around this by sending a "stop" message on the queue and having the child exit that way, but I would really like to know why the pipes are not working as intended.

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):
    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False) # (receive end, send end)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        self.parent_conn.close() # This is the child. Close unused end.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()


def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums in parallel causes a lockup!
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!

main()

PS. This problem has since been solved. In case anyone is interested, here is a working version of the above code:

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):

    all_open_parent_conns = []

    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False) # (receive end, send end)
        ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        for conn in ChecksumPipe.all_open_parent_conns:
            conn.close() # This is the child. Close unused ends.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()

def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums also works fine now
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()

main()

Max*_*kin · 7

Yes, that behavior is indeed surprising.

However, if you look at the lsof output for the two parallel child processes, it is easy to notice that the second child process has more file descriptors open.
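
For instance, while the program is deadlocked you can count each child's open descriptors from the parent. A minimal sketch, assuming a Linux /proc filesystem (the count_fds helper is ours, not part of the original code):

import os

def count_fds(pid):
    # Linux-specific: every entry in /proc/<pid>/fd is one open descriptor.
    return len(os.listdir("/proc/%d/fd" % pid))

# Hypothetical usage while the md5 and sha1 workers are blocked:
# print "md5 child fds:", count_fds(md5.pid)
# print "sha1 child fds:", count_fds(sha1.pid)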

What happens is that when the two parallel child processes are started, the second child inherits the parent's end of the first child's pipe. So when the parent calls self.parent_conn.close(), the second child still has that pipe file descriptor open, and the pipe file description therefore never gets closed in the kernel (its reference count is greater than 0). The effect is that self.child_conn.recv_bytes() in the first parallel child process never read()s EOF, and EOFError is never raised.
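
A minimal sketch that isolates just this mechanism, assuming Unix fork semantics (the reader and idler functions are ours, not the poster's code):

import multiprocessing
import time

def reader(r, w):
    w.close()  # drop this child's inherited copy of the write end
    try:
        r.recv_bytes()
    except EOFError:
        print "reader saw EOF"

def idler():
    time.sleep(5)  # does nothing, but its inherited copy of w stays open meanwhile

if __name__ == "__main__":
    r, w = multiprocessing.Pipe(duplex = False)   # (receive end, send end)
    p1 = multiprocessing.Process(target = reader, args = (r, w))
    p1.start()
    p2 = multiprocessing.Process(target = idler)  # forked after the pipe: inherits w too
    p2.start()
    w.close()                                     # the parent's copy is now closed...
    p1.join(2)
    print "reader still blocked:", p1.is_alive()  # True: p2's copy keeps the pipe open
    p2.join()   # once idler exits, the kernel closes the last write-end copy
    p1.join()   # ...and only now does the reader see EOF

The reader stays blocked in recv_bytes() until idler exits, even though the parent closed its own copy of the write end. This is exactly the situation the two parallel ChecksumPipe instances get into.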

You may want to send an explicit shutdown message rather than closing file descriptors, since there seems to be little control over which file descriptors get shared between which processes (there is no close-on-fork file descriptor flag).
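
A minimal sketch of that alternative, using send()/recv() with None as a hypothetical stop message (the checksum_worker function and its names are ours, not the poster's code):

import multiprocessing
import hashlib

def checksum_worker(csname, conn, results):
    summer = getattr(hashlib, csname)()  # e.g. hashlib.md5()
    while True:
        block = conn.recv()
        if block is None:                # explicit stop message instead of EOF
            break
        summer.update(block)
    results.put(summer.hexdigest())

if __name__ == "__main__":
    recv_end, send_end = multiprocessing.Pipe(duplex = False)
    results = multiprocessing.Queue(1)
    worker = multiprocessing.Process(target = checksum_worker,
                                     args = ("md5", recv_end, results))
    worker.start()
    send_end.send("hello")
    send_end.send(None)                  # tell the child we are done
    print "md5 is", results.get()
    worker.join()

Because the child waits for an explicit None rather than for EOF, it no longer matters how many copies of the write end are floating around in sibling processes.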