use*_*300 12 python queue multithreading multiprocessing python-2.7
编辑:我对视频流有什么疑问,所以我会提供更多的清晰度.该流是我网络摄像头的实时视频,通过OpenCV访问.我在摄像机读取每个帧时得到它,并将其发送到一个单独的进程进行处理.该过程基于对图像进行的计算返回文本.然后将文本显示在图像上.我需要实时显示流,如果文本和正在显示的视频之间存在延迟(即如果文本适用于前一帧,那就没问题),这是可以的.
也许更容易想到这一点的是我正在对网络摄像头看到的图像进行识别.我一次发送一个帧到一个单独的进程来对帧进行识别分析,并将文本发送回作为实时源的标题.显然,处理过程比简单地从网络摄像头抓取帧并显示它们需要更多的时间,因此如果标题是什么以及网络摄像头馈送显示的延迟,这是可接受的和预期的.
现在发生的事情是我正在显示的实时视频由于其他进程而滞后(当我不向计算进程发送帧时,没有滞后).我还确保一次只排队一帧,这样可以避免队列过载并导致延迟.我已经更新了下面的代码以反映这个细节.
我在python中使用多处理模块来帮助加快我的主程序.但是我相信我可能会做错误的事情,因为我认为计算并不是完全并行发生的.
我希望我的程序从主进程中的视频流中读取图像,并将帧传递给两个对其进行计算的子进程,并将文本发回(包含计算结果)到主进程.
然而,当我使用多处理时,主要过程似乎滞后,运行速度只有没有它的一半,导致我认为进程没有完全并行运行.
在做了一些研究之后,我推测滞后可能是由于使用队列在进程之间进行通信(将图像从main传递给子进程,以及将文本从child传回main).
然而,我评论了计算步骤,只是让主进程传递一个图像,并且子进程返回空白文本,在这种情况下,主进程根本没有减速.它全速奔跑.
因此,我相信
1)我没有最佳地使用多处理
要么
2)这些过程不能真正并行运行(我会理解有点滞后,但它会使主要过程减慢一半).
这是我的代码大纲.只有一个消费者而不是两个消费者,但两个消费者几乎完全相同.如果有人能提供指导,我将不胜感激.
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
#other initialization stuff
def run(self):
while True:
image = self.task_queue.get()
#Do computations on image
self.result_queue.put("text")
return
import cv2
tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks,results)
consumer.start()
#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
rval, frame = vc.read()
else:
rval = False
while rval:
if tasks.empty():
tasks.put(image)
else:
text = tasks.get()
#Add text to frame
cv2.putText(frame,text)
#Showing the frame with all the applied modifications
cv2.imshow("preview", frame)
#Getting next frame from camera
rval, frame = vc.read()
Run Code Online (Sandbox Code Playgroud)
我希望我的程序在主进程中从视频流中读取图像
在生产者/消费者实现中,这就是您所拥有的,生产者,将任务放入队列以由消费者执行的内容,需要与主/控制过程分开,以便它可以与主要的并行添加任务进程从结果队列中读取输出.
请尝试以下方法.在消费者流程中添加了睡眠以模拟处理并添加了第二个消费者以显示它们并行运行.
如果处理无法跟上输入流,那么限制任务队列的大小以避免它因内存使用而耗尽也是一个好主意.可以在调用时指定大小Queue(<size>)
.如果队列处于该大小,则调用.put
将阻塞,直到队列未满.
import time
import multiprocessing
import cv2
class ImageProcessor(multiprocessing.Process):
def __init__(self, tasks_q, results_q):
multiprocessing.Process.__init__(self)
self.tasks_q = tasks_q
self.results_q = results_q
def run(self):
while True:
image = self.tasks_q.get()
# Do computations on image
time.sleep(1)
# Display the result on stream
self.results_q.put("text")
# Tasks queue with size 1 - only want one image queued
# for processing.
# Queue size should therefore match number of processes
tasks_q, results_q = multiprocessing.Queue(1), multiprocessing.Queue()
processor = ImageProcessor(tasks_q, results_q)
processor.start()
def capture_display_video(vc):
rval, frame = vc.read()
while rval:
image = frame.get_image()
if not tasks_q.full():
tasks_q.put(image)
if not results_q.empty():
text = results_q.get()
cv2.putText(frame, text)
cv2.imshow("preview", frame)
rval, frame = vc.read()
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
if not vc.isOpened():
raise Exception("Cannot capture video")
capture_display_video(vc)
processor.terminate()
Run Code Online (Sandbox Code Playgroud)
Mar*_*ana -2
您可以尝试设置关联掩码以确保每个进程在不同的核心上运行。我在 Windows 7 上使用这个。
def setaffinity(mask = 128): # 128 is core 7
pid = win32api.GetCurrentProcessId()
handle = win32api.OpenProcess(win32con.PROCESS_ALL_ACCESS, True, pid)
win32process.SetProcessAffinityMask(handle, mask)
return
Run Code Online (Sandbox Code Playgroud)