如何在多线程时收集函数返回值而不使用全局变量?

fly*_*zai 1 python multithreading

因此,我正在尝试制定一个通用解决方案,该解决方案将从函数中收集所有值并将它们附加到以后可以访问的列表中。concurrent.futures这将在任务期间或类型任务中使用threading。这是我使用全局的解决方案master_list

from concurrent.futures import ThreadPoolExecutor

master_list = []
def return_from_multithreaded(func):
    # master_list = []
    def wrapper(*args, **kwargs):
        # nonlocal master_list
        global master_list
        master_list += func(*args, **kwargs)
    return wrapper


@return_from_multithreaded
def f(n):
    return [n]


with ThreadPoolExecutor(max_workers=20) as exec:
    exec.map(f, range(1, 100))

print(master_list)
Run Code Online (Sandbox Code Playgroud)

我想找到一个不包含全局变量的解决方案,并且也许可以返回存储master_list为闭包的注释掉?

Sha*_*ger 7

如果您不想使用全局变量,请不要丢弃map. map正在返回每个函数返回的值,您只是忽略它们。map通过使用此代码可以使其达到预期目的,从而变得更加简单:

def f(n):
    return n  # No need to wrap in list

with ThreadPoolExecutor(max_workers=20) as exec:
    master_list = list(exec.map(f, range(1, 100)))

print(master_list)
Run Code Online (Sandbox Code Playgroud)

如果您需要master_list显示到目前为止计算的结果(也许其他线程正在观察它),您只需使循环显式化即可:

def f(n):
    return n  # No need to wrap in list

master_list = []
with ThreadPoolExecutor(max_workers=20) as exec:
    for result in exec.map(f, range(1, 100)):
        master_list.append(result)

print(master_list)
Run Code Online (Sandbox Code Playgroud)

这就是 Executor 模型的设计目的;普通线程并不打算返回值,但 Executors 提供了一个在幕后返回值的通道,因此您不必自己管理它。在内部,这是使用某种形式的队列,并使用附加元数据来保持结果有序,但您不需要处理这种复杂性;从你的角度来看,它相当于常规map函数,只是恰好并行化工作。


更新以涵盖异常处理:

map当结果达到时,将引发工作人员中引发的任何异常。因此,正如所写的,如果任何任务失败,第一组代码将不会存储任何内容(将list部分构造,但在引发异常时被丢弃)。第二个示例将仅保留抛出第一个异常之前的结果,其余部分将被丢弃(您必须存储迭代map器并使用一些尴尬的代码来避免它)。如果您需要存储所有成功的结果,忽略失败(或只是以某种方式记录它们),最简单的方法是submit创建一个list对象Future,然后串行或按完成顺序等待它们,将调用包装.result()try/except中避免丢弃好的结果。例如,要按提交顺序存储结果,您可以这样做:

master_list = []
with ThreadPoolExecutor(max_workers=20) as exec:
    futures = [exec.submit(f, i) for i in range(1, 100)]
    exec.shutdown(False)  # Optional: workers terminate as soon as all futures finish,
                          # rather than waiting for all results to be processed
    for fut in futures:
        try:
            master_list.append(fut.result())
        except Exception:
            ... log error here ...
Run Code Online (Sandbox Code Playgroud)

为了获得更高效的代码,您可以按完成顺序检索结果,而不是按提交顺序,以便concurrent.futures.as_completed在结果完成时立即检索结果。与之前的代码相比,唯一的变化是:

    for fut in futures:
Run Code Online (Sandbox Code Playgroud)

变成:

    for fut in concurrent.futures.as_completed(futures):
Run Code Online (Sandbox Code Playgroud)

as_completed完成/取消的 future的工作yield一旦完成就在哪里,而不是延迟到所有之前提交的 future 完成并得到处理。

还有更复杂的选项涉及使用add_done_callback,因此主线程根本不参与显式处理结果,但这通常是不必要的,而且常常令人困惑,因此最好尽可能避免。