Sharing a lock between processes

Bry*_*yce 6 python python-3.x python-multiprocessing

I tried to follow this solution as well as this solution, but so far without success:

When I run the following block of code:

import logging
import sys
import traceback
from multiprocessing import Manager, Pool

logger = logging.getLogger(__name__)

global manager
global lock
manager = Manager()
lock = manager.Lock()

class MyClass(object):

    def get_next_chunk(self, numberlist, chunks):
        for i in range(0, len(numberlist), chunks):
            yield numberlist[i:i + chunks]

    def multi_process(self, numberlist):
        procs = 5
        chunksize = 100
        with Pool(procs) as pool:
            pool.map(self.process_numberlist,
                     self.get_next_chunk(numberlist, chunksize))
        return self.running_total_list

    def process_numberlist(self, numberlist):
        temp_num_list = []
        temp_num_list = self.getnewNumbers()
        logger.debug("temp_num_list length: " + str(len(temp_num_list)))
        try:
            lock.acquire()
        except Exception as e:
            logger.error("Couldn't acquire lock")
            logger.error(e)
            logger.error(traceback.format_exc())
            logger.error(sys.exc_info()[0])
        self.running_total_list = self.running_total_list + temp_num_list
        logger.debug("New running_total_list length: "
                     + str(len(self.running_total_list)))
        lock.release()

The output in my log looks like this:

[process_numberlist() ] temp_num_list length: 5
[process_numberlist() ] New running_total_list result set length: 5
[process_numberlist() ] temp_num_list length: 6
[process_numberlist() ] New running_total_list result set length: 6
[process_numberlist() ] temp_num_list length: 4
[process_numberlist() ] New running_total_list result set length: 9

However, I believe the expected output should look like this:

[process_numberlist() ] temp_num_list length: 5
[process_numberlist() ] New running_total_list result set length: 5
[process_numberlist() ] temp_num_list length: 6
[process_numberlist() ] New running_total_list result set length: 11
[process_numberlist() ] temp_num_list length: 4
[process_numberlist() ] New running_total_list result set length: 15

EDIT - Attempt 2

See the update below, based on Aaron's suggestion. I'm now getting a "can only join an iterable" error.

import logging
import sys
import traceback
from functools import partial
from multiprocessing import Manager, Pool

logger = logging.getLogger(__name__)

global manager
global lock

class MyClass(object):

    def get_next_chunk(self, numberlist, chunks):
        for i in range(0, len(numberlist), chunks):
            yield numberlist[i:i + chunks]

    def multi_process(self, numberlist):
        procs = 5
        chunksize = 100
        manager = Manager()
        lock = manager.Lock()
        with Pool(procs) as pool:
            func = partial(self.process_numberlist, lock)
            pool.map(func,
                     self.get_next_chunk(numberlist, chunksize))
        return self.running_total_list

    def process_numberlist(self, numberlist, lock):
        temp_num_list = []
        temp_num_list = self.getnewNumbers()
        logger.debug("temp_num_list length: " + str(len(temp_num_list)))
        try:
            lock.acquire()
            self.running_total_list = self.running_total_list + temp_num_list
            logger.debug("New running_total_list length: "
                         + str(len(self.running_total_list)))
            lock.release()
        except Exception as e:
            logger.error("Couldn't acquire lock")
            logger.error(e)
            logger.error(traceback.format_exc())
            logger.error(sys.exc_info()[0])

EDIT #3 - getNewNumbers(), which is not included in this toy example, just returns an array of integers. Hope that helps.
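One detail in the Attempt 2 code is worth checking: functools.partial fills parameters from the left, so partial(self.process_numberlist, lock) binds the lock to numberlist, and each chunk that pool.map passes in ends up in the lock parameter. The minimal sketch below shows this with a hypothetical stand-in function that has the same parameter order (the string "LOCK" stands in for the real lock object):

```python
from functools import partial

def process_numberlist(numberlist, lock):
    # Hypothetical stand-in with the same parameter order as in Attempt 2;
    # it just reports which value landed in which parameter.
    return {"numberlist": numberlist, "lock": lock}

# Positional binding: "LOCK" fills numberlist, the later argument fills lock.
func = partial(process_numberlist, "LOCK")
print(func([1, 2, 3]))        # {'numberlist': 'LOCK', 'lock': [1, 2, 3]}

# Keyword binding keeps each value in the intended parameter.
func_fixed = partial(process_numberlist, lock="LOCK")
print(func_fixed([1, 2, 3]))  # {'numberlist': [1, 2, 3], 'lock': 'LOCK'}
```

Binding lock by keyword, or moving lock to the front of the signature, lets pool.map supply the chunk as numberlist.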

nul*_*imp 0

It seems to me that your main goal is to access a shared resource (running_total_list), which is why I focused on that specifically.

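In your example you use Pool, whereas I use Process. You can check out this post to learn the core differences between the two and decide which one better suits your use case.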

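I came up with this quick example of how to share a resource between multiple processes. It should give you an idea of how to proceed from there: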

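import random
from multiprocessing import Process, Lock, Manager

def gen_numbers():
    # Stand-in for getnewNumbers(): a list of 4 to 11 integers
    return [i for i in range(random.randint(4, 11))]

def process_numberlist(lock, shared_list, num):
    temp_num_list = gen_numbers()
    print("Proc %s: temp_num_list length: %s" % (num, len(temp_num_list)))

    # Hold the lock while extending the shared list and reading its length,
    # so no other process can modify it between those two steps.
    # "with lock" releases it even on error, and never releases an unacquired lock.
    with lock:
        shared_list += temp_num_list
        print("Proc %s: New shared_list length: %s" % (num, len(shared_list)))

if __name__ == "__main__":
    # The guard is needed where child processes re-import this module
    # (the "spawn" start method, the default on Windows and macOS).
    lock = Lock()
    manager = Manager()
    shared_list = manager.list()  # proxy to a list held by the manager process

    proc = 5
    proc_list = []

    for num in range(proc):
        p = Process(target=process_numberlist, args=(lock, shared_list, num + 1))
        p.start()
        proc_list.append(p)

    for p in proc_list:
        p.join()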

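One important thing to note here is how shared_list is defined. Unlike threads, each process has its own memory space (Pool workers are no exception), which is why ordinary objects cannot be shared between them: in your code, each worker operates on its own copy of self.running_total_list, and the parent process never sees those changes. This means you need some form of inter-process communication (IPC), and fortunately Python already provides some tools for it. One of them is multiprocessing.Manager, which exposes data structures (such as dict and list) that you can share between processes.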
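To make the difference between a plain list and a Manager list concrete, here is a minimal sketch (the function name append_items is hypothetical). A child process extends both lists; only the change made through the manager proxy is visible in the parent afterwards:

```python
from multiprocessing import Manager, Process

def append_items(plain_list, shared_list):
    # Runs in the child process. plain_list is the child's own copy of the
    # parent's list; shared_list is a proxy that forwards extend() to the
    # manager process.
    plain_list.extend([1, 2, 3])
    shared_list.extend([1, 2, 3])

if __name__ == "__main__":
    plain_list = []
    with Manager() as manager:
        shared_list = manager.list()
        p = Process(target=append_items, args=(plain_list, shared_list))
        p.start()
        p.join()
        # Read the lengths while the manager process is still running.
        plain_len, shared_len = len(plain_list), len(shared_list)
    print(plain_len, shared_len)  # 0 3
```

The same thing happens to self.running_total_list in the question: each worker updates its own copy, which is consistent with the log lengths not adding up.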

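The same applies to Lock, which multiprocessing provides for synchronizing processes. This is important because you don't want multiple processes modifying the shared data at the same time; that would only make your program unpredictable.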
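If you want to keep Pool, the lock has to travel with each task. A plain multiprocessing.Lock cannot be pickled into Pool task arguments (Python raises a RuntimeError saying lock objects should only be shared between processes through inheritance), but the proxy returned by Manager().Lock() can. The sketch below is an assumption-laden adaptation of the question's structure, not the asker's actual code: process_chunk and get_next_chunk are plain module-level functions, and copying the chunk is a hypothetical stand-in for getnewNumbers(). Note that lock is the first parameter, so partial binds it correctly and pool.map supplies the chunk last:

```python
from functools import partial
from multiprocessing import Manager, Pool

def get_next_chunk(numberlist, chunksize):
    # Same chunking generator as in the question, as a plain function
    for i in range(0, len(numberlist), chunksize):
        yield numberlist[i:i + chunksize]

def process_chunk(lock, shared_list, chunk):
    # Hypothetical stand-in for getnewNumbers(): reuse the chunk itself
    temp_num_list = list(chunk)
    with lock:
        shared_list.extend(temp_num_list)
        return len(shared_list)

def multi_process(numberlist, procs=5, chunksize=100):
    with Manager() as manager:
        lock = manager.Lock()         # proxy lock: can be pickled into Pool tasks
        shared_list = manager.list()  # proxy list held by the manager process
        func = partial(process_chunk, lock, shared_list)
        with Pool(procs) as pool:
            pool.map(func, get_next_chunk(numberlist, chunksize))
        return list(shared_list)      # copy out before the manager shuts down

if __name__ == "__main__":
    result = multi_process(list(range(1000)))
    print(len(result))  # 1000
```

The total length is deterministic, but the order of the chunks inside result depends on which worker acquires the lock first.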

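Another thing to note is that the calls to process_numberlist do not necessarily run in order, because each process runs independently of the others, even though they all access the same resource.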
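If all you need is the combined list (and a predictable order), a different technique avoids shared state and the lock altogether: have each task return its numbers and combine the results in the parent. Pool.map returns the results in the same order as its input, regardless of which worker finishes first. A minimal sketch, where squaring each number is a hypothetical stand-in for getnewNumbers():

```python
from multiprocessing import Pool

def get_next_chunk(numberlist, chunksize):
    for i in range(0, len(numberlist), chunksize):
        yield numberlist[i:i + chunksize]

def process_chunk(chunk):
    # Hypothetical stand-in for getnewNumbers(): square each number
    return [n * n for n in chunk]

def multi_process(numberlist, procs=5, chunksize=100):
    with Pool(procs) as pool:
        # One result per chunk, in input order, even if workers finish out of order.
        results = pool.map(process_chunk, get_next_chunk(numberlist, chunksize))
    running_total_list = []
    for part in results:
        running_total_list.extend(part)  # combined in the parent: no lock needed
    return running_total_list

if __name__ == "__main__":
    totals = multi_process(list(range(250)), chunksize=100)
    print(len(totals), totals[:4])  # 250 [0, 1, 4, 9]
```

Because only the parent touches running_total_list, there is nothing to synchronize.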

I hope this helps!