Iva*_*iti 11 python stdout image stream cv2
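In my previous post ("Pass video data from one Python script to another"), we found a way to pass an image file from one Python script to another.
I am now trying to pass a video (a continuous sequence of images):
write.py: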
import sys
import numpy as np
import cv2
from PIL import Image
import io
import time
while True:
    img = cv2.imread('cat.jpg')
    bimg = cv2.imencode('.jpg',img)[1]
    sys.stdout.buffer.write(bimg)
    sys.stdout.flush()
    time.sleep(1)
read.py:
import sys
from PIL import Image
import io
import cv2
import numpy as np
from io import BytesIO
while True:
    data = sys.stdin.buffer.read()
    img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(), np.uint8), cv2.IMREAD_UNCHANGED)
    cv2.imshow('image', img_np)
    cv2.waitKey(0)
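If I send the output of write.py to the terminal, it prints. If I hand the data to read.py manually, it reads it. But putting them together (python3 write.py | python3 read.py) just hangs. write.py writes only once, and read.py never seems to get the data.
My guess is that the reading code waits for the writing code to "end" before it wraps up the data and calls it an image. If that were the case, though, I would have expected the flush to fix it.
You've mentioned that the images to be sent are inconsistent in size, but I have to assume that if they come from the same camera (for a given video stream), the raw image size does not change, only the compressed image size. I imagine you have enough RAM to hold at least one uncompressed frame in memory at a time, and that the compression and decompression only add processing overhead.
Given that, I would create a shared buffer using multiprocessing.shared_memory, which can share frames between the two processes (if you want to get really fancy, you could even create a circular buffer of a few frames to prevent screen tearing, but tearing wasn't a big problem in my tests).
Given that cv2.VideoCapture().read() can read directly into an existing array, and that you can create a numpy array that uses the shared memory as its buffer, you can read the data into shared memory with zero additional copies. Using this, I was able to read nearly 700 frames per second from an H.264-encoded video file at a resolution of 1280x688.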
from multiprocessing.shared_memory import SharedMemory
import cv2
from time import sleep
import numpy as np
vid_device = r"D:\Videos\movies\GhostintheShell.mp4" #a great movie
#get the first frame to calculate size
cap = cv2.VideoCapture(vid_device)
success, frame = cap.read()
if not success:
    raise Exception("error reading from video")
#create a shared memory for sending the frame shape
frame_shape_shm = SharedMemory(name="frame_shape", create=True, size=frame.ndim*4) #4 bytes per dim as long as int32 is big enough
frame_shape = np.ndarray(3, buffer=frame_shape_shm.buf, dtype='i4') #4 bytes per dim as long as int32 is big enough
frame_shape[:] = frame.shape
#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer", create=True, size=frame.nbytes)
frame_buffer = np.ndarray(frame_shape, buffer=frame_buffer_shm.buf, dtype=frame.dtype)
input("writer is ready: press enter once reader is ready")
try: #use keyboardinterrupt to quit
    while True:
        cap.read(frame_buffer) #read data into frame buffer
        # sleep(1/24) #limit framerate-ish (hitting actual framerate is more complicated than 1 line)
except KeyboardInterrupt:
    pass
#cleanup: IMPORTANT, close this one first so the reader doesn't unlink() the
# shm's before this file has exited. (less important on windows)
cap.release()
frame_buffer_shm.close()
frame_shape_shm.close()
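The reader process looks very similar, but instead of creating a video device and calling read to get frames, we just construct the shared array and imshow it over and over. The GUI isn't as fast as simply dumping data, so we don't reach 700 fps, but up to 500 fps isn't bad...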
from multiprocessing.shared_memory import SharedMemory
import cv2
import numpy as np
#create a shared memory for reading the frame shape
frame_shape_shm = SharedMemory(name="frame_shape")
frame_shape = np.ndarray([3], buffer=frame_shape_shm.buf, dtype='i4')
#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer")
#create the framebuffer using the shm's memory
frame_buffer = np.ndarray(frame_shape, buffer=frame_buffer_shm.buf, dtype='u1')
try:
    while True:
        cv2.imshow('frame', frame_buffer)
        cv2.waitKey(1) #this is needed for cv2 to update the gui
except KeyboardInterrupt:
    pass
#cleanup: IMPORTANT the writer process should close before this one, so nothing
# tries to access the shm after unlink() is called. (less important on windows)
frame_buffer_shm.close()
frame_buffer_shm.unlink()
frame_shape_shm.close()
frame_shape_shm.unlink()
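If you want to convince yourself of the mechanism without OpenCV or two terminals, here is a minimal sketch that does the same thing inside a single process: one SharedMemory handle plays the writer, a second handle attaches by name like the reader does, and both are wrapped in numpy arrays. The function name share_frame_roundtrip and the tiny 4x6x3 "frame" are placeholders of mine, not part of the code above.

```python
from multiprocessing.shared_memory import SharedMemory

import numpy as np


def share_frame_roundtrip(shape=(4, 6, 3)):
    """Write a frame through one shm handle and read it back through another."""
    frame = np.arange(np.prod(shape), dtype=np.uint8).reshape(shape)
    writer_shm = SharedMemory(create=True, size=frame.nbytes)  # auto-generated name
    reader_shm = SharedMemory(name=writer_shm.name)  # attach by name, like the reader
    try:
        writer_view = np.ndarray(shape, dtype=np.uint8, buffer=writer_shm.buf)
        reader_view = np.ndarray(shape, dtype=np.uint8, buffer=reader_shm.buf)
        writer_view[:] = frame          # the only copy: into shared memory
        result = reader_view.copy()     # snapshot what the "reader" sees
        del writer_view, reader_view    # drop the views before close()
        return result, frame
    finally:
        reader_shm.close()
        writer_shm.close()
        writer_shm.unlink()
```

The arrays have to be dropped before close(), otherwise close() complains that the buffer is still in use.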
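Edit: Other questions by the user suggest that a Python version older than 3.8 may be required (or even that this must work across versions), so here is an example that uses posix_ipc in place of multiprocessing.shared_memory to create the frame buffer (and how to clean it up):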
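import mmap
import numpy as np
import posix_ipc #third-party package
#creation (`frame` is a first frame read from the video, as in the writer above)
shm = posix_ipc.SharedMemory(name="frame_buf",
                             flags=posix_ipc.O_CREX, #if this fails, cleanup didn't happen properly last time
                             size=frame.nbytes)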
shm_map = mmap.mmap(shm.fd, shm.size)
buf = memoryview(shm_map)
#create the frame buffer
frame_buffer = np.ndarray(frame.shape, buffer=buf, dtype=frame.dtype)
frame_buffer[:] = frame[:] #copy first frame into frame buffer
#cleanup
shm.close_fd() #can happen after opening mmap
buf.release() #must happen after frame_buffer is no longer needed and before closing mmap
shm_map.close()
shm.unlink() #must only call from one of the two processes. unlink tells the os to reclaim the space once all handles are closed.
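For the circular buffer of a few frames mentioned earlier, here is a rough sketch of one possible layout, assuming fixed-size frames: a frame counter at the start of the shared memory, followed by N slots. The writer fills the next slot and only then bumps the counter; the reader always takes the most recently completed slot. FrameRing and its methods are hypothetical names, and this is not a fully synchronized design (there are no locks, so a reader that is much slower than the writer can still be lapped).

```python
import struct
from multiprocessing.shared_memory import SharedMemory

HEADER = struct.Struct("<Q")  # number of frames written so far


class FrameRing:
    """Hypothetical helper: N fixed-size frame slots behind a frame counter."""

    def __init__(self, frame_nbytes, slots=4, name=None, create=False):
        self.frame_nbytes = frame_nbytes
        self.slots = slots
        size = HEADER.size + slots * frame_nbytes
        self.shm = SharedMemory(name=name, create=create, size=size if create else 0)
        if create:
            HEADER.pack_into(self.shm.buf, 0, 0)  # no frames written yet

    def _offset(self, index):
        return HEADER.size + (index % self.slots) * self.frame_nbytes

    def write(self, frame_bytes):
        count = HEADER.unpack_from(self.shm.buf, 0)[0]
        start = self._offset(count)
        self.shm.buf[start:start + self.frame_nbytes] = frame_bytes
        HEADER.pack_into(self.shm.buf, 0, count + 1)  # publish after the slot is full

    def read_latest(self):
        count = HEADER.unpack_from(self.shm.buf, 0)[0]
        if count == 0:
            return None  # nothing has been written yet
        start = self._offset(count - 1)
        return bytes(self.shm.buf[start:start + self.frame_nbytes])

    def close(self, unlink=False):
        self.shm.close()
        if unlink:
            self.shm.unlink()
```

With real video you would wrap each slot in a numpy array (as with frame_buffer above) and let cap.read() fill it directly, instead of copying bytes in and out as this sketch does.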
I think I figured it out. In read.py, sys.stdin.buffer.read() reads and waits until the stdin pipe is closed, but write.py never actually closes its stdout because of the while True loop. This simplified proof-of-concept example shows the same behaviour:
write.py
import sys
import time
sys.stdout.buffer.write(b"Hello world")
sys.stdout.buffer.flush()
# Note: if we comment out the code below, it works again
while True:
    # Keep this alive but don't have `while True:pass`
    # because my computer might crash :D
    time.sleep(10)
and read.py
import sys
with open("output.txt", "w") as file:
    file.write(sys.stdin.read())
This will also hang and will never actually write anything to "output.txt". If we remove the while True loop from write.py, the code no longer hangs and "Hello world" is written to "output.txt", because when write.py finishes writing, its process exits and that closes the pipe. To fix this issue I recommend changing read.py to something like this:
import sys
while True:
    char = sys.stdin.read(1)
    if not char:
        # An empty string means the pipe was closed
        break
    with open("output.txt", "a") as file:
        file.write(char)
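To see the difference between read() and read(n) without two separate scripts, here is a small sketch that starts a writer as a child process, which prints "Hello world" and then stays alive (like the endless loop in write.py), and reads from its pipe in the parent. The CHILD snippet and the function name are made up for the demo.

```python
import subprocess
import sys

# A stand-in for write.py: write once, flush, then keep the pipe open.
CHILD = (
    "import sys, time\n"
    "sys.stdout.buffer.write(b'Hello world')\n"
    "sys.stdout.buffer.flush()\n"
    "time.sleep(30)\n"
)


def read_exactly_from_live_writer(n=11):
    proc = subprocess.Popen([sys.executable, "-c", CHILD], stdout=subprocess.PIPE)
    try:
        # read(n) returns as soon as n bytes have arrived.
        # read() with no argument would block here until the child exits.
        return proc.stdout.read(n)
    finally:
        proc.kill()
        proc.wait()
        proc.stdout.close()
```

So the reader has to know how many bytes to ask for, which is what the next part deals with.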
Solution:
write.py
import sys
import time
MAX_FILE_SIZE = 16 # bytes
msg = b"Hello world"
# `read.py` needs to know how many bytes to read for this message.
length = len(msg)
# The length itself must also have a known size, otherwise we are back
# at the same problem as before.
# To fix that, we always send the length as a fixed-width field,
# padded with `0`s at the start.
# https://stackoverflow.com/a/339013/11106801
length = str(length).zfill(MAX_FILE_SIZE)
sys.stdout.buffer.write(length.encode())
sys.stdout.buffer.write(msg)
sys.stdout.buffer.flush()
# We also need to tell `read.py` that this was the last file that we sent
# Sending `1` means that the file has ended
sys.stdout.buffer.write(b"1")
sys.stdout.buffer.flush()
# Note: if we comment out the code below, it works again
while True:
    # Keep this alive but don't have `while True:pass`
    # because my computer might crash :D
    time.sleep(10)
and read.py
import sys
import time
MAX_FILE_SIZE = 16 # bytes
while True:
    time.sleep(1) # Make sure `write.py` has sent the data
    # Read `MAX_FILE_SIZE` bytes and convert them to an int
    # so that we know the size of the incoming file
    length = int(sys.stdin.buffer.read(MAX_FILE_SIZE))
    time.sleep(1) # Make sure `write.py` has sent the data
    # Here you can switch to a different file every time `write.py`
    # sends a new file
    with open("output.txt", "wb") as file:
        file.write(sys.stdin.buffer.read(length))
    file_ended = sys.stdin.buffer.read(1)
    if file_ended == b"1":
        # File has ended
        break
    else:
        # We are going to start reading again for the next file:
        pass
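The framing itself can be tried without any pipes. Below is a minimal sketch of the same idea (zero-padded length, payload, one "last file" byte) on an in-memory io.BytesIO stream. The names HEADER_DIGITS, write_message, read_message and roundtrip are mine; HEADER_DIGITS plays the role of MAX_FILE_SIZE above.

```python
import io

HEADER_DIGITS = 16  # width of the zero-padded decimal length field


def write_message(stream, payload, last=False):
    stream.write(str(len(payload)).zfill(HEADER_DIGITS).encode())
    stream.write(payload)
    stream.write(b"1" if last else b"0")  # "1" marks the final message


def read_message(stream):
    length = int(stream.read(HEADER_DIGITS))
    payload = stream.read(length)
    last = stream.read(1) == b"1"
    return payload, last


def roundtrip(payloads):
    """Write all payloads into one stream, then read them back one by one."""
    stream = io.BytesIO()
    for i, payload in enumerate(payloads):
        write_message(stream, payload, last=(i == len(payloads) - 1))
    stream.seek(0)
    received = []
    while True:
        payload, last = read_message(stream)
        received.append(payload)
        if last:
            return received
```

Because every message starts with a field of known width, the reader never has to wait for the stream to close to know where one message ends and the next begins.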
Edit: The solution works like this:
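For every file, write.py sends three things: (1) the size of the file, (2) the file data itself, and (3) one byte that tells read.py whether it should wait for another file. For part 1, we simply encode the length of the file as a string padded with 0s at the front. Note: make sure MAX_FILE_SIZE is larger than the size of the largest file (a larger number slightly reduces performance). For part 3, sending "1" means that there are no more files to send. Otherwise read.py waits for and accepts the next file. So write.py becomes: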
from math import log
import time
import sys
import cv2
MAX_FILE_SIZE = 62914560 # bytes
# From here on this is the width of the length header in characters
# (26 with the value above); `read.py` must use exactly the same value
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE, 2)+1)
def write_file(buffer, data, last_file=False):
    # `read.py` needs to know how many bytes to read for this file.
    length = len(data)
    # The length itself must also have a known size, otherwise we are
    # back at the same problem as before.
    # To fix that, we always send the length as a fixed-width field,
    # padded with `0`s at the start.
    # https://stackoverflow.com/a/339013/11106801
    length = str(length).zfill(MAX_FILE_SIZE)
    # Also write the header to a file (handy when debugging)
    with open("output.txt", "w") as file:
        file.write(length)
    buffer.write(length.encode())
    # Write the actual data
    buffer.write(data)
    # We also need to tell `read.py` whether this was the last file that we sent
    # Sending `1` means that the file has ended
    buffer.write(str(int(last_file)).encode())
    buffer.flush()
while True:
    img = cv2.imread("img.jpg")
    bimg = cv2.imencode(".jpg", img)[1]
    # Call write_file
    write_file(sys.stdout.buffer, bimg, last_file=False)
    # time.sleep(1) # Don't need this
And read.py becomes:
from io import BytesIO
from math import log
import numpy as np
import time
import cv2
import sys
MAX_FILE_SIZE = 62914560 # bytes
# Width of the length header in characters; must match `write.py`
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE, 2)+1)
def read(buffer, number_of_bytes):
    output = b""
    while len(output) < number_of_bytes:
        chunk = buffer.read(number_of_bytes - len(output))
        if not chunk:
            # An empty result means the pipe was closed early
            raise EOFError("stdin closed before all the data arrived")
        output += chunk
    assert len(output) == number_of_bytes, "An error occurred."
    return output
def read_file(buffer):
    # Read `MAX_FILE_SIZE` bytes and convert them to an int
    # so that we know the size of the incoming file
    length = int(read(buffer, MAX_FILE_SIZE))
    # Here you can switch to a different file every time `write.py`
    # sends a new file
    data = read(buffer, length)
    # Read a byte so that we know if it is the last file
    file_ended = read(buffer, 1)
    return data, (file_ended == b"1")
while True:
    print("Reading file")
    data, last_file = read_file(sys.stdin.buffer)
    img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(), np.uint8),
                          cv2.IMREAD_UNCHANGED)
    cv2.imshow("image", img_np)
    # waitKey(0) blocks until a key is pressed; use waitKey(1) to play continuously
    cv2.waitKey(0)
    if last_file:
        break
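As a variation (not what the code above does), the text header could be swapped for a fixed-size binary one built with the struct module: 4 bytes for the length and 1 byte for the "last file" flag, sent before the payload. This is only a sketch under that assumption; HEADER, read_exactly, send_frame and recv_frame are names I made up, and a 4-byte length limits a frame to just under 4 GiB.

```python
import io
import struct

HEADER = struct.Struct(">IB")  # 4-byte big-endian length, 1-byte "last frame" flag


def read_exactly(stream, n):
    """Keep reading until n bytes have arrived; fail loudly if the stream ends."""
    chunks = []
    remaining = n
    while remaining:
        chunk = stream.read(remaining)
        if not chunk:
            raise EOFError(f"stream closed with {remaining} bytes still expected")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def send_frame(stream, payload, last=False):
    stream.write(HEADER.pack(len(payload), int(last)))
    stream.write(payload)


def recv_frame(stream):
    length, last = HEADER.unpack(read_exactly(stream, HEADER.size))
    return read_exactly(stream, length), bool(last)
```

With real pipes you would pass sys.stdout.buffer to send_frame (followed by a flush) and sys.stdin.buffer to recv_frame; io.BytesIO works the same way for a quick check.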