1 python multithreading opencv tkinter
我最近创建了一个程序,使用 opencv 显示来自 2 个 ip 摄像机的多视频源。但我决定为我的应用程序创建 UI,现在,我不太清楚如何使用多线程方法来实现它。
这是我用来在 TKinter GUI 中仅显示一台摄像机的代码:
import tkinter
import cv2
import PIL.Image, PIL.ImageTk
import time
class App:
    """Tkinter window that shows a live video feed with a snapshot button.

    Constructing an instance opens the video source, builds the widgets and
    then blocks inside ``window.mainloop()`` until the window is closed.
    """

    def __init__(self, window, window_title, video_source=0):
        """Build the UI and start the periodic refresh.

        Args:
            window: Tk window that hosts the widgets.
            window_title: Text used for the window title.
            video_source: Value handed to ``MyVideoCapture`` (default ``0``).
        """
        self.window = window
        self.window.title(window_title)
        self.video_source = video_source

        # open video source (by default this will try to open the computer webcam)
        self.vid = MyVideoCapture(self.video_source)

        # Create a canvas that can fit the above video source size
        self.canvas = tkinter.Canvas(window, width=self.vid.width, height=self.vid.height)
        self.canvas.pack()

        # Id of the single canvas image item; created lazily on the first frame
        # so that later frames replace its image instead of stacking new items.
        self._image_id = None

        # Button that lets the user take a snapshot
        self.btn_snapshot = tkinter.Button(window, text="Snapshot", width=50, command=self.snapshot)
        self.btn_snapshot.pack(anchor=tkinter.CENTER, expand=True)

        # After it is called once, the update method will be automatically
        # called every `delay` milliseconds
        self.delay = 15
        self.update()

        self.window.mainloop()

    def snapshot(self):
        """Grab one frame and write it to ``frame-<timestamp>.jpg``."""
        ret, frame = self.vid.get_frame()
        if ret:
            # get_frame() hands back RGB; cv2.imwrite expects BGR.
            cv2.imwrite(
                "frame-" + time.strftime("%d-%m-%Y-%H-%M-%S") + ".jpg",
                cv2.cvtColor(frame, cv2.COLOR_RGB2BGR),
            )

    def update(self):
        """Draw the newest frame on the canvas and reschedule itself."""
        ret, frame = self.vid.get_frame()
        if ret:
            # Keep a reference on self: Tk does not hold one, and the image
            # would otherwise be garbage collected and disappear.
            self.photo = PIL.ImageTk.PhotoImage(image=PIL.Image.fromarray(frame))
            if self._image_id is None:
                self._image_id = self.canvas.create_image(
                    0, 0, image=self.photo, anchor=tkinter.NW
                )
            else:
                # Reuse the existing item; calling create_image per frame
                # would add a new canvas item every `delay` milliseconds.
                self.canvas.itemconfig(self._image_id, image=self.photo)
        self.window.after(self.delay, self.update)
