Python tqdm process_map:追加进程之间共享的列表?

Mik*_*ike 3 python multithreading multiprocessing tqdm

我想共享一个列表来附加并行线程的输出,从process_mapfrom开始tqdm。(我想使用的原因process_map是很好的进度指示器和max_workers=选项。)

我尝试使用它from multiprocessing import Manager来创建共享列表,但我在这里做错了:我的代码打印一个空的shared_list,但它应该打印一个包含 20 个数字的列表,正确的顺序并不重要。

任何帮助将不胜感激,提前谢谢!

import time
from tqdm.contrib.concurrent import process_map
from multiprocessing import Manager


shared_list = []

def worker(i):
    global shared_list
    time.sleep(1)
    shared_list.append(i)

if __name__ == '__main__':
    manager = Manager()
    shared_list = manager.list()

    process_map(worker, range(20), max_workers=5)
    print(shared_list)
Run Code Online (Sandbox Code Playgroud)

Boo*_*boo 15

您没有指定您正在运行的平台(每当您用 标记问题时,您都应该用您的平台标记您的问题multiprocessing),但看起来您正在用于创建新进程的平台spawn(例如 Windows)下运行。这意味着当启动一个新进程时,会创建一个空的地址空间,启动一个新的Python解释器,并从顶部重新执行源代码。

if __name__ == '__main__':因此,尽管您在开始分配给托管列表的块中shared_list,创建的池中的每个进程都将执行shared_list = []破坏您的初始分配的操作。

您可以将shared_list第一个参数传递给工作函数:

import time
from tqdm.contrib.concurrent import process_map
from multiprocessing import Manager
from functools import partial

def worker(shared_list, i):
    time.sleep(1)
    shared_list.append(i)

if __name__ == '__main__':
    manager = Manager()
    shared_list = manager.list()

    process_map(partial(worker, shared_list), range(20), max_workers=5)
    print(shared_list)
Run Code Online (Sandbox Code Playgroud)

如果以与类相同的方式process_map支持初始化程序initargsProcessPoolExecutor参数(看起来它不支持),那么您可以这样做:

import time
from tqdm.contrib.concurrent import process_map
from multiprocessing import Manager

def init_pool(the_list):
    global shared_list
    shared_list = the_list

def worker(i):
    time.sleep(1)
    shared_list.append(i)

if __name__ == '__main__':
    manager = Manager()
    shared_list = manager.list()

    process_map(worker, range(20), max_workers=5, initializer=init_pool, initargs=(shared_list,))
    print(shared_list)
Run Code Online (Sandbox Code Playgroud)

评论

这本身与您的原始问题无关,但对于这种类型的问题,您可能需要考虑使用托管列表,而不是您的工作函数(恰好命名为worker)附加元素,并且附加元素的顺序是不确定性,因为您无法控制池进程的调度,实例multiprocessing.Array初始化如下:

shared_list = Array('i', [0] * 20, lock=False)
Run Code Online (Sandbox Code Playgroud)

然后你的工作函数就变成了:

def worker(i):
    time.sleep(1)
    shared_list[i] = i
Run Code Online (Sandbox Code Playgroud)

这里,数组存储在共享内存中,甚至不需要锁定访问,因为每次调用都worker访问数组的不同索引。访问共享内存数组的元素比访问托管列表的元素要快得多唯一的问题是对共享内存的引用不能作为参数传递,并且我们看到它process_map不支持初始化程序initargs参数。所以你必须使用较低级别的方法。例如:

import time
from multiprocessing import Pool, Array
from tqdm import tqdm

def init_pool(the_list):
    global shared_list
    shared_list = the_list

def worker(i):
    time.sleep(1)
    shared_list[i] = i

if __name__ == '__main__':
    # Preallocate 20 slots for the array in shared memory
    # And we don't require a lock if each worker invocation is accessing a different Array index:
    args = range(20)
    shared_list = Array('i', [0] * len(args), lock=False)

    with tqdm(total=len(args)) as pbar:
        pool = Pool(5, initializer=init_pool, initargs=(shared_list,))
        for result in pool.imap_unordered(worker, args):
            pbar.update(1)
    # print out elements one at a time:
    for elem in shared_list:
        print(elem)
    # print out all elements at once (must first convert to a regular list):
    print(list(shared_list))
