一段时间后打破功能

use*_*074 18 python subprocess

在python中,对于玩具示例:

for x in range(0, 3):
    # call function A(x)
Run Code Online (Sandbox Code Playgroud)

我希望继续for循环,如果函数A跳过它需要超过5秒,这样我就不会卡住或浪费时间.

通过做一些搜索,我意识到子进程或线程可能会有所帮助,但我不知道如何在这里实现.任何帮助都会很棒.谢谢

The*_*nse 29

我认为创建一个新流程可能有点过头了.如果您使用的是Mac或基于Unix的系统,则应该能够使用signal.SIGALRM强制超时耗时的功能.这将适用于因网络闲置的功能或您通过修改功能无法处理的其他问题.我有一个在这个答案中使用它的例子:

/sf/answers/1744523441/

在这里编辑我的答案,虽然我不确定我应该这样做:

import signal

class TimeoutException(Exception):   # Custom exception class
    pass

def timeout_handler(signum, frame):   # Custom signal handler
    raise TimeoutException

# Change the behavior of SIGALRM
signal.signal(signal.SIGALRM, timeout_handler)

for i in range(3):
    # Start the timer. Once 5 seconds are over, a SIGALRM signal is sent.
    signal.alarm(5)    
    # This try/except loop ensures that 
    #   you'll catch TimeoutException when it's sent.
    try:
        A(i) # Whatever your function that might hang
    except TimeoutException:
        continue # continue the for loop if function A takes more than 5 second
    else:
        # Reset the alarm
        signal.alarm(0)
Run Code Online (Sandbox Code Playgroud)

这基本上设置了5秒的计时器,然后尝试执行您的代码.如果在时间用完之前无法完成,则会发送一个SIGALRM,我们将其捕获并转换为TimeoutException.这会强制你进入except块,你的程序可以继续.

编辑:呐喊,TimeoutException是一个阶级,而不是一个功能.谢谢,abarnert!

  • 注意:信号不支持多线程。它仅适用于主线程。 (3认同)
  • @TheSoundDefense:是的,我想.谁没有犯过那种愚蠢的错误?在没有至少6个小时的抨击显示器的情况下,谁在他自己的代码中注意到了这一点? (2认同)

aba*_*ert 8

如果你可以打破你的工作并经常检查,那几乎总是最好的解决方案.但有时候这是不可能的 - 例如,也许你正在从一个慢文件共享中读取一个文件,每隔一段时间就会挂起30秒.要在内部处理它,您必须围绕异步I/O循环重构整个程序.

如果您不需要跨平台,则可以使用*nix(包括Mac和Linux)上的信号,Windows上的APC等.但如果您需要跨平台,则无效.

所以,如果你真的需要同时做,你可以,有时你必须这样做.在这种情况下,您可能希望为此使用一个进程,而不是一个线程.你无法安全地杀死一个线程,但你可以杀死一个进程,它可以像你想要的那样安全.此外,如果线程花了5秒以上因为它受CPU限制,那么你不想在GIL上与它战斗.

这里有两个基本选项.


首先,您可以将代码放在另一个脚本中并使用以下命令运行subprocess:

subprocess.check_call([sys.executable, 'other_script.py', arg, other_arg],
                      timeout=5)
Run Code Online (Sandbox Code Playgroud)

由于这是通过正常的子进程通道,你可以使用的唯一通信是一些argv字符串,一个成功/失败返回值(实际上是一个小整数,但这并不是更好),并且可选择一大块文本进入和大块的文字出来.


或者,您可以使用multiprocessing生成类似线程的子进程:

p = multiprocessing.Process(func, args)
p.start()
p.join(5)
if p.is_alive():
    p.terminate()
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,这有点复杂,但在某些方面它更好:

  • 您可以传递任意Python对象(至少可以被腌制的任何东西),而不仅仅是字符串.
  • 您可以将其作为一个函数保留在同一个脚本中,而不必将目标代码放在完全独立的脚本中.
  • 它更灵活 - 例如,如果您以后需要通过进度更新,则可以很容易地在任一方向或两个方向添加队列.

