什么可以使connection.send()块?(来自conn1,conn2 = multiprocessing.Pipe())

LBa*_*ret 8 python pipe multiprocessing

我正在调试从2个传感器收集信息的应用程序:网络摄像头和麦克风.

一般架构非常简单:

  • 主进程通过管道将消息(start,stop,get_data)发送到子进程(每个进程一个).
  • 子进程收集数据并将其发送到主进程

子进程和主进程处于无限循环中以处理命令(来自用户的主进程,来自主进程的子进程).

它在全球范围内有效,但我无法阻止子进程.

我已经记录了代码,它似乎发生了两件事:

  1. 发送'stop'消息但是没有通过管道.
  2. 子进程继续发送数据和conn.send(数据)块.

该行为显然与连接状态相关联,因为没有发送任何内容的子进程没有此行为.不过,我还没有看到如何调试/修改当前可能合理的架构.

那么,是什么导致这种阻塞行为以及如何避免它呢?

这是为子进程中的无限循环的每次迭代执行的代码:

    def do(self):
        while self.cnx.poll():
            msg = self.cnx.recv()
            self.queue.append(msg)
        #==
        if not self.queue:
            func_name = 'default_action'
            self.queue.append([func_name, ])
        #==
        msg             = self.queue.pop()
        func_name, args = msg[0], msg[1:]
        #==
        res = self.target.__getattribute__(func_name)(*args)
        #==
        running = func_name != 'stop'
        #==
        if res and self.send:
            assert running
            self.output_queue.append(res[0])
        if self.output_queue and running:
            self.cnx.send(self.output_queue.popleft())
        #==
        return running
Run Code Online (Sandbox Code Playgroud)

更新:似乎管道不能同时写入两端.如果将上面代码的最后几行更改为:

        if self.output_queue and running:
            if not self.cnx.poll(): 
                self.cnx.send(self.output_queue.popleft())
Run Code Online (Sandbox Code Playgroud)

问题仍然存在,但默认情况下管道被记录为全双工,并且根本没有记录此行为.我一定是误解了一些东西.拜托我吧!

更新2:为了清楚起见,在这种情况下没有关闭连接.描述事件的顺序:

  • 主进程发送一个消息("停止")(它在发送消息之前清空连接)
  • 主进程进入一个(无限)循环,在子进程终止时停止.
  • 同时,子进程在发送中被阻止,永远不会收到消息.

Ben*_*son 5

全双工multiprocessing.Pipe实现为socketpair(). .send与套接字通信时,调用可能会因所有正常原因而阻塞。根据您的描述,我认为您的读者可能Pipe已经停止阅读,并且数据已经在内核的缓冲区中累积到您的阻塞点.send

如果您明确指定接收方,当您尝试时,.close您可能会收到某种错误(尽管也可能,但不确定) 。如果您的接收连接超出范围,这可能会自动发生。您可以通过更加小心地不要存储对接收方的引用(直接或间接)来解决问题,这样当该线程消失时它就会被释放。SIGPIPE.send

阻塞的简单演示.send

import multiprocessing

a, b = multiprocessing.Pipe()

while True:
    print "send!"
    a.send("hello world")
Run Code Online (Sandbox Code Playgroud)

现在请注意,一段时间后它停止打印“发送!”