Run Code Online (Sandbox Code Playgroud)

评论2

我会避免使用process_map. 该函数基于map方法的方法ProcessPoolExecutor.map,要求按照与传递的可迭代元素相对应的顺序返回结果,而不是按照完成的顺序。想象一下,如果由于某种原因,处理第一个提交的任务(i在我们的例子中为 0)的进程需要很长时间来处理并最终成为最后完成的任务,会发生什么情况。您会看到tqdm进度条很长一段时间没有任何反应,直到第一个提交的任务完成。但当这种情况发生时,我们知道所有其他提交的任务都已经完成,因此进度条会立即从 0 跳到 100%。修改函数worker如下以查看其实际效果:

def worker(shared_list, i):
    if i == 0:
        time.sleep(5)
    else:
        time.sleep(.25)
    shared_list.append(i)
Run Code Online (Sandbox Code Playgroud)

我上面提供的代码版本Pool.imap_unordered允许无序返回结果,并且默认chunksize值为 1,它将按完成顺序排列。进度条会更加顺畅地前进。

评论3

似乎也有一个错误tqdmtqdm以下程序演示了如何在模块中使用这次的低级调用concurrent.futures。不幸的是,它的ProcessPoolExecutor类(用于多处理)和ThreadPoolExecutor类(用于多线程)没有与该方法等效的imap_unordered方法。您必须使用该submit方法(其multiprocessing.pool.Pool类似物是该apply_async方法),该方法返回一个Future实例,您可以在该实例上调用该result方法来阻止完成并返回已提交任务的结果)。您将执行submit一堆任务并将返回的Future实例存储在一个列表中,然后使用as_completed函数调用从该列表返回下一个已完成的已Future完成实例。

该演示使用线程并创建一个大小为20的线程池并提交20个任务,因此所有任务应该同时启动。的睡眠时间worker1设置为不同的,因此i参数的值越小,睡眠时间越长。该程序创建池并提交任务 4 次。第一次,仅打印返回值。第二次tqdm使用进度条。结果正如您所期望的那样。第三次worker2与进度条一起使用tqdm。不同之处在于,对于所有i != 0睡眠时间值来说都是一个常数(0.25 秒),因此对于i值 1、2、... 19,任务应大致在同一时间完成。因此,您希望看到进度条在很短的时间内跳到 95%,然后等待任务i == 0完成。然而,你观察到的却恰恰相反。进度条转到 5% 并挂在那里很长一段时间,然后跳到 100%。第四种情况是worker2与我自己的“进度条”一起使用,其行为正如您所期望的那样。

这是tqdmPython 3.8.5下的4.61.1。我已经在 Windows 和 Linux 下测试过了。有人对这种行为有解释吗?

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import sys

class MyProgressBar:
    def __init__(self, n_tasks):
        self._task_count = n_tasks
        self._completed = 0
        self.update()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(file=sys.stderr)
        return False

    def update(self, count=0):
        self._completed += count
        print(f'\r{self._completed} of {self._task_count} task(s) completed.', end='', flush=True)

def worker1(i):
    if i == 0:
        time.sleep(8)
    else:
        time.sleep(5 - i/5)
    return i

def worker2(i):
    if i == 0:
        time.sleep(8)
    else:
        time.sleep(.25)
    return i

if __name__ == '__main__':
    args = range(20)

    with ThreadPoolExecutor(max_workers=20) as pool:
        futures = [pool.submit(worker1, arg) for arg in args]
        for future in as_completed(futures):
            print(future.result())

    with ThreadPoolExecutor(max_workers=20) as pool:
        with tqdm(total=len(args)) as pbar:
            futures = [pool.submit(worker1, arg) for arg in args]
            for future in as_completed(futures):
                future.result()
                pbar.update(1)

    with ThreadPoolExecutor(max_workers=20) as pool:
        with tqdm(total=len(args)) as pbar:
            futures = [pool.submit(worker2, arg) for arg in args]
            for future in as_completed(futures):
                future.result()
                pbar.update(1)

    # Works with my progress "bar":
    with ThreadPoolExecutor(max_workers=20) as pool:
        with MyProgressBar(len(args)) as pbar:
            futures = [pool.submit(worker2, arg) for arg in args]
            for future in as_completed(futures):
                future.result()
                pbar.update(1)
Run Code Online (Sandbox Code Playgroud)