任何类型的并行性的一个大问题是共享可变数据 - 例如,让后台任务更新全局字典作为其工作的一部分(您的评论称您正在尝试这样做).使用线程,您可以放弃它,但竞争条件可能导致数据损坏,因此您必须非常小心锁定.有了子进程,你根本无法逃脱它.(是的,您可以使用共享内存,因为进程之间的共享状态说明了,但这仅限于简单类型,如数字,固定数组和您知道如何定义为C结构的类型,它只会让您回到相同的问题作为线程.)


理想情况下,您可以安排一些事情,这样您就不需要在进程运行时共享任何数据 - 您将dict一个参数作为参数传递并dict返回结果.当你想要在后台放置一个先前同步的功能时,这通常很容易安排.

但是,如果部分结果优于没有结果呢?在这种情况下,最简单的解决方案是通过队列传递结果.您可以使用显式队列执行此操作,如在进程之间交换对象中所述,但有一种更简单的方法.

如果您可以将整体流程分解为单独的任务,每个值(或一组值)都要包含在字典中,您可以将它们安排在Pool-or,甚至更好的a上concurrent.futures.Executor.(如果您使用的是Python 2.x或3.1,请参阅futuresPyPI上的backport .)

假设您的慢函数看起来像这样:

def spam():
    global d
    for meat in get_all_meats():
        count = get_meat_count(meat)
        d.setdefault(meat, 0) += count
Run Code Online (Sandbox Code Playgroud)

相反,你会这样做:

def spam_one(meat):
    count = get_meat_count(meat)
    return meat, count

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(spam_one, get_canned_meats(), timeout=5)
    for (meat, count) in results:
        d.setdefault(meat, 0) += count
Run Code Online (Sandbox Code Playgroud)

你在5秒内获得的结果会被添加到dict中; 如果不是全部,那么其余的都被放弃了,并且a TimeoutError被引发(你可以随意处理 - 记录它,做一些快速回退代码,无论如何).

如果任务真的是独立的(因为它们在我的愚蠢的小例子中,但当然它们可能不在您的真实代码中,至少没有重大的重新设计),您可以通过删除它来免费并行化工作max_workers=1.然后,如果你在一台8核机器上运行它,它将启动8名工作人员并给他们每个工作的1/8,事情将更快地完成.(通常不是快8倍,但通常快3-6倍,这仍然相当不错.)


Igo*_*min 8

也许有人发现这个装饰器很有用,基于 TheSoundDefense 的回答:

import time
import signal

class TimeoutException(Exception):   # Custom exception class
    pass


def break_after(seconds=2):
    def timeout_handler(signum, frame):   # Custom signal handler
        raise TimeoutException
    def function(function):
        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(seconds)
            try:
                res = function(*args, **kwargs)
                signal.alarm(0)      # Clear alarm
                return res
            except TimeoutException:
                print u'Oops, timeout: %s sec reached.' % seconds, function.__name__, args, kwargs
            return
        return wrapper
    return function
Run Code Online (Sandbox Code Playgroud)

测试:

@break_after(3)
def test(a, b, c):
    return time.sleep(10)

>>> test(1,2,3)
Oops, timeout: 3 sec reached. test (1, 2, 3) {}
Run Code Online (Sandbox Code Playgroud)


Zek*_*oid 6

评论是正确的,你应该在里面检查.这是一个潜在的解决方案.请注意,异步函数(例如,通过使用线程)与此解决方案不同.这是同步的,这意味着它仍然会串行运行.

import time

for x in range(0,3):
    someFunction()

def someFunction():
    start = time.time()
    while (time.time() - start < 5):
        # do your normal function

    return;
Run Code Online (Sandbox Code Playgroud)

  • 不,它不起作用,如果A()永远占用,循环永远不会超时. (4认同)
  • 我的正常功能永远持续。这个解决方案对我来说真的不起作用。 (2认同)