class MyVideoCapture:
    """Thin wrapper around ``cv2.VideoCapture`` that returns RGB frames."""

    def __init__(self, video_source=0):
        """Open ``video_source`` and record its frame size.

        Args:
            video_source: Value passed straight to ``cv2.VideoCapture``.

        Raises:
            ValueError: If the source cannot be opened.
        """
        # Open the video source
        self.vid = cv2.VideoCapture(video_source)
        if not self.vid.isOpened():
            raise ValueError("Unable to open video source", video_source)

        # Get video source width and height
        self.width = self.vid.get(cv2.CAP_PROP_FRAME_WIDTH)
        self.height = self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT)

    def get_frame(self):
        """Read one frame.

        Returns:
            ``(ret, frame)`` where ``ret`` is the success flag and ``frame``
            is the frame converted from BGR to RGB, or ``None`` when the
            source is closed or the read failed.
        """
        if not self.vid.isOpened():
            # Nothing was read, so there is no `ret` to forward; report failure.
            return (False, None)

        ret, frame = self.vid.read()
        if not ret:
            return (ret, None)

        # Return a boolean success flag and the current frame converted to RGB
        return (ret, cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    # Release the video source when the object is destroyed
    def __del__(self):
        # `vid` is missing if the constructor failed before assigning it.
        vid = getattr(self, "vid", None)
        if vid is not None and vid.isOpened():
            vid.release()
# Build the Tk root window and hand it to the application object
App(window=tkinter.Tk(), window_title="Tkinter and OpenCV")
Run Code Online (Sandbox Code Playgroud)
这是我之前的应用程序,它在不同线程中显示多个视频源:
from threading import Thread
import cv2
import time
class VideoWriterWidget(object):
    """Read one video source on a background thread, display it and save it.

    The constructor starts two daemon threads: one keeps reading frames into
    ``self.status`` / ``self.frame``; the other shows the latest frame in an
    OpenCV window and appends it to ``<video_file_name>.avi``.
    """

    def __init__(self, video_file_name, src=0):
        """Open the source, set up the writer and start both threads.

        Args:
            video_file_name: Output name without extension; ``.avi`` is appended.
            src: Value passed to ``cv2.VideoCapture`` (also used as window title).
        """
        # Create a VideoCapture object
        self.frame_name = str(src)
        self.video_file = video_file_name
        self.video_file_name = video_file_name + '.avi'
        self.capture = cv2.VideoCapture(src)

        # Default resolutions of the frame are obtained (system dependent)
        self.frame_width = int(self.capture.get(3))
        self.frame_height = int(self.capture.get(4))

        # Set up codec and output video settings
        self.codec = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')
        self.output_video = cv2.VideoWriter(
            self.video_file_name, self.codec, 30, (self.frame_width, self.frame_height)
        )

        # Must exist before any thread starts: the display/save thread may run
        # before the reader thread has produced its first frame.
        self.status = False
        self.frame = None

        # Start the thread to read frames from the video stream
        self.thread = Thread(target=self.update, args=())
        self.thread.daemon = True
        self.thread.start()

        # Start another thread to show/save frames
        self.start_recording()
        print('initialized {}'.format(self.video_file))

    def update(self):
        """Keep reading the next frame from the stream (runs in its own thread)."""
        while True:
            if self.capture.isOpened():
                (self.status, self.frame) = self.capture.read()
            else:
                # Capture not open (e.g. released after 'q'): yield the CPU
                # instead of spinning in a tight loop.
                time.sleep(0.01)

    def show_frame(self):
        """Display the latest frame and handle the 'q' key."""
        if self.status:
            cv2.imshow(self.frame_name, self.frame)

        # Press Q on keyboard to stop recording
        key = cv2.waitKey(1)
        if key == ord('q'):
            self.capture.release()
            self.output_video.release()
            cv2.destroyAllWindows()
            # NOTE(review): this is called from the recording thread, so
            # exit() presumably ends only that thread - confirm intent.
            exit(1)

    def save_frame(self):
        """Append the latest successfully read frame to the output video."""
        frame = self.frame
        # Skip until the reader has delivered a valid frame; writing None
        # (before the first read, or after a failed read) is an error.
        if self.status and frame is not None:
            self.output_video.write(frame)

    def start_recording(self):
        """Start the daemon thread that shows and saves frames in a loop."""
        def start_recording_thread():
            while True:
                self.show_frame()
                self.save_frame()

        self.recording_thread = Thread(target=start_recording_thread, args=())
        self.recording_thread.daemon = True
        self.recording_thread.start()
if __name__ == '__main__':
    # One widget per camera; replace the placeholders with real stream URLs.
    src1, src2, src3 = 'Your link1', 'Your link2', 'Your link3'
    video_writer_widget1 = VideoWriterWidget('Camera 1', src1)
    video_writer_widget2 = VideoWriterWidget('Camera 2', src2)
    video_writer_widget3 = VideoWriterWidget('Camera 3', src3)

    # The worker threads are daemons, so they die together with the main
    # thread. Keep the main thread alive by sleeping in a loop so the
    # background threads keep running.
    while True:
        time.sleep(5)
Run Code Online (Sandbox Code Playgroud)
有人可以帮助我如何使用带有线程的 tkinter 在我的新应用程序中使用我以前的代码(显示多摄像头)吗?
tkinter(像许多其他 GUI 一样)不喜欢在线程中使用小部件,因此首先我会尝试在没有线程的情况下在主进程中运行所有小部件。
在示例中,我将大部分代码移动到基于tkinter.Frame创建小部件的类,我可以在不同的流中多次使用它。因为我只有一台相机(并且系统不能多次使用同一台相机),所以我找到了一些外部流/文件来测试它。因为流发送非常大的图像,所以我将大小更改为400, 300
当不需要调整图像大小时,代码可以快速运行。
当它必须调整图像大小时,有时会出现卡顿,但仍然可以接受。
import tkinter
import cv2
import PIL.Image, PIL.ImageTk
import time

# widgets with canvas and camera

class tkCamera(tkinter.Frame):

    def __init__(self, window, video_source=0):
        super().__init__(window)

        self.window = window

        #self.window.title(window_title)
        self.video_source = video_source
        self.vid = MyVideoCapture(self.video_source)

        self.canvas = tkinter.Canvas(window, width=self.vid.width, height=self.vid.height)
        self.canvas.pack()

        # Button that lets the user take a snapshot
        self.btn_snapshot = tkinter.Button(window, text="Snapshot", width=50, command=self.snapshot)
        self.btn_snapshot.pack(anchor=tkinter.CENTER, expand=True)

        # After it is called once, the update method will be automatically called every delay milliseconds
        self.delay = 15
        self.update_widget()

    def snapshot(self):
        # Get a frame from the video source
        ret, frame = self.vid.get_frame()

        if ret:
            cv2.imwrite("frame-" + time.strftime("%d-%m-%Y-%H-%M-%S") + ".jpg", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))

    def update_widget(self):
        # Get a frame from the video source
        ret, frame = self.vid.get_frame()

        if ret:
            self.image = PIL.Image.fromarray(frame)
            self.photo = PIL.ImageTk.PhotoImage(image=self.image)
            self.canvas.create_image(0, 0, image = self.photo, anchor = tkinter.NW)

        self.window.after(self.delay, self.update_widget)


