Modifying objects in Python multiprocessing

Dil*_*rix 14 python parallel-processing multiprocessing

I have a large array of custom objects on which I need to perform independent (parallelizable) tasks, including modifying object parameters. I've tried using both Manager().dict and the sharedmem module, but neither works. For example:

import numpy as np
import multiprocessing as mp
import sharedmem as shm


class Tester:

    num = 0.0
    name = 'none'
    def __init__(self,tnum=num, tname=name):
        self.num  = tnum
        self.name = tname

    def __str__(self):
        return '%f %s' % (self.num, self.name)

def mod(test, nn):
    test.num = np.random.randn()
    test.name = nn


if __name__ == '__main__':

    num = 10

    tests = np.empty(num, dtype=object)
    for it in range(num):
        tests[it] = Tester(tnum=it*1.0)

    sh_tests = shm.empty(num, dtype=object)
    for it in range(num):
        sh_tests[it] = tests[it]
        print(sh_tests[it])

    print('\n')
    workers = [ mp.Process(target=mod, args=(test, 'some') ) for test in sh_tests ]

    for work in workers: work.start()

    for work in workers: work.join()

    for test in sh_tests: print(test)

This prints:

0.000000 none
1.000000 none
2.000000 none
3.000000 none
4.000000 none
5.000000 none
6.000000 none
7.000000 none
8.000000 none
9.000000 none


0.000000 none
1.000000 none
2.000000 none
3.000000 none
4.000000 none
5.000000 none
6.000000 none
7.000000 none
8.000000 none
9.000000 none

That is, the objects are not modified.

How can I achieve the desired behavior?

tac*_*ell 14

The problem is that when the objects are passed to the worker processes, they are pickled and shipped over to the other process, where they are unpickled and worked on. Your objects aren't so much passed to the other process as cloned. You don't return the objects, so the cloned objects are happily modified and then thrown away.
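A minimal sketch that makes the cloning visible (the Box class and worker names are illustrative, not from your code): the child mutates its unpickled copy, while the parent's object is untouched.

import multiprocessing as mp

class Box:
    def __init__(self, value):
        self.value = value

def worker(box):
    box.value = 999  # modifies only the child's unpickled copy

if __name__ == '__main__':
    b = Box(1)
    p = mp.Process(target=worker, args=(b,))
    p.start()
    p.join()
    print(b.value)  # still prints 1: the parent's object was never modified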

It looks like this cannot be done directly (Python: Possible to share in-memory data between 2 separate processes).

What you can do is return the modified objects.

import numpy as np
import multiprocessing as mp



class Tester:

    num = 0.0
    name = 'none'
    def __init__(self,tnum=num, tname=name):
        self.num  = tnum
        self.name = tname

    def __str__(self):
        return '%f %s' % (self.num, self.name)

def mod(test, nn, out_queue):
    print(test.num)
    test.num = np.random.randn()
    print(test.num)
    test.name = nn
    out_queue.put(test)




if __name__ == '__main__':       
    num = 10
    out_queue = mp.Queue()
    tests = np.empty(num, dtype=object)
    for it in range(num):
        tests[it] = Tester(tnum=it*1.0)


    print('\n')
    workers = [ mp.Process(target=mod, args=(test, 'some', out_queue) ) for test in tests ]

    for work in workers: work.start()

    for work in workers: work.join()
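    # note: joining before draining the queue is fine here because the
    # results are small; with large objects, get() everything from out_queue
    # before join(), or a worker blocked writing to the queue's pipe buffer
    # will never exit and join() will hang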

    res_lst = []
    for j in range(len(workers)):
        res_lst.append(out_queue.get())

    for test in res_lst: print(test)

This does lead to the interesting observation that, because the spawned processes are identical, they all start with the same seed for the random number generator, so they all generate the same "random" number.
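If you need distinct draws per worker, one option (an illustrative variant of the mod function above, not part of the original answer) is to re-seed inside each worker, for example from the process id:

import os
import numpy as np

def mod(test, nn, out_queue):
    # re-seed per process: forked workers inherit the parent's RNG state,
    # so without this every worker draws the same "random" number
    np.random.seed(os.getpid())
    test.num = np.random.randn()
    test.name = nn
    out_queue.put(test)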


jfs*_*jfs 5

Your code doesn't attempt to modify the shared memory. It just clones individual objects.

dtype=object means that sharedmem won't work, for the reasons outlined in the link provided by @tcaswell:

sharing of object graphs that include references/pointers to other objects is basically unfeasible

For plain (value) types you can use shared memory; see Use numpy array in shared memory for multiprocessing.
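For example, here is a minimal sketch of that approach using multiprocessing.Array (the fill and shared_arr names are illustrative), assuming your values fit in a flat array of doubles:

import multiprocessing as mp
import numpy as np

def fill(shared_arr, i):
    # reinterpret the shared buffer as a numpy array; writes land in memory
    # that every process sees, so nothing needs to be copied back
    arr = np.frombuffer(shared_arr.get_obj())
    arr[i] = i * 1.0  # each worker writes a distinct index, so no lock needed

if __name__ == '__main__':
    num = 10
    shared_arr = mp.Array('d', num)  # num doubles backed by shared memory
    workers = [mp.Process(target=fill, args=(shared_arr, i)) for i in range(num)]
    for w in workers: w.start()
    for w in workers: w.join()
    print(np.frombuffer(shared_arr.get_obj()))  # [0. 1. 2. ... 9.]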

The manager approach should also work (it just copies the objects around):

import random
from multiprocessing import Pool, Manager

class Tester(object):
    def __init__(self, num=0.0, name='none'):
        self.num  = num
        self.name = name

    def __repr__(self):
        return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name)

def init(L):
    global tests
    tests = L

def modify(i_t_nn):
    i, t, nn = i_t_nn
    t.num += random.normalvariate(mu=0, sigma=1) # modify private copy
    t.name = nn
    tests[i] = t # copy back
    return i

def main():
    num_processes = num = 10 #note: num_processes and num may differ
    manager = Manager()
    tests = manager.list([Tester(num=i) for i in range(num)])
    print(tests[:2])

    args = ((i, t, 'some') for i, t in enumerate(tests))
    pool = Pool(processes=num_processes, initializer=init, initargs=(tests,))
    for i in pool.imap_unordered(modify, args):
        print("done %d" % i)
    pool.close()
    pool.join()
    print(tests[:2])

if __name__ == '__main__':
    main()