在Python中的进程之间共享许多队列

Wap*_*iti 12 python queue multiprocessing python-multiprocessing

我知道multiprocessing.Manager()它是如何用于创建共享对象的,特别是可以在worker之间共享的队列.有这个问题,这个问题,这个问题甚至是我自己的一个问题.

但是,我需要定义很多队列,每个队列都链接一对特定的进程.假设每对进程及其链接队列由变量标识key.

当我需要放置和获取数据时,我想使用字典来访问我的队列.我无法做到这一点.我尝试过很多东西.随着multiprocessing进口为mp:

for key in all_keys: DICT[key] = mp.Queue在多处理模块导入的配置文件中定义一个dict (调用它multi.py)不会返回错误,但是队列DICT[key]之间没有共享队列,每个队列似乎都有自己的队列副本,因此没有通信发生.

如果我尝试在定义DICT进程并启动它们的主多处理函数的开头定义,比如

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Queue()
Run Code Online (Sandbox Code Playgroud)

我收到了错误

RuntimeError: Queue objects should only be shared between processes through
 inheritance
Run Code Online (Sandbox Code Playgroud)

改为

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Manager().Queue()
Run Code Online (Sandbox Code Playgroud)

只会让事情变得更糟.multi.py在主函数的头部而不是在main函数内部尝试类似的定义会返回类似的错误.

必须有一种方法可以在进程之间共享许多队列,而无需在代码中明确命名每个队列.有任何想法吗?

编辑

这是该程序的基本架构:

1-加载第一个模块,它定义一些变量,导入multi,启动multi.main()和加载另一个模块,该模块启动一系列模块加载和代码执行.与此同时...

2- multi.main看起来像这样:

def main():
    manager = mp.Manager()
    pool = mp.Pool()
    DICT2 = manager.dict()

    for key in all_keys:
        DICT2[key] = manager.Queue()
        proc_1 = pool.apply_async(targ1,(DICT1[key],) ) #DICT1 is defined in the config file
        proc_2 =  pool.apply_async(targ2,(DICT2[key], otherargs,) 
Run Code Online (Sandbox Code Playgroud)

而不是使用poolmanager,我也推出了以下流程:

mp.Process(target=targ1, args=(DICT[key],))
Run Code Online (Sandbox Code Playgroud)

3 - 该函数targ1接收key来自主进程的输入数据(按排序).其目的是通过结果DICT[key],以便targ2能够完成它的工作.这是不起作用的部分.有任意数量的targ1s,targ2s等,因此有任意数量的队列.

4 - 其中一些进程的结果将被发送到一堆不同的数组/ pandas数据帧,这些数据帧也被索引key,我希望可以从任意进程访问,甚至是在不同模块中启动的进程.我还没有写这部分,这可能是一个不同的问题.(我在这里提到它,因为上面3的答案也可能很好地解决了.)

skr*_*sme 22

当您尝试multiprocessing.Queue()通过将其作为参数传递来共享时,听起来您的问题已经开始.您可以通过创建托管队列来解决此问题:

import multiprocessing
manager = multiprocessing.Manager()
passable_queue = manager.Queue()
Run Code Online (Sandbox Code Playgroud)

当您使用管理器创建它时,您将存储并传递代理到队列,而不是队列本身,因此即使传递给您的工作进程的对象被复制,它仍将指向相同的底层数据结构:你的队列.它与C/C++中的指针非常相似(在概念上).如果以这种方式创建队列,则可以在启动工作进程时传递它们.

由于您现在可以传递队列,因此您不再需要管理字典.在main中保存一个普通的字典,它将存储所有映射,并且只为您的工作进程提供他们需要的队列,因此他们不需要访问任何映射.

我在这里写了一个例子.看起来你正在你的工人之间传递物品,所以这就是在这里做的.想象一下,我们有两个处理阶段,数据的开始和结束都在控制之中main.看看我们如何创建像管道一样连接工作者的队列,但他们所需的队列,他们就不需要知道任何映射:

import multiprocessing as mp

def stage1(q_in, q_out):

    q_out.put(q_in.get()+"Stage 1 did some work.\n")
    return

def stage2(q_in, q_out):

    q_out.put(q_in.get()+"Stage 2 did some work.\n")
    return

def main():

    pool = mp.Pool()
    manager = mp.Manager()

    # create managed queues
    q_main_to_s1 = manager.Queue()
    q_s1_to_s2 = manager.Queue()
    q_s2_to_main = manager.Queue()

    # launch workers, passing them the queues they need
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))

    # Send a message into the pipeline
    q_main_to_s1.put("Main started the job.\n")

    # Wait for work to complete
    print(q_s2_to_main.get()+"Main finished the job.")

    pool.close()
    pool.join()

    return

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

代码生成此输出:

主要开始这项工作.
第1阶段做了一些工作.
第二阶段做了一些工作.
主要完成了这项工作.

我没有包含AsyncResults在字典中存储队列或对象的示例,因为我仍然不太了解您的程序应该如何工作.但是现在您可以自由地传递队列,您可以构建字典以根据需要存储队列/进程映射.

实际上,如果你确实在多个worker之间构建了一个管道,你甚至不需要保留对"inter-worker"队列的引用main.创建队列,将它们传递给您的worker,然后只保留对main将使用的队列的引用.如果你确实拥有"任意数量"的队列,我肯定会建议尽可能快地收集旧队列.