如何在ZMQ中使用序列化发送图像和数据字符串?

JMa*_*arc 6 python zeromq

我的目标是将图像和数据字符串从 RPi(服务器)发送到客户端。我send_json(data)在数据是 dict 的地方使用{'img': img_ls, 'telemetry':'0.01, 320, -10'}img_ls是将图像转换为列表。问题是我得到len( img_ls ) = 57556,而原始图像的大小为: 320 x 240 = 76800。我不明白为什么会出现差异。这是代码:

服务器端

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://0.0.0.0:5557")


def outputs():
    stream = io.BytesIO()
    while True:
        yield stream
        stream.seek(0)
        sensors = '0.01, 320, -10'
        img_ls = np.fromstring(stream.getvalue(), dtype=np.uint8).tolist()
        data = {'telemetry': sensors, 'img': img_ls}
        socket.send_json(data)
        stream.seek(0)
        stream.truncate()

with picamera.PiCamera() as camera:
        camera.resolution = (320, 240)
        camera.framerate = 80
        time.sleep(2)
        camera.capture_sequence(outputs(), 'jpeg', use_video_port=True)
Run Code Online (Sandbox Code Playgroud)

客户端

ip_server = "192.168.42.1"
context = zmq.Context()
zmq_socket = context.socket(zmq.SUB)
zmq_socket.setsockopt(zmq.SUBSCRIBE, b'')
zmq_socket.setsockopt(zmq.CONFLATE, 1)
zmq_socket.connect("tcp://{}:5557".format(ip_server))

try:
    img_nbr = 1
    while True:
        start = time.time()
        frames = zmq_socket.recv_json()
        img_ls = frames['img']
        telemetry = frames['telemetry']
        #convert img list to array
        img_arr = np.asarray(img_ls)
        #reshape gives error because 320*240 != len(img_ls)
        image = np.reshape(img_ls, (320, 240))
        #save image file locally 
        image = Image.fromarray(image)
        #timestamp in ms
        timestamp = int(time.time() * 1000 )
        image.save('img_'+str(timestamp)+'.jpg')
        print('Frame number: ', str(img_nbr))
        img_nbr += 1
finally:
    pass
Run Code Online (Sandbox Code Playgroud)

最后说明:这是我尝试将图像和传感器数据从 RPi 同步流式传输到客户端。我担心数组和列表转换(在 RPi 端完成)可能会减慢流传输速度。如果有更好的方法(仍然)使用 来做到这一点zmq,请告诉我。

use*_*197 3

图像处理非常消耗 CPU 资源。所以,首先是性能:

ZeroMQ 应允许人们享受零复制操作方式,因此防止任何破坏这种操作的不良操作。

仅使用通用 OpenCV 相机,而不是 RPi / PiCamera,我总是更喜欢在受控事件循环下的采集端获取单独的相机帧(而不是序列)。

相机获取已知的固定几何图片(在 OpenCV 中为numpy.ndarray3D 结构 [X,Y,[B,G,R]] ),因此最快、最直接的序列化是struct.pack( CONST_FRAME_STRUCT_MASK, aFrame )在发送方和struct.unpack( CONST_FRAME_STRUCT_MASK, aMessage )接收方使用(s)-侧。

是的,struct.pack()这是迄今为止最快的方法,即使文档提供了其他方法(灵活性需要额外的成本,这是不合理的):

import numpy

def send_array( socket, A, flags = 0, copy = True, track = False ):
    """send a numpy array with metadata"""
    md = dict( dtype = str( A.dtype ),
               shape =      A.shape,
               )
    pass;  socket.send_json( md, flags | zmq.SNDMORE )
    return socket.send(      A,  flags, copy = copy, track = track )

def recv_array( socket, flags = 0, copy = True, track = False ):
    """recv a numpy array"""
    md = socket.recv_json( flags = flags )
    msg = socket.recv(     flags = flags, copy = copy, track = track )
    buf = buffer( msg )
    pass;  A = numpy.frombuffer( buf, dtype = md['dtype'] )
    return A.reshape(                         md['shape'] )
Run Code Online (Sandbox Code Playgroud)

任何颜色转换和类似的源端转换可能会消耗 +150 ~ 180 [ms],因此请尽量避免任何和所有不必要的颜色空间或重塑或类似的非核心转换,因为这些会不利地增加累积的管道延迟信封。

使用struct.pack()还可以避免任何类型的尺寸不匹配,因此您加载到二进制有效负载着陆垫上的内容正是您在接收器侧接收到的内容。

如果确实希望在消息核心数据周围也有与 JSON 相关的开销,那么最好设置一个双套接字范例,两者都具有ZMQ_CONFLATE == 1,其中第一个移动struct有效负载,第二个 JSON 装饰的遥测数据。

如果 RPi 允许,zmq.Context( nIOthreads )可能会进一步增加双方的数据泵吞吐量nIOthreads >= 2,并且额外的JSON_socket.setsockopt( ZMQ_AFFINITY, 1 ); VIDEO_socket.setsockopt( ZMQ_AFFINITY, 0 )映射可以分离/分配工作负载,使每个工作负载运行在不同的、单独的 上IOthread