Python动态多处理和信令问题

jbn*_*dlr 8 python signals multiprocessing python-2.7

我有一个python multiprocessing设置(即工作进程)与自定义信号处理,这可以防止工人干净利用multiprocessing自己.(参见下面的扩展问题描述).

安装程序

生成所有工作进程的类如下所示(某些部分被剥离为仅包含重要部分).

在这里,它只重新绑定自己signal的印刷品Master teardown; 实际上,接收到的信号沿着进程树传播,必须由工作人员自己处理.这是通过在产生工人之后重新绑定信号来实现的.

class Midlayer(object):
    def __init__(self, nprocs=2):
        self.nprocs = nprocs
        self.procs = []

    def handle_signal(self, signum, frame):
        log.info('Master teardown')
        for p in self.procs:
            p.join()
        sys.exit()

    def start(self):
        # Start desired number of workers
        for _ in range(nprocs):
            p = Worker()
            self.procs.append(p)
            p.start()

        # Bind signals for master AFTER workers have been spawned and started
        signal.signal(signal.SIGINT, self.handle_signal)
        signal.signal(signal.SIGTERM, self.handle_signal)

        # Serve forever, only exit on signals
        for p in self.procs:
            p.join()
Run Code Online (Sandbox Code Playgroud)

工人阶级基础multiprocessing.Process和实现自己的run()-方法.

在此方法中,它连接到分布式消息队列并永久轮询队列中的项目.永远应该是:直到工人收到SIGINTSIGTERM.工人不应该立即戒烟; 相反,它必须完成它所做的任何计算,然后将退出(一旦quit_req设置为True).

class Worker(Process):
    def __init__(self):
        self.quit_req = False
        Process.__init__(self)

    def handle_signal(self, signum, frame):
        print('Stopping worker (pid: {})'.format(self.pid))
        self.quit_req = True

    def run(self):
        # Set signals for worker process
        signal.signal(signal.SIGINT, self.handle_signal)
        signal.signal(signal.SIGTERM, self.handle_signal)

        q = connect_to_some_distributed_message_queue()

        # Start consuming
        print('Starting worker (pid: {})'.format(self.pid))
        while not self.quit_req:
            message = q.poll()
            if len(message):
                try:
                    print('{} handling message "{}"'.format(
                        self.pid, message)
                    )
                    # Facade pattern: Pick the correct target function for the
                    # requested message and execute it.
                    MessageRouter.route(message)
                except Exception as e:
                    print('{} failed handling "{}": {}'.format(
                        self.pid, message, e.message)
                    )
Run Code Online (Sandbox Code Playgroud)

问题

到目前为止基本设置,(几乎)一切正常:

  • 主进程产生所需数量的工作程序
  • 每个worker都连接到消息队列
  • 发布消息后,其中一个工作人员会收到该消息
  • Facade模式(使用名为MessageRouter的类)将接收到的消息路由到相应的函数并执行它

现在出现问题:目标函数(messageMessageRouter外观指向的内容)可能包含非常复杂的业务逻辑,因此可能需要多处理.

例如,如果目标函数包含这样的内容:

nproc = 4
# Spawn a pool, because we have expensive calculation here
p = Pool(processes=nproc)
# Collect result proxy objects for async apply calls to 'some_expensive_calculation'
rpx = [p.apply_async(some_expensive_calculation, ()) for _ in range(nproc)]
# Collect results from all processes
res = [rpx.get(timeout=.5) for r in rpx]
# Print all results
print(res)
Run Code Online (Sandbox Code Playgroud)

然后由衍生的过程Pool也将重定向他们的信号处理为SIGINTSIGTERM对工人的handle_signal(信号传播的过程子树因为)功能,本质上打印Stopping worker (pid: ...),而不是停在所有.我知道,这是因为我在生成子进程之前重新绑定了工作者的信号.

这就是我被困住的地方:我不能产生子进程设置工人的信号,因为我不知道它是否会产生一些(目标函数被掩盖并可能由其他人编写),并且因为工作者在其轮询循环中保持(按设计).同时,我不能指望用于multiprocessing将其自己的信号处理程序重新绑定到(无论如何)默认值的目标函数的实现.

目前,我想恢复工作程序中每个循环中的信号处理程序(在将消息路由到其目标函数之前)并在函数返回后重置它们是唯一的选择,但它只是感觉不对.

