Python 多处理进度方法

MrE*_*ore 5 python progress multiprocessing

我一直忙于编写我的第一个多处理代码,它有效,是的。但是,现在我想要一些进展反馈,但我不确定最好的方法是什么。

简而言之,我的代码(见下文)的作用:

  • 扫描目标目录中的 mp4 文件
  • 每个文件由单独的进程分析,该进程保存结果(图像)

我正在寻找的可能是:

  1. 简单的
  • 每次进程完成一个文件时,它都会发送一条“完成”消息
  • 主要代码记录已完成的文件数量
  1. 想要
Core 0  processing file 20 of 317 ||||||____ 60% completed
Core 1  processing file 21 of 317 |||||||||_ 90% completed
...
Core 7  processing file 18 of 317 ||________ 20% completed
Run Code Online (Sandbox Code Playgroud)

我阅读了有关队列、池、tqdm 的各种信息,但我不确定该走哪条路。有人能指出一种在这种情况下有效的方法吗?

提前致谢!

编辑:更改了我的代码,按照 gsb22 的建议启动进程

我的代码:

# file operations
import os
import glob
# Multiprocessing
from multiprocessing import Process
# Motion detection
import cv2


# >>> Enter directory to scan as target directory
targetDirectory = "E:\Projects\Programming\Python\OpenCV\\videofiles"

def get_videofiles(target_directory):

    # Find all video files in directory and subdirectories and put them in a list
    videofiles = glob.glob(target_directory + '/**/*.mp4', recursive=True)
    # Return the list
    return videofiles


def process_file(videofile):

    '''
    What happens inside this function:
    - The video is processed and analysed using openCV
    - The result (an image) is saved to the results folder
    - Once this function receives the videofile it completes
      without the need to return anything to the main program
    '''

    # The processing code is more complex than this code below, this is just a test
    cap = cv2.VideoCapture(videofile)

    for i in range(10):
        succes, frame = cap.read()

        # cv2.imwrite('{}/_Results/{}_result{}.jpg'.format(targetDirectory, os.path.basename(videofile), i), frame)

        if succes:
            try:
                cv2.imwrite('{}/_Results/{}_result_{}.jpg'.format(targetDirectory, os.path.basename(videofile), i), frame)
            except:
                print('something went wrong')


if __name__ == "__main__":

    # Create directory to save results if it doesn't exist
    if not os.path.exists(targetDirectory + '/_Results'):
        os.makedirs(targetDirectory + '/_Results')

    # Get a list of all video files in the target directory
    all_files = get_videofiles(targetDirectory)

    print(f'{len(all_files)} video files found')

    # Create list of jobs (processes)
    jobs = []

    # Create and start processes
    for file in all_files:
        proc = Process(target=process_file, args=(file,))
        jobs.append(proc)

    for job in jobs:
        job.start()

    for job in jobs:
        job.join()

    # TODO: Print some form of progress feedback

    print('Finished :)')
Run Code Online (Sandbox Code Playgroud)

2e0*_*byo 1

我阅读了有关队列、池、tqdm 的各种信息,但我不确定该走哪条路。有人能指出一种在这种情况下有效的方法吗?

这是一种以最低成本获取进度指示的非常简单的方法:

from multiprocessing.pool import Pool
from random import randint
from time import sleep

from tqdm import tqdm


def process(fn) -> bool:
    sleep(randint(1, 3))
    return randint(0, 100) < 70


files = [f"file-{i}.mp4" for i in range(20)]

success = []
failed = []
NPROC = 5
pool = Pool(NPROC)


for status, fn in tqdm(zip(pool.imap(process, files), files), total=len(files)):
    if status:
        success.append(fn)
    else:
        failed.append(fn)

print(f"{len(success)} succeeded and {len(failed)} failed")
Run Code Online (Sandbox Code Playgroud)

一些评论:

  • tqdm 是一个第三方库,它非常好地实现进度条。还有其他的。 pip install tqdm
  • 我们使用一个进程池(几乎没有理由为了像这样的简单事情而自己管理进程)NPROC。我们让池处理对输入数据的迭代处理函数。
  • 我们通过让函数返回一个布尔值来表示状态(在这个例子中,我们随机选择,加权有利于成功)。我们不返回文件名,尽管我们可以,因为它必须被序列化并从子进程发送,这是不必要的开销。
  • 我们使用Pool.imap,它返回一个迭代器,它与我们传入的迭代器保持相同的顺序。所以我们可以使用直接zip迭代 files。由于我们使用大小未知的迭代器,tqdm因此需要知道它的长度。(我们本来可以使用pool.map,但不需要提交 ram ——尽管对于一个 bool 来说可能没有什么区别。)

我特意把它写成一种食谱。只需使用范式中的高级下降,您就可以使用多处理做很多事情,并且Pool.[i]map是最有用的之一。

参考

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool https://tqdm.github.io/