相关疑难解决方法(0)

Python 3:在多处理过程中捕获警告

太长; 没看过

warnings.catch_warnings()上下文管理是不是线程安全的.如何在并行处理环境中使用它?

背景

下面的代码使用Python multiprocessing模块的并行处理解决了最大化问题.它需要一个(不可变的)小部件列表,将它们分区(参见Python 3中大规模,强力最大化的高效多处理),找到所有分区的最大值("最终者"),然后找到最大值("冠军")那些"入围者".如果我正确地理解了我自己的代码(如果我这样做,我就不会在这里),我正在与所有子进程共享内存以给它们输入小部件,并multiprocessing使用操作系统级管道和pickle来发送当工人完成时,决赛小工具回到主要过程.

问题的根源

我希望捕获在窗口小部件从进程间管道出来时发生的取消对象之后由窗口小部件重新实例化引起的冗余窗口小部件警告.当窗口小部件对象实例化时,它们会验证自己的数据,从Python标准warnings模块发出警告,告诉应用程序的用户该窗口小部件怀疑用户的输入数据存在问题.因为unpickling导致对象实例化,所以我对代码的理解意味着每个widget对象只重新实例化一次,当且仅当它从管道出来后才是决赛者 - 请参阅下一节以了解为什么这不正确.

这些小部件在被擦除之前就已经创建了,因此用户已经痛苦地意识到他输入了什么输入并且不想再听到它.这些是我想要通过warnings模块的catch_warnings()上下文管理器捕获的警告(即with声明).

解决方案失败

在我的测试中,当多余的警告被发射到我在下面标记为A 线B 线之间的任何地方时,我已经缩小了范围.让我感到惊讶的是,警告是在不仅仅是附近的地方发出的output_queue.get().这意味着我multiprocessing使用酸洗将小部件发送给工人.

结果是,warnings.catch_warnings()即使在从A行B 行的所有内容中创建上下文管理器,并在此上下文中设置正确的警告过滤器也不会捕获警告.这意味着警告正在工作进程中发出.将此上下文管理器放在工作器代码周围也不会捕获警告.

代码

这个例子省略了决定,如果问题的规模太小,与派生工艺,进口多,并确定打扰的代码my_frobnal_counter,和my_load_balancer.

"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"

def frobnicate_parallel_worker(widgets, output_queue):
    resultant_widget = max(widgets, key=my_frobnal_counter)
    output_queue.put(resultant_widget)

def frobnicate_parallel(widgets):
    output_queue = …
Run Code Online (Sandbox Code Playgroud)

python warnings multiprocessing python-3.x

34
推荐指数
1
解决办法
1599
查看次数

避免Python 3的多处理队列中的竞争条件

我试图找到大约61亿(自定义)项目的最大重量,我想用并行处理这样做.对于我的特定应用程序,有更好的算法,不需要我迭代超过61亿项,但解释它们的教科书是我的头脑,我的老板希望在4天内完成.我想我的公司的花哨的服务器和并行处理有更好的机会.但是,我所知道的关于并行处理的一切都来自于阅读Python 文档.这就是说我很丢失......

我目前的理论是设置一个馈送器进程,一个输入队列,一大堆(比如说30个)工作进程,以及一个输出队列(在输出队列中找到最大元素将是微不足道的).我不明白的是,馈线进程如何告诉工作进程何时停止等待项目通过输入队列.

我曾经考虑过使用multiprocessing.Pool.map_async我的6.1E9项目的迭代,但是只需要花费将近10分钟来迭代这些项目而不对它们做任何事情.除非我误解了某些东西......map_async迭代过程中将它们分配给流程可以在流程开始工作时完成.(Pool也提供imap但是文档说它类似于map,它似乎不是异步工作.我想要异步,对吗?)

相关问题:我想用concurrent.futures而不是multiprocessing吗?我不可能是第一个实施双排队系统的人(这正是美国每家熟食店的生产线如何工作......)那么有更多的Pythonic /内置方法吗?

这是我正在尝试做的一个框架.请参阅中间的注释块.

import multiprocessing as mp
import queue

def faucet(items, bathtub):
    """Fill bathtub, a process-safe queue, with 6.1e9 items"""
    for item in items:
        bathtub.put(item)
    bathtub.close()

def drain_filter(bathtub, drain):
    """Put maximal item from bathtub into drain.
    Bathtub and drain are process-safe queues.
    """
    max_weight = 0
    max_item = None
    while True:
        try: …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing race-condition python-3.x

12
推荐指数
1
解决办法
2599
查看次数