jbn*_*dlr 8 python signals multiprocessing python-2.7
我有一个python multiprocessing设置(即工作进程)与自定义信号处理,这可以防止工人干净利用multiprocessing自己.(参见下面的扩展问题描述).
生成所有工作进程的主类如下所示(某些部分被剥离为仅包含重要部分).
在这里,它只重新绑定自己signal的印刷品Master teardown; 实际上,接收到的信号沿着进程树传播,必须由工作人员自己处理.这是通过在产生工人之后重新绑定信号来实现的.
class Midlayer(object):
def __init__(self, nprocs=2):
self.nprocs = nprocs
self.procs = []
def handle_signal(self, signum, frame):
log.info('Master teardown')
for p in self.procs:
p.join()
sys.exit()
def start(self):
# Start desired number of workers
for _ in range(nprocs):
p = Worker()
self.procs.append(p)
p.start()
# Bind signals for master AFTER workers have been spawned and started
signal.signal(signal.SIGINT, self.handle_signal)
signal.signal(signal.SIGTERM, self.handle_signal)
# Serve forever, only exit on signals
for p in self.procs:
p.join()
Run Code Online (Sandbox Code Playgroud)
该工人阶级基础multiprocessing.Process和实现自己的run()-方法.
在此方法中,它连接到分布式消息队列并永久轮询队列中的项目.永远应该是:直到工人收到SIGINT或SIGTERM.工人不应该立即戒烟; 相反,它必须完成它所做的任何计算,然后将退出(一旦quit_req设置为True).
class Worker(Process):
def __init__(self):
self.quit_req = False
Process.__init__(self)
def handle_signal(self, signum, frame):
print('Stopping worker (pid: {})'.format(self.pid))
self.quit_req = True
def run(self):
# Set signals for worker process
signal.signal(signal.SIGINT, self.handle_signal)
signal.signal(signal.SIGTERM, self.handle_signal)
q = connect_to_some_distributed_message_queue()
# Start consuming
print('Starting worker (pid: {})'.format(self.pid))
while not self.quit_req:
message = q.poll()
if len(message):
try:
print('{} handling message "{}"'.format(
self.pid, message)
)
# Facade pattern: Pick the correct target function for the
# requested message and execute it.
MessageRouter.route(message)
except Exception as e:
print('{} failed handling "{}": {}'.format(
self.pid, message, e.message)
)
Run Code Online (Sandbox Code Playgroud)
到目前为止基本设置,(几乎)一切正常:
现在出现问题:目标函数(message由MessageRouter外观指向的内容)可能包含非常复杂的业务逻辑,因此可能需要多处理.
例如,如果目标函数包含这样的内容:
nproc = 4
# Spawn a pool, because we have expensive calculation here
p = Pool(processes=nproc)
# Collect result proxy objects for async apply calls to 'some_expensive_calculation'
rpx = [p.apply_async(some_expensive_calculation, ()) for _ in range(nproc)]
# Collect results from all processes
res = [rpx.get(timeout=.5) for r in rpx]
# Print all results
print(res)
Run Code Online (Sandbox Code Playgroud)
然后由衍生的过程Pool也将重定向他们的信号处理为SIGINT和SIGTERM对工人的handle_signal(信号传播的过程子树因为)功能,本质上打印Stopping worker (pid: ...),而不是停在所有.我知道,这是因为我在生成子进程之前重新绑定了工作者的信号.
这就是我被困住的地方:我不能在产生子进程后设置工人的信号,因为我不知道它是否会产生一些(目标函数被掩盖并可能由其他人编写),并且因为工作者在其轮询循环中保持(按设计).同时,我不能指望用于multiprocessing将其自己的信号处理程序重新绑定到(无论如何)默认值的目标函数的实现.
目前,我想恢复工作程序中每个循环中的信号处理程序(在将消息路由到其目标函数之前)并在函数返回后重置它们是唯一的选择,但它只是感觉不对.
我错过了什么吗?你有什么建议吗?如果有人能给我一个如何解决我的设计缺陷的暗示,我会很高兴的!
没有明确的方法以您想要的方式解决问题。我经常发现自己必须在多处理环境中运行未知代码(表示为 Python 入口点函数,可能会陷入某些 C 怪癖)。
这就是我处理问题的方式。
主循环
通常主循环非常简单,它从某个源(HTTP、Pipe、Rabbit Queue..)获取一个任务并将其提交给一个工作池。我确保正确处理 KeyboardInterrupt 异常以关闭服务。
try:
while 1:
task = get_next_task()
service.process(task)
except KeyboardInterrupt:
service.wait_for_pending_tasks()
logging.info("Sayonara!")
Run Code Online (Sandbox Code Playgroud)
工人
工作人员由来自multiprocessing.Pool或的工作人员池管理concurrent.futures.ProcessPoolExecutor。如果我需要更高级的功能,例如超时支持,我要么使用billiard或pebble。
每个工作人员将按照此处的建议忽略 SIGINT 。SIGTERM 保留为默认值。
服务
该服务由 systemd 或supervisord 控制。在任何一种情况下,我都会确保终止请求始终作为 SIGINT (CTL+C) 传递。
我想将 SIGTERM 保留为紧急关闭,而不是仅依赖 SIGKILL。SIGKILL 不可移植,一些平台没有实现它。
“我希望就这么简单”
如果事情更复杂,我会考虑使用Luigi或Celery等框架。
一般来说,在这些事情上重新发明轮子是非常有害的,而且几乎没有什么满足感。特别是如果其他人必须查看该代码。
当然,如果您的目标是学习这些事情是如何完成的,则后一句不适用。
我能够使用 Python 3 和风味set_start_method(method)来做到这一点。Python 3 > Python 2 的另一种方式!'forkserver'
我所说的“这个”是指:
Ctrl-C 上的行为是:
stop标志并继续执行以完成他们的工作,尽管我在示例中没有打扰,我只是加入了我知道我拥有的孩子),然后退出。当然请注意,如果您的目的是让工作人员的子代不崩溃,您将需要在工作人员进程方法run()或其他地方为他们安装一些忽略处理程序或其他东西。
无情地从文档中删除:
当程序启动并选择forkserver启动方式时,就会启动一个服务器进程。从那时起,每当需要新进程时,父进程就会连接到服务器并请求它派生一个新进程。fork 服务器进程是单线程的,因此使用 os.fork() 是安全的。不会继承不必要的资源。
可用于支持通过 Unix 管道传递文件描述符的 Unix 平台。
因此,这个想法是“服务器进程”在安装新进程之前继承默认的信号处理行为,因此它的所有子进程也具有默认处理。
代码的全部荣耀:
from multiprocessing import Process, set_start_method
import sys
from signal import signal, SIGINT
from time import sleep
class NormalWorker(Process):
def run(self):
while True:
print('%d %s work' % (self.pid, type(self).__name__))
sleep(1)
class SpawningWorker(Process):
def handle_signal(self, signum, frame):
print('%d %s handling signal %r' % (
self.pid, type(self).__name__, signum))
def run(self):
signal(SIGINT, self.handle_signal)
sub = NormalWorker()
sub.start()
print('%d joining %d' % (self.pid, sub.pid))
sub.join()
print('%d %s joined sub worker' % (self.pid, type(self).__name__))
def main():
set_start_method('forkserver')
processes = [SpawningWorker() for ii in range(5)]
for pp in processes:
pp.start()
def sig_handler(signum, frame):
print('main handling signal %d' % signum)
for pp in processes:
pp.join()
print('main out')
sys.exit()
signal(SIGINT, sig_handler)
while True:
sleep(1.0)
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)