LBa*_*ret 8 python pipe multiprocessing
我正在调试从2个传感器收集信息的应用程序:网络摄像头和麦克风.
一般架构非常简单:
子进程和主进程处于无限循环中以处理命令(来自用户的主进程,来自主进程的子进程).
它在全球范围内有效,但我无法阻止子进程.
我已经记录了代码,它似乎发生了两件事:
该行为显然与连接状态相关联,因为没有发送任何内容的子进程没有此行为.不过,我还没有看到如何调试/修改当前可能合理的架构.
那么,是什么导致这种阻塞行为以及如何避免它呢?
这是为子进程中的无限循环的每次迭代执行的代码:
def do(self):
while self.cnx.poll():
msg = self.cnx.recv()
self.queue.append(msg)
#==
if not self.queue:
func_name = 'default_action'
self.queue.append([func_name, ])
#==
msg = self.queue.pop()
func_name, args = msg[0], msg[1:]
#==
res = self.target.__getattribute__(func_name)(*args)
#==
running = func_name != 'stop'
#==
if res and self.send:
assert running
self.output_queue.append(res[0])
if self.output_queue and running:
self.cnx.send(self.output_queue.popleft())
#==
return running
Run Code Online (Sandbox Code Playgroud)
更新:似乎管道不能同时写入两端.如果将上面代码的最后几行更改为:
if self.output_queue and running:
if not self.cnx.poll():
self.cnx.send(self.output_queue.popleft())
Run Code Online (Sandbox Code Playgroud)
问题仍然存在,但默认情况下管道被记录为全双工,并且根本没有记录此行为.我一定是误解了一些东西.拜托我吧!
更新2:为了清楚起见,在这种情况下没有关闭连接.描述事件的顺序:
全双工multiprocessing.Pipe实现为socketpair(). .send与套接字通信时,调用可能会因所有正常原因而阻塞。根据您的描述,我认为您的读者可能Pipe已经停止阅读,并且数据已经在内核的缓冲区中累积到您的阻塞点.send。
如果您明确指定接收方,当您尝试时,.close您可能会收到某种错误(尽管也可能,但不确定) 。如果您的接收连接超出范围,这可能会自动发生。您可以通过更加小心地不要存储对接收方的引用(直接或间接)来解决问题,这样当该线程消失时它就会被释放。SIGPIPE.send
阻塞的简单演示.send:
import multiprocessing
a, b = multiprocessing.Pipe()
while True:
print "send!"
a.send("hello world")
Run Code Online (Sandbox Code Playgroud)
现在请注意,一段时间后它停止打印“发送!”
| 归档时间: |
|
| 查看次数: |
1998 次 |
| 最近记录: |