kra*_*r65 15 python macos opencv multiprocessing pathos
我正在OSX上读一个网络摄像头,这个简单的脚本可以正常工作:
import cv2
camera = cv2.VideoCapture(0)
while True:
try:
(grabbed, frame) = camera.read() # grab the current frame
frame = cv2.resize(frame, (640, 480)) # resize the frame
cv2.imshow("Frame", frame) # show the frame to our screen
cv2.waitKey(1) # Display it at least one ms before going to the next frame
except KeyboardInterrupt:
# cleanup the camera and close any open windows
camera.release()
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break
Run Code Online (Sandbox Code Playgroud)
我现在想要在一个单独的进程中阅读视频,我的脚本更长,并且在Linux上的单独进程中正确地读取视频:
import numpy as np
import time
import ctypes
import argparse
from multiprocessing import Array, Value, Process
import cv2
class VideoCapture:
"""
Class that handles video capture from device or video file
"""
def __init__(self, device=0, delay=0.):
"""
:param device: device index or video filename
:param delay: delay between frame captures in seconds(floating point is allowed)
"""
self._cap = cv2.VideoCapture(device)
self._delay = delay
def _proper_frame(self, delay=None):
"""
:param delay: delay between frames capture(in seconds)
:param finished: synchronized wrapper for int(see multiprocessing.Value)
:return: frame
"""
snapshot = None
correct_img = False
fail_counter = -1
while not correct_img:
# Capture the frame
correct_img, snapshot = self._cap.read()
fail_counter += 1
# Raise exception if there's no output from the device
if fail_counter > 10:
raise Exception("Capture: exceeded number of tries to capture the frame.")
# Delay before we get a new frame
time.sleep(delay)
return snapshot
def get_size(self):
"""
:return: size of the captured image
"""
return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)
def get_stream_function(self):
"""
Returns stream_function object function
"""
def stream_function(image, finished):
"""
Function keeps capturing frames until finished = 1
:param image: shared numpy array for multiprocessing(see multiprocessing.Array)
:param finished: synchronized wrapper for int(see multiprocessing.Value)
:return: nothing
"""
# Incorrect input array
if image.shape != self.get_size():
raise Exception("Capture: improper size of the input image")
print("Capture: start streaming")
# Capture frame until we get finished flag set to True
while not finished.value:
image[:, :, :] = self._proper_frame(self._delay)
# Release the device
self.release()
return stream_function
def release(self):
self._cap.release()
def main():
# Add program arguments
parser = argparse.ArgumentParser(description='Captures the video from the webcamera and \nwrites it into the output file with predefined fps.', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-output', dest="output", default="output.avi", help='name of the output video file')
parser.add_argument('-log', dest="log", default="frames.log", help='name of the log file')
parser.add_argument('-fps', dest="fps", default=25., help='frames per second value')
# Read the arguments if any
result = parser.parse_args()
fps = float(result.fps)
output = result.output
log = result.log
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()
# Define shared variables(which are synchronised so race condition is excluded)
shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
frame = np.ctypeslib.as_array(shared_array_base.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
finished = Value('i', 0)
# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start() # Launch capture process
# Sleep for some time to allow videocapture start working first
time.sleep(2)
# Termination function
def terminate():
print("Main: termination")
finished.value = True
# Wait for all processes to finish
time.sleep(1)
# Terminate working processes
video_process.terminate()
# The capturing works until keyboard interrupt is pressed.
while True:
try:
# Display the resulting frame
cv2.imshow('frame', frame)
cv2.waitKey(1) # Display it at least one ms before going to the next frame
time.sleep(0.1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
terminate()
break
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
这在Linux上工作正常,但在OSX上我遇到了麻烦,因为它似乎无法.read()对创建的cv2.VideoCapture(device)对象(存储在var中self._cap)执行操作.
经过一番搜索后,我发现了这个SO答案,建议使用Billiard,它是pythons多处理的替代品,据说它有一些非常有用的改进.所以在文件的顶部我只是在我之前的多处理导入(有效覆盖multiprocessing.Process)之后添加了导入:
from billiard import Process, forking_enable
Run Code Online (Sandbox Code Playgroud)
并且在video_process变量实例化之前我使用forking_enable如下:
forking_enable(0) # Supposedly this is all I need for billiard to do it's magic
video_process = Process(target=stream, args=(frame, finished))
Run Code Online (Sandbox Code Playgroud)
所以在这个版本(这里是pastebin)我再次运行该文件,这给了我这个错误:
pickle.PicklingError:不能发痒:它找不到主 .stream_function
搜索到这个错误导致我提出了一个问题,其中有一长串答案,其中一个给了我建议使用dill序列化lib来解决这个问题.但是,该lib应该与Pathos多处理分支一起使用.所以我只是尝试更改我的多处理导入行
from multiprocessing import Array, Value, Process
Run Code Online (Sandbox Code Playgroud)
至
from pathos.multiprocessing import Array, Value, Process
Run Code Online (Sandbox Code Playgroud)
但是没有Array,Value并且Process似乎存在于pathos.multiprocessing包中.
从这一刻起,我完全迷失了.我正在寻找我几乎没有足够知识的东西,我甚至不知道我需要在哪个方向上搜索或调试.
那么,比我更聪明的灵魂能帮助我在一个单独的过程中捕捉视频吗?欢迎所有提示!
主要挑战multiprocessing是理解内存地址空间分离的情况下的内存模型。
Python 让事情变得更加混乱,因为它抽象了其中的许多方面,在一些看似无辜的 API 下隐藏了多种机制。
当你写这个逻辑时:
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()
...
# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start() # Launch capture process
Run Code Online (Sandbox Code Playgroud)
您传递给的Process stream_functionwhich 指的是VideoCapture类( self.get_size) 的内部组件,但更重要的是,which 不能作为顶级函数使用。
子进程将无法重新构造所需的对象,因为它收到的只是函数的名称。它尝试在主模块中查找它,因此显示消息:
pickle.PicklingError:无法pickle:找不到main.stream_function
子进程尝试解析主模块中的函数,但main.stream_function查找失败。
我的第一个建议是更改您的逻辑,以便将返回的方法传递给子进程stream_function。
video_process = Process(target=cap.get_stream_function, args=(...))
Run Code Online (Sandbox Code Playgroud)
然而,当您在两个进程之间共享状态时,您可能仍然会遇到问题。
当人们接触 Python 中的多处理范例时,我通常建议他们将进程视为在单独的机器上运行。在这些情况下,很明显您的架构存在问题。
我建议您将两个进程的职责分开,确保一个进程(子进程)负责处理视频的整个捕获,另一个进程(父进程或第三个进程)负责处理帧。
这种范例被称为生产者和消费者问题,它非常适合您的系统。视频捕获过程将是生产者,另一个过程是消费者。一个简单的multiprocessing.Pipeormultiprocessing.Queue可以确保帧一旦准备好就从生产者转移到消费者。
添加伪代码示例,因为我不知道视频捕获 API。重点是处理生产者进程中的整个视频捕获逻辑,将其从消费者中抽象出来。消费者唯一需要知道的是它通过管道接收框架对象。
def capture_video(writer):
"""This runs in the producer process."""
# The VideoCapture class wraps the video acquisition logic
cap = VideoCapture()
while True:
frame = cap.get_next_frame() # the method returns the next frame
writer.send(frame) # send the new frame to the consumer process
def main():
reader, writer = multiprocessing.Pipe(False)
# producer process
video_process = Process(target=capture_video, args=[writer])
video_process.start() # Launch capture process
while True:
try:
frame = reader.recv() # receive next frame from the producer
process_frame(frame)
except KeyboardInterrupt:
video_process.terminate()
break
Run Code Online (Sandbox Code Playgroud)
请注意进程之间没有共享状态(不需要共享任何数组)。通信通过管道并且是单向的,使得逻辑非常简单。正如我上面所说,这个逻辑也适用于不同的机器。您只需要用插座替换管道即可。
您可能需要一种更干净的生产者进程终止方法。我建议您使用multiprocessing.Event. 只需从块中的父级设置它KeyboardInterrupt,并在每次迭代时检查其在子级中的状态 ( while not event.is_set())。
您的第一个问题是您无法在某个forked过程中访问网络摄像头.当使用外部库时会出现几个问题,fork因为fork操作不会清除父进程打开的所有文件描述符,从而导致奇怪的行为.对于Linux上的这类问题,库通常更加健壮,但是共享IO对象(如cv2.VideoCapture2进程之间)并不是一个好主意.
当你使用billard.forking_enabled并设置它时False,你要求库不要用来fork产生新的进程但是spawn或者forkserver方法,它们更干净,因为它们关闭所有文件描述符但启动速度也慢,这在你的情况下应该不是问题.如果您正在使用python3.4+,您可以使用multiprocessing.set_start_method('forkserver')例如.
当您使用这些方法之一时,需要将目标函数和参数序列化以传递给子进程.序列化默认使用pickle,它有几个流,就像你提到的那样,不能序列化本地定义的对象cv2.VideoCapture.但是你可以简化你的程序,为你的Processpicklelisable 做出所有的论据.这是解决问题的一个尝试:
import numpy as np
import time
import ctypes
from multiprocessing import set_start_method
from multiprocessing import Process, Array, Value
import cv2
class VideoCapture:
"""
Class that handles video capture from device or video file
"""
def __init__(self, device=0, delay=0.):
"""
:param device: device index or video filename
:param delay: delay between frame captures in seconds(float allowed)
"""
self._delay = delay
self._device = device
self._cap = cv2.VideoCapture(device)
assert self._cap.isOpened()
def __getstate__(self):
self._cap.release()
return (self._delay, self._device)
def __setstate__(self, state):
self._delay, self._device = state
self._cap = cv2.VideoCapture(self._device)
assert self._cap.grab(), "The child could not grab the video capture"
def _proper_frame(self, delay=None):
"""
:param delay: delay between frames capture(in seconds)
:param finished: synchronized wrapper for int
:return: frame
"""
snapshot = None
correct_img = False
fail_counter = -1
while not correct_img:
# Capture the frame
correct_img, snapshot = self._cap.read()
fail_counter += 1
# Raise exception if there's no output from the device
if fail_counter > 10:
raise Exception("Capture: exceeded number of tries to capture "
"the frame.")
# Delay before we get a new frame
time.sleep(delay)
return snapshot
def get_size(self):
"""
:return: size of the captured image
"""
return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)
def release(self):
self._cap.release()
def stream(capturer, image, finished):
"""
Function keeps capturing frames until finished = 1
:param image: shared numpy array for multiprocessing
:param finished: synchronized wrapper for int
:return: nothing
"""
shape = capturer.get_size()
# Define shared variables
frame = np.ctypeslib.as_array(image.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
# Incorrect input array
if frame.shape != capturer.get_size():
raise Exception("Capture: improper size of the input image")
print("Capture: start streaming")
# Capture frame until we get finished flag set to True
while not finished.value:
frame[:, :, :] = capturer._proper_frame(capturer._delay)
# Release the device
capturer.release()
def main():
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
# Define shared variables
shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
frame = np.ctypeslib.as_array(shared_array_base.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
finished = Value('i', 0)
# Start processes which run in parallel
video_process = Process(target=stream,
args=(cap, shared_array_base, finished))
video_process.start() # Launch capture process
# Sleep for some time to allow videocapture start working first
time.sleep(2)
# Termination function
def terminate():
print("Main: termination")
finished.value = True
# Wait for all processes to finish
time.sleep(1)
# Terminate working processes
video_process.join()
# The capturing works until keyboard interrupt is pressed.
while True:
try:
# Display the resulting frame
cv2.imshow('frame', frame)
# Display it at least one ms before going to the next frame
time.sleep(0.1)
cv2.waitKey(1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
terminate()
break
if __name__ == '__main__':
set_start_method("spawn")
main()
Run Code Online (Sandbox Code Playgroud)
我目前无法在Mac上测试它,所以它可能不会开箱即用,但不应该有multiprocessing相关的错误.一些说明:
cv2.VideoCapture新孩子中的对象并抓住相机,因为只有一个进程应该从相机读取.fork只是由于共享cv2.VideoCapture并在stream函数中重新创建它将解决您的问题.mp.Array缓冲区(这真的很奇怪,我需要一段时间来弄明白).您需要显式传递Array并重新创建包装器.也许你的第一个程序中的问题fork只是由于共享cv2.VideoCapture并在stream函数中重新创建它将解决您的问题.
我假设你正在运行你的代码python3.4+所以我没有使用billard但是使用forking_enabled(False)而不是set_start_method应该有点类似.
如果这项工作让我知道!
| 归档时间: |
|
| 查看次数: |
1080 次 |
| 最近记录: |