为什么 concurrent.futures 不制作参数的副本?

ari*_*ell 5 python parallel-processing multiprocessing python-3.x concurrent.futures

我的理解是 concurrent.futures 依靠酸洗参数来让它们在不同的进程(或线程)中运行。酸洗不应该创建参数的副本吗?在 Linux 上它似乎没有这样做,即,我必须明确地传递一个副本。

我试图理解以下结果:

<0> rands before submission: [17, 72, 97, 8, 32, 15, 63, 97, 57, 60]
<1> rands before submission: [97, 15, 97, 32, 60, 17, 57, 72, 8, 63]
<2> rands before submission: [15, 57, 63, 17, 97, 97, 8, 32, 60, 72]
<3> rands before submission: [32, 97, 63, 72, 17, 57, 97, 8, 15, 60]
in function 0 [97, 15, 97, 32, 60, 17, 57, 72, 8, 63]
in function 1 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]
in function 2 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]
in function 3 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]
Run Code Online (Sandbox Code Playgroud)

这是代码:

from __future__ import print_function
import time
import random
try:
    from concurrent import futures
except ImportError:
    import futures


def work_with_rands(i, rands):
    print('in function', i, rands)


def main():
    random.seed(1)
    rands = [random.randrange(100) for _ in range(10)]

    # sequence 1 and sequence 2 should give the same results but they don't
    # only difference is that one uses a copy of rands (i.e., rands.copy())
    # sequence 1
    with futures.ProcessPoolExecutor() as ex:
        for i in range(4):
            print("<{}> rands before submission: {}".format(i, rands))
            ex.submit(work_with_rands, i, rands)
            random.shuffle(rands)

    print('-' * 30)
    random.seed(1)
    rands = [random.randrange(100) for _ in range(10)]
    # sequence 2
    print("initial sequence: ", rands)
    with futures.ProcessPoolExecutor() as ex:
        for i in range(4):
            print("<{}> rands before submission: {}".format(i, rands))
            ex.submit(work_with_rands, i, rands[:])
            random.shuffle(rands)

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

地球[97, 32, 17, 15, 57, 97, 63, 72, 60, 8]从哪里来?这甚至不是传递给submit.

Python 2 下的结果略有不同。

psl*_*psl 3

基本上, ProcessPoolExecutor.submit() 方法将函数及其参数传递给一些“工作项”字典(没有任何酸洗),该字典与另一个线程(_queue_management_worker)共享,并且该线程将 WorkItems 从该字典传递到由读取的队列实际的工人进程。

源代码中有一条注释,描述了并发模块架构: http://hg.python.org/cpython/file/16207b8495bf/Lib/concurrent/futures/process.py#l6

事实证明, _queue_management_worker没有足够的时间在提交调用之间收到有关新项目的通知。

因此,该线程一直在这里等待:(http://hg.python.org/cpython/file/16207b8495bf/Lib/concurrent/futures/process.py#l226)并且仅在 ProcessPoolExecutor.shutdown 时唤醒(在退出时) ProcessPoolExecutor 上下文)。

如果你在第一个序列中添加一些延迟,就像这样:

with futures.ProcessPoolExecutor() as ex:
    for i in range(4):
        print("<{}> rands before submission: {}".format(i, rands))
        ex.submit(work_with_rands, i, rands)
        random.shuffle(rands)
        time.sleep(0.01)
Run Code Online (Sandbox Code Playgroud)

您将看到,_queue_management_worker将唤醒并将调用传递给工作进程,并且 work_with_rands 将打印不同的值。