class App:

    def __init__(self, window, window_title, video_source1=0, video_source2=0):
        self.window = window

        self.window.title(window_title)

        # open video source (by default this will try to open the computer webcam)
        self.vid1 = tkCamera(window, video_source1)
        self.vid1.pack()

        self.vid2 = tkCamera(window, video_source2)
        self.vid2.pack()

        # Create a canvas that can fit the above video source size

        self.window.mainloop()


class MyVideoCapture:
    def __init__(self, video_source=0):
        # Open the video source
        self.vid = cv2.VideoCapture(video_source)
        if not self.vid.isOpened():
            raise ValueError("Unable to open video source", video_source)

        # Get video source width and height
        self.width = self.vid.get(cv2.CAP_PROP_FRAME_WIDTH)
        self.height = self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT)

        self.width = 400
        self.height = 300

    def get_frame(self):
        if self.vid.isOpened():
            ret, frame = self.vid.read()
            if ret:
                frame = cv2.resize(frame, (400, 300))
                # Return a boolean success flag and the current frame converted to BGR
                return (ret, cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            else:
                return (ret, None)
        else:
            return (ret, None)

    # Release the video source when the object is destroyed
    def __del__(self):
        if self.vid.isOpened():
            self.vid.release()

# Create a window and pass it to the Application object
App(tkinter.Tk(), "Tkinter and OpenCV", 0, 'https://imageserver.webcamera.pl/rec/krupowki-srodek/latest.mp4')
Run Code Online (Sandbox Code Playgroud)
如果您打算对帧进行处理(例如检测运动或人脸),那么 get_frame 中的代码可以放到单独的线程中运行。该线程会持续处理帧并将结果赋值给当前的 self.frame,而 get_frame() 只需返回当前的 self.frame。
类似的想法请参阅 pyImageSearch 博客中的文章《使用 Python 和 OpenCV 提高网络摄像头 FPS》。
也许你甚至可以使用
from imutils.video import WebcamVideoStream
Run Code Online (Sandbox Code Playgroud)
编辑:
这个版本仍然没有使用线程,但接受一个源列表,因此可以显示多个摄像机。不过当源超过 2 个时,显示会出现问题,因此需要使用线程(threads)。
顺便说一句:tkinter 的小部件和窗口已经有 update() 方法,所以我将其重命名为 update_frame()
在 snapshot 中我使用 pillow 的 image.save(),这样就不必读取新帧并转换为 BGR,并且在流停止时也可以拍摄快照。Stop 按钮只是停止替换画布上的图像,并不会停止线程中从流读取帧,因此其他函数仍然可以处理或录制该流。
import tkinter
import cv2
import PIL.Image, PIL.ImageTk
import time

class MyVideoCapture:

    def __init__(self, video_source=0, width=None, height=None):

        # Open the video source
        self.vid = cv2.VideoCapture(video_source)
        if not self.vid.isOpened():
            raise ValueError("Unable to open video source", video_source)

        self.width = width
        self.height = height

        # Get video source width and height
        if not self.width:
            self.width = int(self.vid.get(cv2.CAP_PROP_FRAME_WIDTH))    # convert float to int
        if not self.height:
            self.height = int(self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT))  # convert float to int

        self.ret = False
        self.frame = None

    def process(self):
        ret = False
        frame = None

        if self.vid.isOpened():
            ret, frame = self.vid.read()
            if ret:
                frame = cv2.resize(frame, (self.width, self.height))
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        self.ret = ret
        self.frame = frame

    def get_frame(self):
        self.process()  # later run in thread
        return self.ret, self.frame

    # Release the video source when the object is destroyed
    def __del__(self):
        if self.vid.isOpened():
            self.vid.release()


