kra*_*r65 3 python video streaming opencv zeromq
我有一个简单的网络摄像头,我使用OpenCV读取了该摄像头,现在我正尝试使用ZeroMQ将视频录像发送到其他(Python)程序。因此,我有以下简单的脚本来读取网络摄像头并使用ZeroMQ套接字发送它:
import cv2
import os
import zmq
import base64
context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect('tcp://localhost:5555')
# init the camera
camera = cv2.VideoCapture(0)
while True:
try:
(grabbed, frame) = camera.read() # grab the current frame
frame = cv2.resize(frame, (640, 480)) # resize the frame
footage_socket.send_string(base64.b64encode(frame))
# Show the video in a window
cv2.imshow("Frame", frame) # show the frame to our screen
cv2.waitKey(1) # Display it at least one ms
# # before going to the next frame
except KeyboardInterrupt:
camera.release()
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break
Run Code Online (Sandbox Code Playgroud)
这样效果很好,因为它可以显示视频并且不会出现任何错误。
我注释掉了显示图像(cv2.imshow()
和cv2.waitKey(1)
)的两行。然后,我以并行方式启动以下脚本。第二个脚本应该接收视频片段并显示它。
import cv2
import zmq
import base64
import numpy as np
context = zmq.Context()
footage_socket = context.socket(zmq.SUB)
footage_socket.bind('tcp://*:5555')
footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode(''))
# camera = cv2.VideoCapture("output.avi")
while True:
try:
frame = footage_socket.recv_string()
frame = np.fromstring(base64.b64decode(frame), dtype=np.uint8)
cv2.imshow("Frame", frame) # show the frame to our screen
cv2.waitKey(1) # Display it at least one ms
# # before going to the next frame
except KeyboardInterrupt:
cv2.destroyAllWindows()
break
print "\n\nBye bye\n"
Run Code Online (Sandbox Code Playgroud)
不幸的是,这冻结了cv2.waitKey(1)
。
有人知道我在做什么错吗?我需要对素材进行不同的解码吗?欢迎所有提示!
鉴于目标很明确,分布式应用程序基础设施的快速原型设计取决于几个风险点。
0) OpenCVcv2
模块大量使用基于 C 的底层组件,如果尝试使用cv2.imshow()
工具和cv2
(外部 FSA)窗口管理和框架显示服务,python 美化经常挂起。
1) ZeroMQ 框架可以帮助您更多,而不仅仅是尝试强制将数据强制转换(string)
为使用.send_string() / .recv_string()
- 在这里可能会变得更好,而不是移动已知的图像( pxW * pxH )
几何形状 * RGB 深度在一些更智能的 BLOB 映射对象中(将谈到这方面在下面的架构展望中更多一点)。
2)给定 0 和 1 点,ZeroMQ 基础设施(原型设计中的更多)应该对未处理的异常变得健壮(这会使zmq.Context()
实例及其相关的.Socket()
子项在cv2.imshow()
崩溃的情况下独自挂在分配的资源上,因为它经常在快速原型循环。
因此,try: except: finally:
处理程序内部代码的彻底和自律框架和.setsockopt( zmq.LINGER, 0 )
在套接字实例化后立即显式初始+ 最终.close()
+处理程序部分context.term()
内部finally:
是公平的。
SEQ
纯int
-s 来验证 ZeroMQ 部分最好的第一级问题隔离是设置流程,只是为了提供不受控制SEQ
的整数,.send( )
从PUB
侧面广播。
... # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE
SEQ += 1
footage_socket.send( SEQ, zmq.NOBLOCK ) # PUB.send( SEQ ) -> *SUB*
...
Run Code Online (Sandbox Code Playgroud)
除非您的接收方证明它对 int-s 流的处理能力很强,否则继续进行下去是没有意义的。
...
aMsgIN = footage_socket.recv( zmq.NOBLOCK ) # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE
print "{0:}".format( aMsgIN if len( aMsgIN ) > 0 else "." ),
# sleep(...) # backthrottle the loop a bit
...
Run Code Online (Sandbox Code Playgroud)
.imshow()
从.recv()
数据和事件循环中解耦如果您的数据泵按需要工作,远程显示器就可以作为下一个目标。
ZeroMQ 要么提供完整的消息(一个 BLOB),要么什么都不提供。这是事实。其次,ZeroMQ 不保证任何人都可以无错误地交付。这些都是事实,您的设计必须接受。
采集是更简单的部分,只需获取数据(也许一些色彩空间转换可能会在这里集中进行,但除此之外,任务(对于低于 4K / 低于 30fps 的图像处理)通常在这一侧没有错误。
frame
,如上所述,是一个numpy.ndarray
实例。发送硬编码的二进制映射 BLOB 时将获得最佳性能,无需任何“智能”转换,很明显,这frame
只是一大堆位(尽管零复制机制可能有一些更高级的细节) ZeroMQ,但这些在这个阶段对发送方没有直接好处)。
# struct.unpack() / .pack() seems "just"-enough for fast & smart custom Payload protocol designs
DecodeWireMSG_DATA( aMSG_DATA = struct.unpack( "!" + ( aPayloadHEADER + ( ( pxW * pxH ) * RGB_DEPTH * CELL_SIZE ) ) * "I", aMsgIN ) )
Run Code Online (Sandbox Code Playgroud)
更难的部分在接收端。如果尝试使用cv2
隐藏在 中的内置事件循环引擎,则.imshow()
此循环将与通过.recv()
从PUB
侧面发布的读取更新流的外部逻辑发生冲突。
作为一种合理的折衷方案,可以忽略所有“延迟” frame
-s,它们没有使进程与PUB
-side 获取/广播节奏同步,而只显示最近的一个……使用zmq.CONFLATE
ZeroMQ 传输基础设施上的选项(如果重建事件流的目的只是视觉感知,则延迟的“旧”图片已经失去意义,相反,如果目的是 1:1 记录完整的采集,zmq.CONFLATE
则将丢弃frame
实例,应该得到处理,因此应该为这样的 1:1 文档目的添加一些其他架构,最好与数据/处理流的“只是视觉”分支分开)。
完成此操作后,.recv()
(可能是Poller()
+.recv()
循环的组合)将为SUB
一侧提供适当的数据泵,该数据泵独立于cv2
工具,并且是.imshow()
(隐藏的)-FSA 事件循环。
对于更高的 fps + FullHD / 2K / 4K 项目cv2
,使用 zmq.Stopwatch() 实例的{ .start(), .stop() }
方法系统地分析累积处理延迟/处理延迟。
拥有硬数据,您可能会及时发现,何时出现额外的需求来解决一些更难的实时约束,因此请考虑:
主要避免陷入任何无法控制的 python 垃圾收集黑洞的风险——始终控制 { gc.disable() | gc.enable();gc.collect() }
关键路径部分,并gc.collect()
在您的设计知道可行的情况下启动一个明确的。
避免新的内存分配延迟——可以预先分配所有必要的 numpy
数组,然后只是强制numpy
使用数据修改的就地模式,从而避免任何进一步的临时内存管理相关的等待状态
通过对整个大/(彩色)深图像的部分(条纹)进行单独的、多流的、独立的更新(记住零保修 - 获得完整的“胖”消息或None
)
调准的.Context()
使用的表现zmq.AFFINITY
于I映射不同的类/ O流量优先级到隔离zmq.Context( N )
I / O线程。
如果预期有多个订阅者且未使用,则在 PUB 端进行微调zmq.SNDBUF
+ 。zmq.SNDHWM
zmq.CONFLATE
最后但并非最不重要的一点是,可以利用numba.jit()
LLVM 预编译的可重用代码加速用于关键路径功能(通常是繁重的numpy
处理),其中额外的微秒被削减,对您的视频处理管道带来最有益的影响,而仍然保持在纯 python 的舒适中(当然,仍然有一些cv2
警告)。
cv2
在原型设计阶段的更多提示和技巧:可能喜欢这个用于cv2
基于图像处理。
对于方法的简单 GUI 交互式参数调整,可能会喜欢这个cv2
。
可能会喜欢这对cv2
处理管线分析与zmq.Stopwatch()
细节到[usec]
最后,我通过采取中间步骤解决了这个问题。我首先将单个图像写入磁盘,然后再次读出这些图像。这使我的事实,我需要编码的帧图像(我选择了JPG格式),并用魔术方法cv2.imencode('.jpg', frame)
和cv2.imdecode(npimg, 1)
我可以使它工作。我在下面粘贴了完整的工作代码。
第一个脚本读取网络摄像头,并通过Zeromq套接字发送素材:
import cv2
import zmq
import base64
context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect('tcp://localhost:5555')
camera = cv2.VideoCapture(0) # init the camera
while True:
try:
(grabbed, frame) = camera.read() # grab the current frame
frame = cv2.resize(frame, (640, 480)) # resize the frame
encoded, buffer = cv2.imencode('.jpg', frame)
footage_socket.send_string(base64.b64encode(buffer))
except KeyboardInterrupt:
camera.release()
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break
Run Code Online (Sandbox Code Playgroud)
第二个脚本接收帧图像并显示它们:
import cv2
import zmq
import base64
import numpy as np
context = zmq.Context()
footage_socket = context.socket(zmq.SUB)
footage_socket.bind('tcp://*:5555')
footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode(''))
while True:
try:
frame = footage_socket.recv_string()
img = base64.b64decode(frame)
npimg = np.fromstring(img, dtype=np.uint8)
source = cv2.imdecode(npimg, 1)
cv2.imshow("image", source)
cv2.waitKey(1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break
Run Code Online (Sandbox Code Playgroud)
无论如何,祝你美好的一天!