我错过了什么吗?你有什么建议吗?如果有人能给我一个如何解决我的设计缺陷的暗示,我会很高兴的!

nox*_*fox 8

没有明确的方法以您想要的方式解决问题。我经常发现自己必须在多处理环境中运行未知代码(表示为 Python 入口点函数,可能会陷入某些 C 怪癖)。

这就是我处理问题的方式。

主循环

通常主循环非常简单,它从某个源(HTTP、Pipe、Rabbit Queue..)获取一个任务并将其提交给一个工作池。我确保正确处理 KeyboardInterrupt 异常以关闭服务。

try:
    while 1:
        task = get_next_task()
        service.process(task)
except KeyboardInterrupt:
    service.wait_for_pending_tasks()
    logging.info("Sayonara!")
Run Code Online (Sandbox Code Playgroud)

工人

工作人员由来自multiprocessing.Pool或的工作人员池管理concurrent.futures.ProcessPoolExecutor。如果我需要更高级的功能,例如超时支持,我要么使用billiardpebble

每个工作人员将按照此处的建议忽略 SIGINT 。SIGTERM 保留为默认值。

服务

该服务由 systemd 或supervisord 控制。在任何一种情况下,我都会确保终止请求始终作为 SIGINT (CTL+C) 传递。

我想将 SIGTERM 保留为紧急关闭,而不是仅依赖 SIGKILL。SIGKILL 不可移植,一些平台没有实现它。

“我希望就这么简单”

如果事情更复杂,我会考虑使用LuigiCelery等框架。

一般来说,在这些事情上重新发明轮子是非常有害的,而且几乎没有什么满足感。特别是如果其他人必须查看该代码。

当然,如果您的目标是学习这些事情是如何完成的,则后一句不适用。


dap*_*azz 5

我能够使用 Python 3 和风味set_start_method(method)来做到这一点。Python 3 > Python 2 的另一种方式!'forkserver'

我所说的“这个”是指:

  1. 有一个带有自己的信号处理程序的主进程,该处理程序只是加入子进程。
  2. 有一些带有信号处理程序的工作进程可能会产生......
  3. 其他没有信号处理程序的子进程。

Ctrl-C 上的行为是:

  1. 管理器进程等待工作器退出。
  2. 工作人员运行他们的信号处理程序(可能会设置一个stop标志并继续执行以完成他们的工作,尽管我在示例中没有打扰,我只是加入了我知道我拥有的孩子),然后退出。
  3. 所有工人的孩子都会立即死亡。

当然请注意,如果您的目的是让工作人员的子代不崩溃,您将需要在工作人员进程方法run()或其他地方为他们安装一些忽略处理程序或其他东西。

无情地从文档中删除:

当程序启动并选择forkserver启动方式时,就会启动一个服务器进程。从那时起,每当需要新进程时,父进程就会连接到服务器并请求它派生一个新进程。fork 服务器进程是单线程的,因此使用 os.fork() 是安全的。不会继承不必要的资源。

可用于支持通过 Unix 管道传递文件描述符的 Unix 平台。

因此,这个想法是“服务器进程”在安装新进程之前继承默认的信号处理行为,因此它的所有子进程也具有默认处理。

代码的全部荣耀:

from multiprocessing import Process, set_start_method
import sys
from signal import signal, SIGINT
from time import sleep


class NormalWorker(Process):

    def run(self):
        while True:
            print('%d %s work' % (self.pid, type(self).__name__))
            sleep(1)


class SpawningWorker(Process):

    def handle_signal(self, signum, frame):
        print('%d %s handling signal %r' % (
            self.pid, type(self).__name__, signum))

    def run(self):

        signal(SIGINT, self.handle_signal)
        sub = NormalWorker()
        sub.start()
        print('%d joining %d' % (self.pid, sub.pid))
        sub.join()
        print('%d %s joined sub worker' % (self.pid, type(self).__name__))


def main():
    set_start_method('forkserver')

    processes = [SpawningWorker() for ii in range(5)]

    for pp in processes:
        pp.start()

    def sig_handler(signum, frame):
        print('main handling signal %d' % signum)
        for pp in processes:
            pp.join()
        print('main out')
        sys.exit()

    signal(SIGINT, sig_handler)

    while True:
        sleep(1.0)

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)