class tkCamera(tkinter.Frame):

    def __init__(self, window, video_source=0, width=None, height=None):
        super().__init__(window)

        self.window = window

        #self.window.title(window_title)
        self.video_source = video_source
        self.vid = MyVideoCapture(self.video_source, width, height)

        self.canvas = tkinter.Canvas(window, width=self.vid.width, height=self.vid.height)
        self.canvas.pack()

        # Button that lets the user take a snapshot
        self.btn_snapshot = tkinter.Button(window, text="Snapshot", width=50, command=self.snapshot)
        self.btn_snapshot.pack(anchor='center', expand=True)

        # After it is called once, the update method will be automatically called every delay milliseconds
        self.delay = 15
        self.update_widget()

    def snapshot(self):
        # Get a frame from the video source
        ret, frame = self.vid.get_frame()

        if ret:
            cv2.imwrite("frame-" + time.strftime("%d-%m-%Y-%H-%M-%S") + ".jpg", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))

    def update_widget(self):
        # Get a frame from the video source
        ret, frame = self.vid.get_frame()

        if ret:
            self.image = PIL.Image.fromarray(frame)
            self.photo = PIL.ImageTk.PhotoImage(image=self.image)
            self.canvas.create_image(0, 0, image = self.photo, anchor = tkinter.NW)

        self.window.after(self.delay, self.update_widget)


class App:

    def __init__(self, window, window_title, video_sources):
        self.window = window

        self.window.title(window_title)

        self.vids = []

        for source in video_sources:
            vid = tkCamera(window, source, 400, 300)
            vid.pack()
            self.vids.append(vid)

        # Create a canvas that can fit the above video source size

        self.window.mainloop()

if __name__ == '__main__':

    sources = [
        0,
        #'https://imageserver.webcamera.pl/rec/krupowki-srodek/latest.mp4',
        #'https://imageserver.webcamera.pl/rec/skolnity/latest.mp4',
        'https://imageserver.webcamera.pl/rec/krakow4/latest.mp4',
    ]


    # Create a window and pass it to the Application object
    App(tkinter.Tk(), "Tkinter and OpenCV", sources)
Run Code Online (Sandbox Code Playgroud)
编辑
使用线程(threads)读取和处理帧的版本。我添加了 time.sleep(1/fps),只在需要时才处理帧,因此运行更流畅。延迟设为 15 时,有时会卡住。
我使用的源只有 24 秒,所以几秒钟后它们就会停止。
import tkinter
import cv2
import PIL.Image, PIL.ImageTk
import time
import threading

class MyVideoCapture:

    def __init__(self, video_source=0, width=None, height=None, fps=None):

        self.video_source = video_source
        self.width = width
        self.height = height
        self.fps = fps

        # Open the video source
        self.vid = cv2.VideoCapture(video_source)
        if not self.vid.isOpened():
            raise ValueError("[MyVideoCapture] Unable to open video source", video_source)

        # Get video source width and height
        if not self.width:
            self.width = int(self.vid.get(cv2.CAP_PROP_FRAME_WIDTH))    # convert float to int
        if not self.height:
            self.height = int(self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT))  # convert float to int
        if not self.fps:
            self.fps = int(self.vid.get(cv2.CAP_PROP_FPS))  # convert float to int

        # default value at start
        self.ret = False
        self.frame = None

        # start thread
        self.running = True
        self.thread = threading.Thread(target=self.process)
        self.thread.start()

    def process(self):
        while self.running:
            ret, frame = self.vid.read()

            if ret:
                # process image
                frame = cv2.resize(frame, (self.width, self.height))
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            else:
                print('[MyVideoCapture] stream end:', self.video_source)
                # TODO: reopen stream
                self.running = False
                break

            # assign new frame
            self.ret = ret
            self.frame = frame

            # sleep for next frame
            time.sleep(1/self.fps)

    def get_frame(self):
        return self.ret, self.frame

    # Release the video source when the object is destroyed
    def __del__(self):
        # stop thread
        if self.running:
            self.running = False
            self.thread.join()

        # relase stream
        if self.vid.isOpened():
            self.vid.release()


