使 Python 中的 C++ 函数超时

Bol*_*ain 6 c++ python signals

我已经实现了 python-cpp 绑定(使用 boost-python),这样从 Python 调用 foop() 就会运行 C++ 函数 fooc()。我想从 Python 设置一个超时,使 foop 在 t 秒后返回。这里的解决方案适用于 Python 函数,但不适用于 foop,因为我无法中断 C++ 代码——下面的示例是通过 run_for_timeout(foop) 来调用的。有没有办法从 Python 执行此操作(即无需在 C++ 中实现计时器功能)?

import signal

class CallbackValueError(ValueError):
    """Signal that a callback helper was handed an unusable data value.

    Behaves exactly like ``ValueError``; the dedicated subclass only lets
    callers catch problems originating from callback utilities separately.
    """

class TimeoutError(RuntimeError):
    """Marker exception meaning that an operation ran out of time.

    NOTE: within this module the name shadows Python 3's builtin
    ``TimeoutError``; this class derives from ``RuntimeError`` instead.
    """

def run_for_timeout(func, args=(), kwargs=None, timeout=5):
    """Run a function, giving up once ``timeout`` seconds have elapsed.

    The time limit is enforced with a ``SIGALRM`` timer whose handler raises
    ``TimeoutError`` inside the running call. Consequences of that design:

    * It only works where ``SIGALRM`` exists and when called from the main
      thread, because that is where Python lets signal handlers be set.
    * Python-level signal handlers only run when the interpreter regains
      control, so a long call into native code (for example a bound C++
      function) is not interrupted until that call returns.
    * A ``TimeoutError`` raised by ``func`` itself is indistinguishable from
      the timer firing and is also reported as a timeout.

    Note that ``timeout`` = 0 does not imply no timeout, but rather there is
    no time for the function to run and this function raises an error.

    Parameters
    ----------
    func : function
        Callable to execute.
    args : tuple
        Positional arguments passed to ``func``.
    kwargs : dict | None
        Keyword arguments passed to ``func``; ``None`` means no keywords.
    timeout : int | float
        Time limit in seconds; must be strictly positive. Fractional values
        are accepted.

    Returns
    -------
    result : object | None
        Return object from function, or None if it timed out

    Raises
    ------
    CallbackValueError
        If ``timeout`` is zero or negative.

    """
    if timeout <= 0:
        raise CallbackValueError("{}s is a nonsensical value for the "
                                 "timeout function".format(timeout))

    # Normalise the arguments before the clock starts ticking.
    if kwargs is None:
        kwargs = {}

    def handler(signum, frame):
        raise TimeoutError()

    # Remember whatever handler the caller had installed so it can be put
    # back afterwards instead of being silently replaced for good.
    previous_handler = signal.signal(signal.SIGALRM, handler)
    try:
        # ITIMER_REAL delivers SIGALRM like signal.alarm() does, but also
        # accepts fractional seconds.
        signal.setitimer(signal.ITIMER_REAL, timeout)
        try:
            return func(*args, **kwargs)
        finally:
            # Disarm the timer whether the call returned, raised or timed
            # out. Doing it inside the outer ``try`` means an alarm that
            # fires just before this line is still caught below.
            signal.setitimer(signal.ITIMER_REAL, 0)
    except TimeoutError:
        return None
    finally:
        # ``None`` means the earlier handler was not installed from Python
        # and therefore cannot be re-installed through signal.signal().
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
Run Code Online (Sandbox Code Playgroud)

小智 0

由于 C++ 函数 fooc() 通过 Python 的 foop() 运行,并且 C++ 代码本身无法从 Python 直接访问,因此使用 Python 的信号处理机制直接中断 C++ 代码具有挑战性。一种可行的办法是把该函数放到单独的进程中运行,超时后由父进程终止该子进程:

import multiprocessing
import time

# Placeholder for the C++ routine that foop() wraps
def fooc():
    """Pretend to be a slow native computation and report completion."""
    simulated_runtime_seconds = 10
    time.sleep(simulated_runtime_seconds)  # stands in for the real C++ work
    return "Done"

# Wrapper function to run fooc() with a timeout
def _run_fooc_in_child(result_queue):
    """Execute fooc() in the child process and report the outcome.

    The outcome is sent as a ``(succeeded, payload)`` pair so the parent can
    tell a raised exception apart from an ordinary return value.

    This lives at module level (rather than nested inside the caller)
    because process targets must be picklable under the ``spawn`` and
    ``forkserver`` start methods.
    """
    try:
        result_queue.put((True, fooc()))
    except Exception as exc:
        result_queue.put((False, exc))


def foop_with_timeout(timeout=5):
    """Run fooc() in a separate process, abandoning it after ``timeout`` seconds.

    Parameters
    ----------
    timeout : int | float | None
        Maximum number of seconds to wait for a result. ``None`` waits
        indefinitely.

    Returns
    -------
    result : object | None
        Whatever fooc() returned, or None if no result arrived in time (the
        child process is terminated in that case).

    Raises
    ------
    Exception
        Any exception raised by fooc() in the child is re-raised here.
    """
    # Local import keeps this block self-contained; only the Empty exception
    # type is needed from the queue module.
    from queue import Empty

    result_queue = multiprocessing.Queue()
    process = multiprocessing.Process(
        target=_run_fooc_in_child, args=(result_queue,)
    )
    process.start()

    # Wait on the queue rather than on the process: joining a child that
    # still has unflushed queue data can deadlock, so the result is fetched
    # first and the process is joined only afterwards.
    try:
        succeeded, payload = result_queue.get(timeout=timeout)
    except Empty:
        process.terminate()
        process.join()
        return None  # Return None if the timeout was reached

    process.join()
    if not succeeded:
        raise payload
    return payload

if __name__ == "__main__":
    # Demo entry point: give fooc() five seconds, then report what happened.
    try:
        result = foop_with_timeout(5)
        if result is not None:
            print("Result:", result)
        else:
            # None is the wrapper's way of saying the time limit was hit.
            print("Function timed out.")
    except Exception as e:
        # Anything raised while running the wrapped call is reported here.
        print("An error occurred:", e)
Run Code Online (Sandbox Code Playgroud)