在 Python 中 fork 时父子进程之间的通信

doe*_*toe 5 python fork ipc

我试图让一个 Python 程序同时运行一个处理循环和一个结果广播服务,使用对 os.fork() 的调用,类似于

pid = os.fork()
if pid == 0:
    time.sleep(3)
    keep_updating_some_value_while_parent_is_running()
else:
    broadcast_value()
Run Code Online (Sandbox Code Playgroud)

这里keep_updating_some_value_while_parent_is_running(),由子进程执行,存储一些值,只要父进程在运行,它就会不断更新。它实际上将值写入磁盘,以便父级可以轻松访问它。它通过检查它运行的 Web 服务是否可用来检测父进程是否正在运行。

broadcast_value() 运行一个 web 服务,当被查询时,它从磁盘读取最新的值并提供它。

这种实现效果很好,但由于以下几个原因并不令人满意:

  1. time.sleep(3)是必要的,因为 Web 服务需要一些启动时间。根本无法保证服务会在 3 秒内启动并运行,而另一方面,它可能会更早。

  2. 通过磁盘共享数据并不总是一个好的选择,甚至不可能(所以这个解决方案不能很好地概括)。

  3. 通过检查 Web 服务是否可用来检测父进程是否正在运行并不是很理想,而且对于不同类型的进程(不能那么容易地自动轮询),这根本不起作用。此外,可能是 Web 服务运行良好,但存在临时可用性问题。

  4. 解决方案取决于操作系统。

  5. 当子进程由于某种原因失败或退出时,父进程将继续运行(这可能是所需的行为,但并非总是如此)。

我想要的是让子进程知道父进程何时启动和运行以及何时停止,以及父进程根据请求获取子进程计算的最新值的某种方式,最好是在独立于操作系统的情况下道路。也欢迎涉及非标准库的解决方案。

Nat*_*eks 3

我建议使用multiprocessing而不是os.fork(),因为它可以为您处理很多细节。特别是它提供了该类Manager,该类提供了一种在进程之间共享数据的好方法。您将启动一个Process来处理获取数据,另一个来进行 Web 服务,并向它们传递一个由Manager. 然后,主进程只负责设置所有这些(并等待进程完成 - 否则会Manager中断)。

这可能是这样的:

import time
from multiprocessing import Manager, Process

def get_data():
    """ Does the actual work of getting the updating value. """

def update_the_data(shared_dict):
    while not shared_dict.get('server_started'):
        time.sleep(.1)
    while True:
        shared_dict['data'] = get_data()
        shared_dict['data_timestamp'] = time.time()
        time.sleep(LOOP_DELAY)


def serve_the_data(shared_dict):
    server = initialize_server() # whatever this looks like
    shared_dict['server_started'] = True
    while True:
        server.serve_with_timeout()
        if time.time() - shared_dict['data_timestamp'] > 30:
            # child hasn't updated data for 30 seconds; problem?
            handle_child_problem()


if __name__ == '__main__':
    manager = Manager()
    shared_dict = manager.dict()
    processes = [Process(target=update_the_data, args=(shared_dict,)),
        Process(target=serve_the_data, args=(shared_dict,))]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
Run Code Online (Sandbox Code Playgroud)