class tkCamera(tkinter.Frame):

    def __init__(self, window, text="", video_source=0, width=None, height=None):
        super().__init__(window)

        self.window = window

        #self.window.title(window_title)
        self.video_source = video_source
        self.vid = MyVideoCapture(self.video_source, width, height)

        self.label = tkinter.Label(self, text=text)
        self.label.pack()

        self.canvas = tkinter.Canvas(self, width=self.vid.width, height=self.vid.height)
        self.canvas.pack()

        # Button that lets the user take a snapshot
        self.btn_snapshot = tkinter.Button(self, text="Start", command=self.start)
        self.btn_snapshot.pack(anchor='center', side='left')

        self.btn_snapshot = tkinter.Button(self, text="Stop", command=self.stop)
        self.btn_snapshot.pack(anchor='center', side='left')

        # Button that lets the user take a snapshot
        self.btn_snapshot = tkinter.Button(self, text="Snapshot", command=self.snapshot)
        self.btn_snapshot.pack(anchor='center', side='left')

        # After it is called once, the update method will be automatically called every delay milliseconds
        # calculate delay using `FPS`
        self.delay = int(1000/self.vid.fps)

        print('[tkCamera] source:', self.video_source)
        print('[tkCamera] fps:', self.vid.fps, 'delay:', self.delay)

        self.image = None

        self.running = True
        self.update_frame()

    def start(self):
        if not self.running:
            self.running = True
            self.update_frame()

    def stop(self):
        if self.running:
            self.running = False

    def snapshot(self):
        # Get a frame from the video source
        #ret, frame = self.vid.get_frame()
        #if ret:
        #    cv2.imwrite(time.strftime("frame-%d-%m-%Y-%H-%M-%S.jpg"), cv2.cvtColor(self.frame, cv2.COLOR_RGB2BGR))

        # Save current frame in widget - not get new one from camera - so it can save correct image when it stoped
        if self.image:
            self.image.save(time.strftime("frame-%d-%m-%Y-%H-%M-%S.jpg"))

    def update_frame(self):
        # widgets in tkinter already have method `update()` so I have to use different name -

        # Get a frame from the video source
        ret, frame = self.vid.get_frame()

        if ret:
            self.image = PIL.Image.fromarray(frame)
            self.photo = PIL.ImageTk.PhotoImage(image=self.image)
            self.canvas.create_image(0, 0, image=self.photo, anchor='nw')

        if self.running:
            self.window.after(self.delay, self.update_frame)


class App:

    def __init__(self, window, window_title, video_sources):
        self.window = window

        self.window.title(window_title)

        self.vids = []

        columns = 2
        for number, source in enumerate(video_sources):
            text, stream = source
            vid = tkCamera(self.window, text, stream, 400, 300)
            x = number % columns
            y = number // columns
            vid.grid(row=y, column=x)
            self.vids.append(vid)

        self.window.protocol("WM_DELETE_WINDOW", self.on_closing)
        self.window.mainloop()

    def on_closing(self, event=None):
        print('[App] stoping threads')
        for source in self.vids:
            source.vid.running = False
        print('[App] exit')
        self.window.destroy()

if __name__ == '__main__':

    sources = [
        ('me', 0),
        ('Zakopane, Poland', 'https://imageserver.webcamera.pl/rec/krupowki-srodek/latest.mp4'),
        ('Kraków, Poland', 'https://imageserver.webcamera.pl/rec/krakow4/latest.mp4'),
        ('Warszawa, Poland', 'https://imageserver.webcamera.pl/rec/warszawa/latest.mp4'),
        #('Baltic See, Poland', 'https://imageserver.webcamera.pl/rec/chlopy/latest.mp4'),
        #('Mountains, Poland', 'https://imageserver.webcamera.pl/rec/skolnity/latest.mp4'),
    ]

    # Create a window and pass it to the Application object
    App(tkinter.Tk(), "Tkinter and OpenCV", sources)
