Jul*_*eil 0 python queue multithreading multiprocessing
我一整天都被这个问题困扰,并且无法找到与我想要完成的任务相关的任何解决方案。
我试图将队列传递给子进程中生成的线程。队列在入口文件中创建并作为参数传递给每个子流程。
我正在制作一个模块化程序来a)运行神经网络b)在需要时自动更新网络模型c)将事件/图像从神经网络记录到服务器。我以前的程序只崇拜一个运行多个线程的 CPU 核心,并且速度变得相当慢,因此我决定需要对程序的某些部分进行子处理,以便它们可以在自己的内存空间中运行,以充分发挥其潜力。
子流程:
总共 4 个子流程。
在开发过程中,我需要在每个进程之间进行通信,以便它们都位于同一页面上,并包含来自服务器的事件等。所以据我所知,队列将是最好的选择。
(澄清:来自“多处理”模块的“队列”,而不是“队列”模块)
~~不过~~
每个子进程都会产生自己的线程。例如,第一个子进程将产生多个线程:每个队列一个线程,用于侦听来自不同服务器的事件并将它们传递到程序的不同区域;一个线程监听从神经网络之一接收图像的队列;一个线程监听从网络摄像头接收实时图像的队列;一个线程监听从另一个神经网络接收输出的队列。
我可以毫无问题地将队列传递给子流程,并且可以有效地使用它们。但是,当我尝试将它们传递给每个子进程中的线程时,我收到上述错误。
我对多重处理相当陌生;然而,除了共享内存空间和 GIL 之外,其背后的方法看起来与线程相对相同。
这是来自 Main.py;程序入口。
from lib.client import Client, Image
from multiprocessing import Queue, Process
class Main():
def __init__(self, server):
self.KILLQ = Queue()
self.CAMERAQ = Queue()
self.CLIENT = Client((server, 2005), self.KILLQ, self.CAMERAQ)
self.CLIENT_PROCESS = Process(target=self.CLIENT.do, daemon=True)
self.CLIENT_PROCESS.start()
if __name__ == '__main__':
m = Main('127.0.0.1')
while True:
m.KILLQ.put("Hello world")
Run Code Online (Sandbox Code Playgroud)
这是来自 client.py (在名为 lib 的文件夹中)
class Client():
def __init__(self, connection, killq, cameraq):
self.TCP_IP = connection[0]
self.TCP_PORT = connection[1]
self.CAMERAQ = cameraq
self.KILLQ = killq
self.BUFFERSIZE = 1024
self.HOSTNAME = socket.gethostname()
self.ATTEMPTS = 0
self.SHUTDOWN = False
self.START_CONNECTION = MakeConnection((self.TCP_IP, self.TCP_PORT))
# self.KILLQ_THREAD = Thread(target=self._listen, args=(self.KILLQ,), daemon=True)
# self.KILLQ_THREAD.start()
def do(self):
# The function ran as the subprocess from Main.py
print(self.KILLQ.get())
def _listen(self, q):
# This is threaded multiple times listening to each Queue (as 'q' that is passed when the thread is created)
while True:
print(self.q.get())
Run Code Online (Sandbox Code Playgroud)
# self.KILLQ_THREAD = Thread(target=self._listen, args=(self.KILLQ,), daemon=True)
Run Code Online (Sandbox Code Playgroud)
这是抛出错误的地方。如果我将此行注释掉,程序就会正常运行。我可以毫无问题地从该子进程中的队列中读取(即函数“do”),而不是在此子进程下的线程中(即函数“_listen”)。
我需要能够跨每个进程进行通信,以便它们可以与主程序同步(即在神经网络模型更新的情况下,推理子进程需要关闭,以便可以更新模型而不会导致错误) )。
任何对此的帮助都会很棒!
我也对其他同样有效的沟通方式持开放态度。如果您认为更好的沟通流程会起作用;它需要足够快才能支持从摄像机发送到服务器的 4k 图像的实时流传输。
非常感谢您的宝贵时间!:)
队列不是问题。包中的内容multiprocessing被设计为可挑选的,以便它们可以在进程之间共享。
问题是,您的线程KILLQ_THREAD是在主进程中创建的。线程不能在进程之间共享。事实上,当一个进程按照 POSIX 标准分叉时,父进程中活动的线程不是克隆到新子进程内存空间的进程映像的一部分。原因之一是调用时互斥体的状态fork()可能会导致子进程中的死锁。
您必须将线程的创建移至子进程,即
def do(self):
self.KILLQ_THREAD = Thread(target=self._listen, args=(self.KILLQ,), daemon=True)
self.KILLQ_THREAD.start()
Run Code Online (Sandbox Code Playgroud)
据推测,KILLQ应该向子进程发出关闭信号。在这种情况下,特别是如果您计划使用多个子进程,队列并不是实现这一目标的最佳方法。由于从队列中Queue.get()删除该项目,因此每个项目只能由一个消费者检索和处理。您的生产者必须将多个关闭信号放入队列中。在多消费者场景中,您也没有合理的方法来确保特定消费者收到任何特定项目。放入队列中的任何项目都可能被从中读取的任何消费者检索。Queue.get_nowait()
对于信号发送,特别是对于多个接收者,最好使用Event
您还会注意到,您的程序在启动后似乎很快挂起。那是因为你启动了你的子进程和线程daemon=True。
当您的Client.do()方法如上所示时,即创建并启动线程,然后退出,您的子进程在调用后立即结束,self.KILLQ_THREAD.start()并且守护线程立即随之结束。您的主进程没有注意到任何事情,并继续将Hello world放入队列中,直到队列最终填满并queue.Full引发。
下面是一个Event在两个子进程中使用 for shutdown 信号的精简代码示例,每个子进程各有一个线程。
主要.py
import time
from lib.client import Client
from multiprocessing import Process, Event
class Main:
def __init__(self):
self.KILLQ = Event()
self._clients = (Client(self.KILLQ), Client(self.KILLQ))
self._procs = [Process(target=cl.do, daemon=True) for cl in self._clients]
[proc.start() for proc in self._procs]
if __name__ == '__main__':
m = Main()
# do sth. else
time.sleep(1)
# signal for shutdown
m.KILLQ.set()
# grace period for both shutdown prints to show
time.sleep(.1)
Run Code Online (Sandbox Code Playgroud)
客户端.py
import multiprocessing
from threading import Thread
class Client:
def __init__(self, killq):
self.KILLQ = killq
def do(self):
# non-daemonic thread! We want the process to stick around until the thread
# terminates on the signal set by the main process
self.KILLQ_THREAD = Thread(target=self._listen, args=(self.KILLQ,))
self.KILLQ_THREAD.start()
@staticmethod
def _listen(q):
while not q.is_set():
print("in thread {}".format(multiprocessing.current_process().name))
print("{} - master signalled shutdown".format(multiprocessing.current_process().name))
Run Code Online (Sandbox Code Playgroud)
输出
[...]
in thread Process-2
in thread Process-1
in thread Process-2
Process-2 - master signalled shutdown
in thread Process-1
Process-1 - master signalled shutdown
Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)
至于进程间通信的方法,您可能需要研究流服务器解决方案。 Miguel Grinberg早在 2014 年就用 Flask编写了一篇关于视频流的优秀教程,并于 2017 年 8 月发布了最新的后续教程。