在流程之间分享一个令人兴奋的词典

bux*_*bux 7 python multiprocessing

问题陈述

我面临多处理问题.多处理堆栈溢出问题的很大一部分没有我的情况复杂,也没有回答它.有些人投票说这个问题可能重复,但我的情况有所不同,在我的情况下,共享DICT在进程之间被修改了:

我有一个程序遵循这个简化的生命周期:

A. Initialize DATA dict
B. Initialize 4 subprocess workers
C. Execute code in each workers (worker massively read DATA dict)
D. Wait workers job is done
E. Modify DATA dict content
F. Go to C
Run Code Online (Sandbox Code Playgroud)

性能是问题的一个非常重要的方面.我尝试了许多正面和负面的解决方案:

简单的全球字典(不工作)

在步骤中B,DICT变量被分叉到子流程环境中.但经过一步E子过程无法看到变化.

使用multiprocessing.Manager dict

在步骤Adict创建时multiprocessing.Manager(请参阅此处的 "服务器进程" ).

  • 优点:易于使用
  • 缺点:multiprocessing.Manager使用序列化层(我不太了解它,但它能够与网络上的进程一起工作),这对性能有害.

使用多个multiprocessing.Value和multiprocessing.Array而不是dict

multiprocessing.Valuemultiprocessing.Array允许使用共享内存.我尝试用几个替换我的dict multiprocessing.Value并且multiprocessing.Array像这样:

用dict:

manager = multiprocessing.Manager()
dict = manager.dict()
dict['positions'] = [42, 165]
dict['on_position_42'] = 1555897
dict['on_position_165'] = 1548792
Run Code Online (Sandbox Code Playgroud)

替换为字典multiprocessing.Valuemultiprocessing.Array:

positions = multiprocessing.Array('i', [42, 165])
on_position_42 = multiprocessing.Value('i', 1555897)
on_position_165 = multiprocessing.Value('i', 1548792)
Run Code Online (Sandbox Code Playgroud)

但在步骤E我将需要创建新的multiprocessing.Valuemultiprocessing.Array,例如:

positions.value = [42, 165, 322]
# create new multiprocessing.Value for 322
on_position_322 = multiprocessing.Value('i', 2258777)
Run Code Online (Sandbox Code Playgroud)

然后在步骤C,on_position_322工人将是未知的.如果我尝试通过管道发送multiprocessing.Valuemultiprocessing.Array子进程,将导致"同步对象应该只通过继承在进程之间共享"错误.

  • 优点:表现
  • 缺点:如何"通知"子流程有关新的存在multiprocessing.Valuemultiprocessing.Array

使用内存数据库,如memcache或redis

我知道这是一种可能性,但我必须对内存数据库与multiprocessing.Managerdict进行对比.

  • 优点:务实和工作
  • 缺点:表现?

 问题结论

是否存在使用multiprocessing.Valuemultiprocessing.Array生命周期的方式,考虑创建新的multiprocessing.Valuemultiprocessing.Array

或者更一般地说,考虑到这个生命周期,最有效的策略是什么?

注意:我之前尝试过另一种策略,其中步骤F是"转到B"(在每个周期重新创建新工人).但工人的分岔环境太长了:最大的DICT是叉子.

小智 1

由于您只是在主进程中读取字典并更新它,因此您可以使用 JoinableQueue 来传递字典并等待工作人员完成。例如

from multiprocessing import Process, JoinableQueue
import time

class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        for item in iter(self.queue.get, None):
            print item
            time.sleep(2)
            print 'done'
            self.queue.task_done()
        self.queue.task_done()

if __name__ == '__main__':
    request_queue = JoinableQueue()
    num_workers = 4
    workers = []
    d = {}  # A

    for _ in range(num_workers): 
        p = Worker(request_queue) # B
        workers.append(p)
        p.start()


    for i in range(5): # F
        for _ in range(num_workers):
            request_queue.put(d) # C
        request_queue.join()  # D
        d[i] = i  # E

    for w in workers:
        w.terminate()
        w.join()
Run Code Online (Sandbox Code Playgroud)

输出:

{}
{}
{}
{}
done
done
done
done
{0: 0}
{0: 0}
{0: 0}
{0: 0}
done
done
done
done
{0: 0, 1: 1}
{0: 0, 1: 1}
{0: 0, 1: 1}
{0: 0, 1: 1}
done
done
done
done
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
done
done
done
done
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
done
done
done
done
Run Code Online (Sandbox Code Playgroud)