如何通过ZeroMQ套接字发送OpenCV视频素材?

kra*_*r65 3 python video streaming opencv zeromq

我有一个简单的网络摄像头,我使用OpenCV读取了该摄像头,现在我正尝试使用ZeroMQ将视频录像发送到其他(Python)程序。因此,我有以下简单的脚本来读取网络摄像头并使用ZeroMQ套接字发送它:

import cv2
import os
import zmq
import base64

context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect('tcp://localhost:5555')

# init the camera
camera = cv2.VideoCapture(0)

while True:
    try:
        (grabbed, frame) = camera.read()            # grab the current frame
        frame = cv2.resize(frame, (640, 480))       # resize the frame
        footage_socket.send_string(base64.b64encode(frame))

        # Show the video in a window
        cv2.imshow("Frame", frame)                  # show the frame to our screen
        cv2.waitKey(1)                              # Display it at least one ms
        #                                           # before going to the next frame

    except KeyboardInterrupt:
        camera.release()
        cv2.destroyAllWindows()
        print "\n\nBye bye\n"
        break
Run Code Online (Sandbox Code Playgroud)

这样效果很好,因为它可以显示视频并且不会出现任何错误。

我注释掉了显示图像(cv2.imshow()cv2.waitKey(1))的两行。然后,我以并行方式启动以下脚本。第二个脚本应该接收视频片段并显示它。

import cv2
import zmq
import base64
import numpy as np

context = zmq.Context()
footage_socket = context.socket(zmq.SUB)
footage_socket.bind('tcp://*:5555')
footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode(''))

# camera = cv2.VideoCapture("output.avi")

while True:
    try:
        frame = footage_socket.recv_string()
        frame = np.fromstring(base64.b64decode(frame), dtype=np.uint8)
        cv2.imshow("Frame", frame)                  # show the frame to our screen
        cv2.waitKey(1)                              # Display it at least one ms
        #                                           # before going to the next frame
    except KeyboardInterrupt:
        cv2.destroyAllWindows()
        break

print "\n\nBye bye\n"
Run Code Online (Sandbox Code Playgroud)

不幸的是,这冻结了cv2.waitKey(1)

有人知道我在做什么错吗?我需要对素材进行不同的解码吗?欢迎所有提示!

use*_*197 5

步骤 0:风险盘点

鉴于目标很明确,分布式应用程序基础设施的快速原型设计取决于几个风险点。

0) OpenCVcv2模块大量使用基于 C 的底层组件,如果尝试使用cv2.imshow()工具和cv2(外部 FSA)窗口管理和框架显示服务,python 美化经常挂起。

1) ZeroMQ 框架可以帮助您更多,而不仅仅是尝试强制将数据强制转换(string)为使用.send_string() / .recv_string()- 在这里可能会变得更好,而不是移动已知的图像( pxW * pxH )几何形状 * RGB 深度在一些更智能的 BLOB 映射对象中(将谈到这方面在下面的架构展望中更多一点)。

