避免Python 3的多处理队列中的竞争条件

wks*_*rtz 12 python parallel-processing race-condition python-3.x

我试图找到大约61亿(自定义)项目的最大重量,我想用并行处理这样做.对于我的特定应用程序,有更好的算法,不需要我迭代超过61亿项,但解释它们的教科书是我的头脑,我的老板希望在4天内完成.我想我的公司的花哨的服务器和并行处理有更好的机会.但是,我所知道的关于并行处理的一切都来自于阅读Python 文档.这就是说我很丢失......

我目前的理论是设置一个馈送器进程,一个输入队列,一大堆(比如说30个)工作进程,以及一个输出队列(在输出队列中找到最大元素将是微不足道的).我不明白的是,馈线进程如何告诉工作进程何时停止等待项目通过输入队列.

我曾经考虑过使用multiprocessing.Pool.map_async我的6.1E9项目的迭代,但是只需要花费将近10分钟来迭代这些项目而不对它们做任何事情.除非我误解了某些东西......map_async迭代过程中将它们分配给流程可以在流程开始工作时完成.(Pool也提供imap但是文档说它类似于map,它似乎不是异步工作.我想要异步,对吗?)

相关问题:我想用concurrent.futures而不是multiprocessing吗?我不可能是第一个实施双排队系统的人(这正是美国每家熟食店的生产线如何工作......)那么有更多的Pythonic /内置方法吗?

这是我正在尝试做的一个框架.请参阅中间的注释块.

import multiprocessing as mp
import queue

def faucet(items, bathtub):
    """Fill bathtub, a process-safe queue, with 6.1e9 items"""
    for item in items:
        bathtub.put(item)
    bathtub.close()

def drain_filter(bathtub, drain):
    """Put maximal item from bathtub into drain.
    Bathtub and drain are process-safe queues.
    """
    max_weight = 0
    max_item = None
    while True:
        try:
            current_item = bathtub.get()
        # The following line three lines are the ones that I can't
        # quite figure out how to trigger without a race condition.
        # What I would love is to trigger them AFTER faucet calls
        # bathtub.close and the bathtub queue is empty.
        except queue.Empty:
            drain.put((max_weight, max_item))
            return
        else:
            bathtub.task_done()
        if not item.is_relevant():
            continue
        current_weight = item.weight
        if current_weight > max_weight:
            max_weight = current_weight
            max_item = current_item

def parallel_max(items, nprocs=30):
    """The elements of items should have a method `is_relevant`
    and an attribute `weight`. `items` itself is an immutable
    iterator object.
    """
    bathtub_q = mp.JoinableQueue()
    drain_q = mp.Queue()

    faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
    worker_procs = mp.Pool(processes=nprocs)

    faucet_proc.start()
    worker_procs.apply_async(drain_filter, bathtub_q, drain_q)

    finalists = []
    for i in range(nprocs):
        finalists.append(drain_q.get())

    return max(finalists)
Run Code Online (Sandbox Code Playgroud)


这是答案

我找到了一个非常彻底的回答我的问题,以及Python基金会通讯主管Doug Hellman对多任务处理的简单介绍.我想要的是"毒丸"模式.请在此处查看:http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

转发@MRAB发布该概念的内核.

MRA*_*RAB 5

您可以将特殊的终止项目(例如“无”)放入队列中。当一个工人看到它时,可以将其放回给其他工人查看,然后终止。或者,您可以将每个工作人员的一个特殊终止项目放入队列。