具有更新队列和输出队列的Python多处理

Jaq*_*aqo 5 python multiprocessing

如何编写使用两个队列的Python多进程脚本?

  1. 一个作为工作队列,从一些数据开始,然后根据要并行化的功能的条件,即时接收其他任务,
  2. 另一个收集结果并用于在处理完成后记下结果。

我基本上需要根据我在初始项目中发现的内容,在工作队列中添加更多任务。我在下面发布的示例很愚蠢(我可以随意更改项目并将其直接放在输出Queue中),但是它的机制很明确,反映了我需要开发的概念的一部分。

在此,我尝试:

import multiprocessing as mp

def worker(working_queue, output_queue):
    item = working_queue.get() #I take an item from the working queue
    if item % 2 == 0:
        output_queue.put(item**2) # If I like it, I do something with it and conserve the result.
    else:
        working_queue.put(item+1) # If there is something missing, I do something with it and leave the result in the working queue 

if __name__ == '__main__':
    static_input = range(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] #I am running as many processes as CPU my machine has (is this wise?).
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    for result in iter(output_q.get, None):
        print result #alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible.
Run Code Online (Sandbox Code Playgroud)

这不会结束也不会显示任何结果。

在整个过程结束时,我想确保工作队列为空,并确保所有并行函数在迭代后面的输出以取出结果之前都已完成对输出队列的写入。您对如何使其工作有建议吗?

Jaq*_*aqo 5

下面的代码达到了预期的效果。它遵循@tawmas 提出的建议。

此代码允许在一个进程中使用多个内核,该进程要求在处理过程中向工作线程提供数据的队列可以由它们更新:

import multiprocessing as mp
def worker(working_queue, output_queue):
    while True:
        if working_queue.empty() == True:
            break #this is the so-called 'poison pill'    
        else:
            picked = working_queue.get()
            if picked % 2 == 0: 
                    output_queue.put(picked)
            else:
                working_queue.put(picked+1)
    return

if __name__ == '__main__':
    static_input = xrange(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    results_bank = []
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while True:
       if output_q.empty() == True:
           break
       results_bank.append(output_q.get_nowait())
    print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed.
    results_bank.sort()
    print results_bank
Run Code Online (Sandbox Code Playgroud)


taw*_*mas 3

您在创建进程的行中有一个拼写错误。应该是mp.Process,不是mp.process。这就是导致您收到异常的原因。

另外,您没有在工作人员中循环,因此他们实际上只消耗队列中的每个项目,然后退出。如果不了解更多所需的逻辑,就很难给出具体的建议,但您可能希望将函数的主体包含workerwhile True循环内,并在主体中添加一个条件,以在工作完成时退出。

请注意,如果您不添加条件来显式退出循环,那么当队列为空时,您的工作人员将永远停止。您可以考虑使用所谓的毒丸技术来向工人发出信号,让他们可以退出。您将在有关进程间通信的PyMOTW 文章中找到示例和一些有用的讨论。

至于要使用的进程数量,您需要进行一些基准测试才能找到适合您的进程数量,但是,一般来说,当您的工作负载受 CPU 限制时,每个核心一个进程是一个很好的起点。如果您的工作负载受 IO 限制,则使用更多数量的工作人员可能会获得更好的结果。