Run Code Online (Sandbox Code Playgroud)
编辑
可以录制视频的版本。
cv2 需要 BGR 颜色的帧才能正确保存,因此我必须在帧转换为 RGB 之前保存它。
我将大部分代码移到了 MyVideoCapture 中,因此即使没有 tkinter 也可以使用它。我还在 MyVideoCapture 中添加了选项,可以将图像以 cv2 数组或 pillow.image 的形式返回;现在转换为 pillow 的工作在线程内部完成,主线程不必再做。
import tkinter\nimport cv2\nimport PIL.Image, PIL.ImageTk\nimport time\nimport threading\n\nclass MyVideoCapture:\n\n def __init__(self, video_source=0, width=None, height=None, fps=None):\n \n self.video_source = video_source\n self.width = width\n self.height = height\n self.fps = fps\n \n # Open the video source\n self.vid = cv2.VideoCapture(video_source)\n if not self.vid.isOpened():\n raise ValueError("[MyVideoCapture] Unable to open video source", video_source)\n\n # Get video source width and height\n if not self.width:\n self.width = int(self.vid.get(cv2.CAP_PROP_FRAME_WIDTH)) # convert float to int\n if not self.height:\n self.height = int(self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT)) # convert float to int\n if not self.fps:\n self.fps = int(self.vid.get(cv2.CAP_PROP_FPS)) # convert float to int\n\n # default value at start \n self.ret = False\n self.frame = None\n \n self.convert_color = cv2.COLOR_BGR2RGB\n #self.convert_color = cv2.COLOR_BGR2GRAY\n self.convert_pillow = True\n \n # default values for recording \n self.recording = False\n self.recording_filename = \'output.mp4\'\n self.recording_writer = None\n \n # start thread\n self.running = True\n self.thread = threading.Thread(target=self.process)\n self.thread.start()\n \n def start_recording(self, filename=None):\n if self.recording:\n print(\'[MyVideoCapture] already recording:\', self.recording_filename)\n else:\n # VideoWriter constructors\n #.mp4 = codec id 2\n if filename:\n self.recording_filename = filename\n else:\n self.recording_filename = time.strftime("%Y.%m.%d %H.%M.%S", time.localtime()) + ".avi"\n #fourcc = cv2.VideoWriter_fourcc(*\'I420\') # .avi\n #fourcc = cv2.VideoWriter_fourcc(*\'MP4V\') # .avi\n fourcc = cv2.VideoWriter_fourcc(*\'MP42\') # .avi\n #fourcc = cv2.VideoWriter_fourcc(*\'AVC1\') # error libx264\n #fourcc = cv2.VideoWriter_fourcc(*\'H264\') # error libx264\n #fourcc = cv2.VideoWriter_fourcc(*\'WRAW\') # error --- no information ---\n #fourcc = 
cv2.VideoWriter_fourcc(*\'MPEG\') # .avi 30fps\n #fourcc = cv2.VideoWriter_fourcc(*\'MJPG\') # .avi\n #fourcc = cv2.VideoWriter_fourcc(*\'XVID\') # .avi\n #fourcc = cv2.VideoWriter_fourcc(*\'H265\') # error \n self.recording_writer = cv2.VideoWriter(self.recording_filename, fourcc, self.fps, (self.width, self.height))\n self.recording = True\n print(\'[MyVideoCapture] started recording:\', self.recording_filename)\n \n def stop_recording(self):\n if not self.recording:\n print(\'[MyVideoCapture] not recording\')\n else:\n self.recording = False\n self.recording_writer.release() \n print(\'[MyVideoCapture] stop recording:\', self.recording_filename)\n \n def record(self, frame):\n # write frame to file \n if self.recording_writer and self.recording_writer.isOpened():\n self.recording_writer.write(frame)\n \n \n def process(self):\n while self.running:\n ret, frame = self.vid.read()\n \n if ret:\n # process image\n frame = cv2.resize(frame, (self.width, self.height))\n\n # it has to record before converting colors\n if self.recording:\n self.record(frame)\n \n if self.c
| 归档时间: |
|
| 查看次数: |
5743 次 |
| 最近记录: |