Fra*_*ton · 1 · Tags: python, opencv, multiprocessing, pafy
I am streaming a set of YouTube videos with pafy, with the goal of combining them (split-screen style) and displaying them as one video. It works, but once there are more than two videos the frame rate becomes very slow, because a frame is fetched from each stream in turn; when I try 9 videos (for a 3x3 mosaic), fetching the frames takes 0.1725 seconds (far too slow).
I figured the best way to reduce this is to fetch the streams in parallel with multiprocessing.
I tried Pipes and multiprocessing, but I get EOFError: Ran out of input.
See the code below; comment/uncomment the `frames = ` lines to switch between the working-but-slow approach and my multiprocessing attempt.
import multiprocessing
import cv2
import numpy as np
import pafy
import typing
import timeit
urls = [
    "https://www.youtube.com/watch?v=tT0ob3cHPmE",
    "https://www.youtube.com/watch?v=XmjKODQYYfg",
    "https://www.youtube.com/watch?v=E2zrqzvtWio",
    "https://www.youtube.com/watch?v=6cQLNXELdtw",
    "https://www.youtube.com/watch?v=s_rmsH0wQ3g",
    "https://www.youtube.com/watch?v=QfhpNe6pOqU",
    "https://www.youtube.com/watch?v=C_9x0P0ebNc",
    "https://www.youtube.com/watch?v=Ger6gU_9v9A",
    "https://www.youtube.com/watch?v=39dZ5WhDlLE"
]
width = np.math.ceil(np.sqrt(len(urls)))
dim = 1920, 1080
def main():
    streams = [pafy.new(url).getbest() for url in urls]
    videos = [cv2.VideoCapture() for _ in streams]
    [video.open(best.url) for video, best in zip(videos, streams)]
    cv2.namedWindow('Video', cv2.WINDOW_FREERATIO)
    cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    while True:
        start_time = timeit.default_timer()
        # frames = [cv2.resize(video.read()[-1], (dim[0] // width, dim[1] // width)) for video in videos]
        frames = get_frames(videos)
        print(timeit.default_timer() - start_time)
        start_time = timeit.default_timer()
        dst = merge_frames(frames)
        print(timeit.default_timer() - start_time)
        start_time = timeit.default_timer()
        cv2.imshow('Video', dst)
        if cv2.waitKey(1) & 0xFF == ord('e'):
            break
        print(timeit.default_timer() - start_time)
        continue
    [video.release() for video in videos]
    cv2.destroyAllWindows()
def get_frames(videos):
    # frames = [video.read()[-1] for video in videos]
    jobs = []
    pipe_list = []
    for video in videos:
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=get_frame, args=(video, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()
    for proc in jobs:
        proc.join()
    frames = [x.recv() for x in pipe_list]
    return frames
def get_frame(video, send_end):
    send_end.send(video.read()[1])
    # send_end.send(cv2.resize(video.read()[1], (dim[0] // width, dim[1] // width)))
def merge_frames(frames: typing.List[np.ndarray]):
    width = np.math.ceil(np.sqrt(len(frames)))
    rows = []
    for row in range(width):
        i1, i2 = width * row, width * row + width
        rows.append(np.hstack(frames[i1: i2]))
    return np.vstack(rows)
if __name__ == '__main__':
    main()
Interesting application! Regarding the error: I ran your code, and the message is that it cannot pickle the VideoCapture object — see the links below — which is probably why the receiving pipe is empty. There are two errors from the two processes: first the pickle one, then the EOF.
Edit #2: I managed to get it running with one process per video etc.:
Regarding performance: I first ran it without merging the images (I had to fix a few details) to check that frames were being received, and with 3 and 4 streams, displayed in a window separate from the receiving processes, it played very fast — faster than real time (tested with 3-4 streams). I think the merging and resizing for display are the slow part; with 4 streams the merged picture is 2560x1440 (4x1280x720). In my case it is then resized to fit the screen.
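Since merging and resizing dominate, one option (hinted at by the commented-out resize in `get_frame`) is to shrink each frame inside its worker process, so less data crosses the pipe and the main process only stacks. A pure-Python stand-in for the idea — nested lists instead of numpy arrays, and nearest-neighbour decimation instead of cv2.resize; both are assumptions for illustration only:

```python
def downsample(frame, factor):
    # Keep every `factor`-th row and column: a cheap stand-in for cv2.resize.
    # A 720-row frame with factor=2 becomes 360 rows *before* it is sent.
    return [row[::factor] for row in frame[::factor]]

frame = [[(r, c) for c in range(8)] for r in range(6)]  # dummy 6x8 "image"
small = downsample(frame, 2)
print(len(small), len(small[0]))  # 3 4
```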
Thanks for sharing the problem and the library etc.!
(BTW, I initially tried with a lock as well, but it happened to be unnecessary. The code needs some cleanup of experiments. Also, the current implementation may not be synchronized per-frame across the streams, because it doesn't join per frame as your original example did — yours created new processes to grab one frame from each stream and then merged them.)
The CPU load is mostly in the main process (4-core CPU, so max = 25% per instance):
Sometimes:
0.06684677699999853
0.030737616999999773
1.2829999995744856e-06
LEN(FRAMES)= 9
0.06703700200000284
0.030708104000002123
6.409999997458726e-07
LEN(FRAMES)= 9
The waitKey delay in the main loop can be tuned.
https://github.com/Twenkid/Twenkid-FX-Studio/blob/master/Py/YoutubeAggregatorPafy/y6.py
# Merging Youtube streams with pafy, opencv and multithreading
# Base code by Fraser Langton - Thanks!
# Refactored and debugged by Twenkid
# version y6 - more cleaning of unused code, properly close VideoCapture in the processes
import multiprocessing #Process, Lock
from multiprocessing import Lock # Not needed
import cv2
import numpy as np
import pafy
import typing
import timeit
import time
urls = [
    "https://www.youtube.com/watch?v=tT0ob3cHPmE",
    "https://www.youtube.com/watch?v=XmjKODQYYfg",
    "https://www.youtube.com/watch?v=E2zrqzvtWio",
    "https://www.youtube.com/watch?v=6cQLNXELdtw",
    "https://www.youtube.com/watch?v=s_rmsH0wQ3g",
    "https://www.youtube.com/watch?v=QfhpNe6pOqU",
    "https://www.youtube.com/watch?v=C_9x0P0ebNc",
    "https://www.youtube.com/watch?v=Ger6gU_9v9A", 
    "https://www.youtube.com/watch?v=39dZ5WhDlLE"
]
# Merging seems to require an equal number of sides, so 2x2, 3x3 etc. The resolutions should be the same.
'''
[    
    "https://www.youtube.com/watch?v=C_9x0P0ebNc",
    "https://www.youtube.com/watch?v=Ger6gU_9v9A",
    "https://www.youtube.com/watch?v=39dZ5WhDlLE",   
    "https://www.youtube.com/watch?v=QfhpNe6pOqU",
]
'''
width = np.math.ceil(np.sqrt(len(urls)))
dim = 1920, 1080
streams = []
#bestStreams = []
def main():
    global bestStreams
    streams = [pafy.new(url).getbest() for url in urls]
    print(streams)
    #[bestStreams for best in streams]
    #print(bestStreams)
    cv2.waitKey(0)
    videos = [cv2.VideoCapture() for _ in streams]
    bestURLS = [] 
    #[video.open(best.url) for video, best in zip(videos, streams)]  # Opened per process
    bestURLS = [best.url for best in streams]
    
    #[ for video, best in zip(videos, streams)]
    print(bestURLS)
    cv2.waitKey(0)
    cv2.namedWindow('Video', cv2.WINDOW_FREERATIO)
    cv2.setWindowProperty('Video', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    LOCK = Lock()
    #proc = get_framesUL(bestStreams, LOCK)
    #proc, pipes = get_framesULJ(bestStreams, LOCK)
    proc, pipes = get_framesULJ(bestURLS, LOCK)     
    print("PROC, PIPES", proc, pipes)
    #cv2.waitKey(0)
    frames = []
    numStreams = len(streams)
    while True:
        start_time = timeit.default_timer()
        # frames = [cv2.resize(video.read()[-1], (dim[0] // width, dim[1] // width)) for video in videos]
        #frames = get_frames(videos, LOCK)
        #frames = get_framesUL(streams, LOCK)
        
        
        print(timeit.default_timer() - start_time)
        start_time = timeit.default_timer()
        
        frames = [x.recv() for x in pipes]
        lf = len(frames)
        print("LEN(FRAMES)=", lf);
        #if lf<3: time.sleep(3); print("LEN(FRAMES)=", lf); #continue #Else merge and show
        #proc.join()
        #elif lf==3: frames = [x.recv() for x in pipes]
                
        dst = merge_frames(frames)
        print(timeit.default_timer() - start_time)
         
        start_time = timeit.default_timer()      
        #if cv2!=None:
        try:
          cv2.imshow('Video', dst)
        except: print("Skip")
        #cv2.waitKey(1)  
        if cv2.waitKey(20) & 0xFF == ord('e'):
            break
        print(timeit.default_timer() - start_time)
        continue
        
    for p in proc:  # get_framesULJ returns the job list as `proc`
        p.join()
        
    # [video.release() for video in videos] # Per process
    cv2.destroyAllWindows()
def get_framesULJ(videosURL, L): #return the processes, join in main and read the frames there
    # frames = [video.read()[-1] for video in videos]
    print("get_framesULJ:",videosURL)    
    jobs = []
    pipe_list = []
    #print("VIDEOS:",videosURL)    
    #for video in videos:
    for videoURL in videosURL: #urls:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
        #p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        #if (p==None): continue
        print("P = ", p)
        #time.sleep(0.001)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))                
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)               
        p.start()
        #cv2.waitKey(0)
    #for proc in jobs:
    #    proc.join()
    #frames = [x.recv() for x in pipe_list]
    #return frames
    #cv2.waitKey(0)
    return jobs, pipe_list
def get_frame2L(videoURL, send_end, L):
    v = cv2.VideoCapture()
    #[video.open(best.url)
    #L.acquire()
    v.open(videoURL)
    print("get_frame2", videoURL, v, send_end)
    #cv2.waitKey(0)
    while True:      
      ret, frame = v.read()
      if ret: send_end.send(frame); #cv2.imshow("FRAME", frame); cv2.waitKey(1)   
      else: print("NOT READ!"); break
    #send_end.send(v.read()[1])
    #L.release()
    
def get_framesUL(videosURL, L):
    # frames = [video.read()[-1] for video in videos]
    jobs = []
    pipe_list = []
    print("VIDEOS:",videosURL)    
    #for video in videos:
    for videoURL in videosURL: #urls:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
        #p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        #if (p==None): continue
        print("P = ", p)
        #time.sleep(0.001)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))                
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)               
        p.start()
    for proc in jobs:
        proc.join()
    frames = [x.recv() for x in pipe_list]
    return frames
def get_frames(videos, L):
    # frames = [video.read()[-1] for video in videos]
    jobs = []
    pipe_list = []
    print("VIDEOS:",videos)    
    for video in videos:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        #p = multiprocessing.Process(target=get_frame, args=(video, send_end, L))
        #if (p==None): continue
        print("P = ", p)
        #time.sleep(0.001)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))                
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)               
        p.start()
    for proc in jobs:
        proc.join()
    frames = [x.recv() for x in pipe_list]
    return frames
    
def get_frame(video, send_end, L):
    L.acquire()
    print("get_frame", video, send_end)
    send_end.send(video.read()[1])
    L.release()
    # send_end.send(cv2.resize(video.read()[1], (dim[0] // width, dim[1] // width)))
    
def get_frame2(videoURL, send_end):
    v = cv2.VideoCapture(videoURL)
    while True:
      ret, frame = v.read()
      if ret: send_end.send(frame)
      else: break
      
    
def merge_frames(frames: typing.List[np.ndarray]):
    #cv2.imshow("FRAME0", frames[0]) ########## not images/small
    #cv2.imshow("FRAME1", frames[1]) ##########
    #cv2.imshow("FRAME2", frames[2]) ##########
    #cv2.imshow("FRAME3", frames[3]) ##########
    #cv2.waitKey(1)
    width = np.math.ceil(np.sqrt(len(frames)))
    rows = []
    for row in range(width):
        i1, i2 = width * row, width * row + width
        rows.append(np.hstack(frames[i1: i2]))
    
    
    return np.vstack(rows)
if __name__ == '__main__':
    main()
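The merge step assumes a perfect square number of streams (as the comment in the code notes: 2x2, 3x3, etc.). The index arithmetic in merge_frames can be checked in isolation; a minimal pure-Python sketch with integers standing in for frames:

```python
import math

def grid_rows(items):
    # Same slicing as merge_frames: ceil(sqrt(n)) columns per row.
    width = math.ceil(math.sqrt(len(items)))
    return [items[width * r: width * r + width] for r in range(width)]

print(grid_rows(list(range(9))))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
print(grid_rows(list(range(7))))  # [[0, 1, 2], [3, 4, 5], [6]]
```

With 9 items the grid is a clean 3x3; with 7 the last row is ragged, which is why np.hstack/np.vstack on unequal rows breaks the merge for non-square counts.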
Edit #1 idea: create one process per video stream and read it in a loop (pumping frames into the pipe), instead of creating a new process for every frame; and/or open the video/VideoCapture object inside the process from the videoURL string, instead of sending the VideoCapture object itself. (I didn't know at first whether this form would hit the same pickle problem.)
...
in main:
    bestURLS = [best.url for best in streams]
    proc, pipes = get_framesULJ(bestURLS, LOCK)

def get_frame2(videoURL, send_end):
    v = cv2.VideoCapture(videoURL)
    while True:
      ret, frame = v.read()
      if ret: send_end.send(frame)
      else: break

def get_framesULJ(videosURL, L): # return the processes, join in main and read the frames there
    print("get_framesULJ:", videosURL)
    jobs = []
    pipe_list = []
    for videoURL in videosURL:
        recv_end, send_end = multiprocessing.Pipe(False)
        print(recv_end, send_end)
        p = multiprocessing.Process(target=get_frame2L, args=(videoURL, send_end, L))
        print("P = ", p)
        jobs.append(p)
        print("JOBS, len", jobs, len(jobs))
        pipe_list.append(recv_end)
        print("pipe_list", pipe_list)
        p.start()
    return jobs, pipe_list
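The shape of that idea, stripped of the video I/O, can be sketched with a dummy grabber. Only a picklable URL string crosses the process boundary; `grab` here is an assumption standing in for the VideoCapture loop, not the answer's final code:

```python
import multiprocessing

def grab(url, send_end):
    # A real worker would do cv2.VideoCapture(url) here and loop on read().
    for i in range(3):
        send_end.send(f"{url}#frame{i}")
    send_end.close()

if __name__ == '__main__':
    recv_end, send_end = multiprocessing.Pipe(False)
    p = multiprocessing.Process(target=grab, args=("demo-url", send_end))
    p.start()
    frames = [recv_end.recv() for _ in range(3)]
    p.join()
    print(frames)  # ['demo-url#frame0', 'demo-url#frame1', 'demo-url#frame2']
```

The key point is that the child builds its own capture object, so nothing unpicklable ever has to be serialized.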
Original answer:
<multiprocessing.connection.PipeConnection object at 0x000000000D3C7D90> <multiprocessing.connection.PipeConnection object at 0x000000000D3BD2E0>
Traceback (most recent call last):
  File "y.py", line 104, in <module>
    main()
  File "y.py", line 48, in main
    frames = get_frames(videos)
  File "y.py", line 80, in get_frames
    p.start()
  File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'cv2.VideoCapture' object

Z:\>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
It fails before p.start completes. The instances are created and the structures look fine:
VIDEOS: [<VideoCapture 000000000D418710>, <VideoCapture 000000000D4186F0>, <VideoCapture 000000000D418B70>]
<multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90> <multiprocessing.connection.PipeConnection object at 0x000000000D3B62E0>
P =  <Process name='Process-1' parent=8892 initial>
JOBS, len [<Process name='Process-1' parent=8892 initial>] 1
RECV_END <multiprocessing.connection.PipeConnection object at 0x000000000D3C3D90>
See the pickle module:
https://docs.python.org/3/library/pickle.html
It seems not everything can be "pickled".
What can and cannot be pickled?
The following types can be pickled:

- None, True, and False
- integers, floating point numbers, complex numbers
- strings, bytes, bytearrays
- tuples, lists, sets, and dictionaries containing only picklable objects
- functions defined at the top level of a module (using def, not lambda)
- built-in functions defined at the top level of a module
- classes that are defined at the top level of a module
- instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see section Pickling Class Instances for details)
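A VideoCapture wraps an OS-level handle, which is exactly the kind of state pickle refuses. The failure can be reproduced with any handle-holding object from the standard library — here `threading.Lock` stands in for `cv2.VideoCapture` — while the URL string the fixed code sends pickles fine:

```python
import pickle
import threading

lock = threading.Lock()  # wraps an OS handle, like cv2.VideoCapture does
try:
    pickle.dumps(lock)
except TypeError as e:
    print("cannot pickle:", e)  # cannot pickle '_thread.lock' object

# Plain data is picklable, which is why passing the URL string works:
url = "https://www.youtube.com/watch?v=tT0ob3cHPmE"
assert pickle.loads(pickle.dumps(url)) == url
```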
Also, there seems to be a bug in opencv that causes this. One of the solutions given was to turn multiprocessing off...
Python multiprocessing can't pickle opencv videocapture object
https://github.com/MVIG-SJTU/AlphaPose/issues/164
Fang Haoshu commented on Oct 17, 2018:
"That bug is due to multi-processing in opencv. --sp disables multi-processing. BTW, could you tell me the version of opencv that you are using?"
I guess it is something about locking memory or the like.
The workaround I would try is to first dump the object's pixels as plain/raw data, perhaps with a header about the size etc.
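That workaround — sending pixels as raw bytes with a small size header instead of a pickled object — can be sketched with struct (the exact header layout here is an assumption; any fixed format works):

```python
import struct

HEADER = ">III"  # big-endian: height, width, channels

def pack_frame(pixels: bytes, h: int, w: int, c: int) -> bytes:
    # 12-byte header followed by the raw pixel buffer
    return struct.pack(HEADER, h, w, c) + pixels

def unpack_frame(blob: bytes):
    h, w, c = struct.unpack_from(HEADER, blob)
    return (h, w, c), blob[struct.calcsize(HEADER):]

raw = bytes(range(18))  # dummy 2x3 BGR frame (2*3*3 = 18 bytes)
blob = pack_frame(raw, 2, 3, 3)
shape, pixels = unpack_frame(blob)
print(shape, pixels == raw)  # (2, 3, 3) True
```

On the receiving side the shape from the header is enough to rebuild the array (e.g. np.frombuffer(...).reshape(h, w, c)).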
Also, in general, I think some buffering needs to be added for smoother playback.
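A minimal sketch of such a buffer — a bounded deque that always serves the newest frame and silently drops stale ones; the class name and size are assumptions for illustration:

```python
from collections import deque

class FrameBuffer:
    """Keep only the newest `maxlen` frames; the display reads the latest."""
    def __init__(self, maxlen=5):
        self._frames = deque(maxlen=maxlen)

    def push(self, frame):
        self._frames.append(frame)  # oldest frame is dropped when full

    def latest(self):
        return self._frames[-1] if self._frames else None

buf = FrameBuffer(maxlen=3)
for i in range(10):      # producer outruns the display
    buf.push(i)
print(buf.latest())      # 9  (stale frames 0..6 were dropped)
```

Dropping old frames keeps latency bounded when a stream temporarily delivers faster than the merge/display loop can consume.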
BTW, which version of openCV do you have? Mine is 4.2.0.
Views: 2031