在多处理 apply_async 中维护实例状态

Bra*_*roy 4 python oop python-multiprocessing

我希望如果我调用apply_async一个实例方法并获得它的结果,那么所做的任何更改都将保留在分叉进程中。但是,似乎每次对 apply_async 的新调用都会创建该实例的新副本。

取以下代码:

from multiprocessing.pool import Pool


class Multitest:
    def __init__(self):
        self.i = 0

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(10):
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        print("i", self.i)
        self.i += 1

        return inp

if __name__ == '__main__':
    mt = Multitest()
    mt.run()
Run Code Online (Sandbox Code Playgroud)

示例输出:

i 0
i 0
i 0
i 0
i 0
input 0
i 0
i 0
i 0
i 0
i 0
input 1
input 2
input 3
input 4
input 5
input 6
input 7
input 8
input 9
Run Code Online (Sandbox Code Playgroud)

但是由于我们有两个核心,上面分布着 10 个输入,我原i以为属性会增加。

我曾期望以下流程:

  • 主线程创建实例并调用 run()
  • 主线程apply_async通过初始化两个新进程和原始 Multitest 实例的副本(其中i = 0)来分配池的工作
  • process()多次调用新进程(直到range()耗尽)。每次调用进程时,self.i该进程都会递增

注意:我不是在询问两个进程之间的共享状态。相反,我问为什么单个进程的类实例没有发生变异(为什么每个进程都self.i没有增加)。

但是,我没有看到这种行为。相反,打印的输出只是零,表明我的期望是错误的:状态(属性i)没有得到维护,但是在每次调用apply_async. 我在这里缺少什么,我怎样才能按预期进行这项工作?(apply_async尽管不是必需的,但最好使用。不过,应保持结果的顺序。)

据我所知,这种行为不是特定于apply_async其他pool方法的,而是特定于其他方法的。我有兴趣了解为什么会发生这种情况以及如何将行为更改为我想要实现的行为。赏金会找到可以为两个查询提供答案的答案。

Nic*_*sso 5

我想向您指出参考文献,但我还没有,所以我将根据经验证据分享我的想法:

每次调用 apply_async 都会准备一个新的命名空间副本。您可以通过向print(self)进程内部添加调用来查看这一点。所以这部分不是真的:

主线程分配工作...通过初始化两个新进程和原始 Multitest 实例的副本

相反,有两个新进程和原始 Multitest 实例的十个副本。所有这些副本都是从主进程制作的,它的 i 副本没有增加。为了证明这一点,time.sleep(1); self.i += 1在调用 apply_async 之前添加,并注意 a) 主线程中 i 的值递增,以及 b) 通过延迟 for 循环,到下一次调用 apply_async 时,原始 Multitest 实例已更改触发新副本。

代码:

from multiprocessing.pool import Pool
import time

class Multitest:
    def __init__(self):
        print("Creating new Multitest instance: {}".format(self))
        self.i = 0

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(4):
                time.sleep(1); self.i += 1
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        print("i", self.i)
        print("Copied instance: {}".format(self))
        self.i += 1

        return inp

if __name__ == '__main__':
    mt = Multitest()
    mt.run()
Run Code Online (Sandbox Code Playgroud)

结果:

Creating new Multitest instance: <__main__.Multitest object at 0x1056fc8b0>
i 1
Copied instance: <__mp_main__.Multitest object at 0x101052d90>
i 2
Copied instance: <__mp_main__.Multitest object at 0x101052df0>
i 3
Copied instance: <__mp_main__.Multitest object at 0x101052d90>
input 0
input 1
input 2
i 4
Copied instance: <__mp_main__.Multitest object at 0x101052df0>
input 3
Run Code Online (Sandbox Code Playgroud)

至于您的第二个查询,我认为如果您希望在流程中维护状态,您可能只需要提交一项工作。不是 Pool(2) 处理 10 个独立的作业,而是让 Pool(2) 处理 2 个独立的作业,每个作业由 5 个相互依赖的子作业组成。或者,如果您真的想要 10 个作业,您可以使用由 pid 索引的共享数据结构,这样在单个进程中操作(按顺序)的所有作业都可以操作 i 的单个副本。

这是一个具有共享数据结构的示例,以模块中的全局形式:

from multiprocessing.pool import Pool
from collections import defaultdict
import os
import myglobals # (empty .py file)

myglobals.i = defaultdict(lambda:0)

class Multitest:
    def __init__(self):
        pid = os.getpid()
        print("Creating new Multitest instance: {}".format(self))
        print("i {} (pid: {})".format(myglobals.i[pid], pid))

    def run(self):
        with Pool(2) as pool:
            worker_jobs = []
            for j in range(4):
                job = pool.apply_async(self.process, (j,))
                worker_jobs.append(job)

            for job in worker_jobs:
                res = job.get()
                print("input", res)

    def process(self, inp):
        pid = os.getpid()
        print("Copied instance: {}".format(self))
        print("i {} (pid: {})".format(myglobals.i[pid], pid))
        myglobals.i[pid] += 1

        return inp

if __name__ == '__main__':
    mt = Multitest()
    mt.run()
Run Code Online (Sandbox Code Playgroud)

结果:

Creating new Multitest instance: <__main__.Multitest object at 0x1083f3880>
i 0 (pid: 3460)
Copied instance: <__mp_main__.Multitest object at 0x10d89cdf0>
i 0 (pid: 3463)
Copied instance: <__mp_main__.Multitest object at 0x10d89ce50>
Copied instance: <__mp_main__.Multitest object at 0x10550adf0>
i 0 (pid: 3462)
Copied instance: <__mp_main__.Multitest object at 0x10550ae50>
i 1 (pid: 3462)
i 1 (pid: 3463)
input 0
input 1
input 2
input 3
Run Code Online (Sandbox Code Playgroud)

该技术来自/sf/answers/117342991/