2)给定 0 和 1 点,ZeroMQ 基础设施(原型设计中的更多)应该对未处理的异常变得健壮(这会使zmq.Context()实例及其相关的.Socket()子项在cv2.imshow()崩溃的情况下独自挂在分配的资源上,因为它经常在快速原型循环。

因此,try: except: finally:处理程序内部代码的彻底和自律框架和.setsockopt( zmq.LINGER, 0 )在套接字实例化后立即显式初始+ 最终.close()+处理程序部分context.term()内部finally:是公平的。


步骤 1:通过发送一个SEQint-s 来验证 ZeroMQ 部分

最好的第一级问题隔离是设置流程,只是为了提供不受控制SEQ的整数,.send( )PUB侧面广播。

...                                                   # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE
SEQ += 1
footage_socket.send( SEQ, zmq.NOBLOCK )               # PUB.send( SEQ ) -> *SUB*
...
Run Code Online (Sandbox Code Playgroud)

除非您的接收方证明它对 int-s 流的处理能力很强,否则继续进行下去是没有意义的。

...
aMsgIN = footage_socket.recv( zmq.NOBLOCK )           # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE
print "{0:}".format( aMsgIN if len( aMsgIN ) > 0 else "." ),
# sleep(...)                                          # backthrottle the loop a bit
...
Run Code Online (Sandbox Code Playgroud)

第 2 步:.imshow().recv()数据和事件循环中解耦

如果您的数据泵按需要工作,远程显示器就可以作为下一个目标。

ZeroMQ 要么提供完整的消息(一个 BLOB),要么什么都不提供。这是事实。其次,ZeroMQ 不保证任何人都可以无错误地交付。这些都是事实,您的设计必须接受。

采集是更简单的部分,只需获取数据(也许一些色彩空间转换可能会在这里集中进行,但除此之外,任务(对于低于 4K / 低于 30fps 的图像处理)通常在这一侧没有错误。

frame,如上所述,是一个numpy.ndarray实例。发送硬编码的二进制映射 BLOB 时将获得最佳性能,无需任何“智能”转换,很明显,这frame只是一大堆位(尽管零复制机制可能有一些更高级的细节) ZeroMQ,但这些在这个阶段对发送方没有直接好处)。

#                               struct.unpack() / .pack() seems "just"-enough for fast & smart custom Payload protocol designs
DecodeWireMSG_DATA( aMSG_DATA = struct.unpack( "!" + ( aPayloadHEADER + ( ( pxW * pxH ) * RGB_DEPTH * CELL_SIZE ) ) * "I", aMsgIN ) )
Run Code Online (Sandbox Code Playgroud)

更难的部分在接收端。如果尝试使用cv2隐藏在 中的内置事件循环引擎,则.imshow()此循环将与通过.recv()PUB侧面发布的读取更新流的外部逻辑发生冲突。

作为一种合理的折衷方案,可以忽略所有“延迟” frame-s,它们没有使进程与PUB-side 获取/广播节奏同步,而只显示最近的一个……使用zmq.CONFLATEZeroMQ 传输基础设施上的选项(如果重建事件流的目的只是视觉感知,则延迟的“旧”图片已经失去意义,相反,如果目的是 1:1 记录完整的采集,zmq.CONFLATE则将丢弃frame实例,应该得到处理,因此应该为这样的 1:1 文档目的添加一些其他架构,最好与数据/处理流的“只是视觉”分支分开)。

完成此操作后,.recv()(可能是Poller()+.recv()循环的组合)将为SUB一侧提供适当的数据泵,该数据泵独立于cv2工具,并且是.imshow()(隐藏的)-FSA 事件循环。


架构和性能提示:

  • 对于更高的 fps + FullHD / 2K / 4K 项目cv2,使用 zmq.Stopwatch() 实例的{ .start(), .stop() }方法系统地分析累积处理延迟/处理延迟。

  • 拥有硬数据,您可能会及时发现,何时出现额外的需求来解决一些更难的实时约束,因此请考虑:

  • 主要避免陷入任何无法控制的 python 垃圾收集黑洞的风险——始终控制 { gc.disable() | gc.enable();gc.collect() }关键路径部分,并gc.collect()在您的设计知道可行的情况下启动一个明确的。

  • 避免新的内存分配延迟——可以预先分配所有必要的 numpy数组,然后只是强制numpy使用数据修改的就地模式,从而避免任何进一步的临时内存管理相关的等待状态

  • 通过对整个大/(彩色)深图像的部分(条纹)进行单独的、多流的、独立的更新(记住零保修 - 获得完整的“胖”消息或None)

  • 调准的.Context()使用的表现zmq.AFFINITY于I映射不同的类/ O流量优先级到隔离zmq.Context( N )I / O线程。

  • 如果预期有多个订阅者且未使用,则在 PUB 端进行微调zmq.SNDBUF+ 。zmq.SNDHWMzmq.CONFLATE

  • 最后但并非最不重要的一点是,可以利用numba.jit()LLVM 预编译的可重用代码加速用于关键路径功能(通常是繁重的numpy处理),其中额外的微秒被削减,对您的视频处理管道带来最有益的影响,而仍然保持在纯 python 的舒适中(当然,仍然有一些cv2警告)。


cv2在原型设计阶段的更多提示和技巧:

可能喜欢这个用于cv2基于图像处理。

对于方法的简单 GUI 交互式参数调整,可能会喜欢这个cv2

可能会喜欢cv2处理管线分析与zmq.Stopwatch()细节到[usec]


kra*_*r65 5

最后,我通过采取中间步骤解决了这个问题。我首先将单个图像写入磁盘,然后再次读出这些图像。这使我的事实,我需要编码的帧图像(我选择了JPG格式),并用魔术方法cv2.imencode('.jpg', frame)cv2.imdecode(npimg, 1)我可以使它工作。我在下面粘贴了完整的工作代码。

第一个脚本读取网络摄像头,并通过Zeromq套接字发送素材:

import cv2
import zmq
import base64

context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect('tcp://localhost:5555')

camera = cv2.VideoCapture(0)  # init the camera

while True:
    try:
        (grabbed, frame) = camera.read()  # grab the current frame
        frame = cv2.resize(frame, (640, 480))  # resize the frame
        encoded, buffer = cv2.imencode('.jpg', frame)
        footage_socket.send_string(base64.b64encode(buffer))

    except KeyboardInterrupt:
        camera.release()
        cv2.destroyAllWindows()
        print "\n\nBye bye\n"
        break
Run Code Online (Sandbox Code Playgroud)

第二个脚本接收帧图像并显示它们:

import cv2
import zmq
import base64
import numpy as np

context = zmq.Context()
footage_socket = context.socket(zmq.SUB)
footage_socket.bind('tcp://*:5555')
footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode(''))

while True:
    try:
        frame = footage_socket.recv_string()
        img = base64.b64decode(frame)
        npimg = np.fromstring(img, dtype=np.uint8)
        source = cv2.imdecode(npimg, 1)
        cv2.imshow("image", source)
        cv2.waitKey(1)

    except KeyboardInterrupt:
        cv2.destroyAllWindows()
        print "\n\nBye bye\n"
        break
Run Code Online (Sandbox Code Playgroud)

无论如何,祝你美好的一天!