stu*_*uck 24 python linux multithreading
这应该非常简单,我很惊讶我无法在stackoverflow上找到这个问题.
我有一个类似程序的守护进程需要响应SIGTERM和SIGINT信号才能与upstart一起使用.我读到,最好的方法是在主线程的一个单独的线程中运行程序的主循环,让主线程处理信号.然后,当接收到信号时,信号处理程序应通过设置在主循环中例行检查的sentinel标志来告诉主循环退出.
我已经尝试过这样做,但它没有像我预期的那样工作.请参阅以下代码:
from threading import Thread
import signal
import time
import sys
stop_requested = False
def sig_handler(signum, frame):
sys.stdout.write("handling signal: %s\n" % signum)
sys.stdout.flush()
global stop_requested
stop_requested = True
def run():
sys.stdout.write("run started\n")
sys.stdout.flush()
while not stop_requested:
time.sleep(2)
sys.stdout.write("run exited\n")
sys.stdout.flush()
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGINT, sig_handler)
t = Thread(target=run)
t.start()
t.join()
sys.stdout.write("join completed\n")
sys.stdout.flush()
Run Code Online (Sandbox Code Playgroud)
我用以下两种方式对此进行了测试:
1)
$ python main.py > output.txt&
[2] 3204
$ kill -15 3204
Run Code Online (Sandbox Code Playgroud)
2)
$ python main.py
ctrl+c
Run Code Online (Sandbox Code Playgroud)
在这两种情况下,我都希望将其写入输出:
run started
handling signal: 15
run exited
join completed
Run Code Online (Sandbox Code Playgroud)
在第一种情况下程序退出但我看到的只有:
run started
Run Code Online (Sandbox Code Playgroud)
在第二种情况下,当按下ctrl + c并且程序没有退出时,SIGTERM信号似乎被忽略.
我在这里错过了什么?
aba*_*ert 30
问题是,正如执行Python信号处理程序中所解释的那样:
Python信号处理程序不会在低级(C)信号处理程序中执行.相反,低级信号处理程序设置一个标志,告诉虚拟机稍后执行相应的Python信号处理程序(例如在下一个字节码指令处)
...
纯粹在C中实现的长时间运行的计算(例如在大量文本上的正则表达式匹配)可以在任意时间内不间断地运行,而不管接收到任何信号.计算完成后将调用Python信号处理程序.
您的主要线程被阻止threading.Thread.join,这最终意味着它在pthread_join呼叫中被阻止在C中.当然,这不是"长时间运行的计算",它是系统调用的一个块......但是,在调用完成之前,您的信号处理程序无法运行.
并且,虽然在某些平台上pthread_join会EINTR因信号而失败,而在其他平台上却不会.在linux上,我认为这取决于你选择BSD风格还是默认siginterrupt行为,但默认是否.
所以你能对它做点啥?
好吧,我很确定Python 3.3中信号处理的变化实际上改变了Linux上的默认行为,因此如果升级你就不需要做任何事情; 只需在3.3+下运行,您的代码就会按照您的预期运行.至少它适用于OS X上的CPython 3.4和Linux上的3.3.(如果我对此错了,我不确定它是否是CPython中的错误,所以你可能想在python-list上提出它而不是打开一个问题......)
另一方面,在3.3之前,该signal模块肯定不会暴露您自己解决此问题所需的工具.所以,如果你不能升级到3.3,那么解决办法就是等待一些可中断的东西,比如一个Condition或一个Event.子线程在退出之前立即通知事件,主线程在加入子线程之前等待事件.这绝对是hacky.而且我找不到能保证会有所作为的东西; 它恰好适用于OS X上的CPython 2.7和3.2以及Linux上的2.6和2.7的各种版本...
stu*_*uck 12
abarnert的回答很明显.不过我还在使用Python 2.7.为了自己解决这个问题,我写了一个InterruptableThread类.
现在它不允许将其他参数传递给线程目标.Join也不接受超时参数.这只是因为我不需要这样做.您可以根据需要添加它.如果您自己使用它,您可能希望删除输出语句.它们就是评论和测试的一种方式.
import threading
import signal
import sys
class InvalidOperationException(Exception):
pass
# noinspection PyClassHasNoInit
class GlobalInterruptableThreadHandler:
threads = []
initialized = False
@staticmethod
def initialize():
signal.signal(signal.SIGTERM, GlobalInterruptableThreadHandler.sig_handler)
signal.signal(signal.SIGINT, GlobalInterruptableThreadHandler.sig_handler)
GlobalInterruptableThreadHandler.initialized = True
@staticmethod
def add_thread(thread):
if threading.current_thread().name != 'MainThread':
raise InvalidOperationException("InterruptableThread objects may only be started from the Main thread.")
if not GlobalInterruptableThreadHandler.initialized:
GlobalInterruptableThreadHandler.initialize()
GlobalInterruptableThreadHandler.threads.append(thread)
@staticmethod
def sig_handler(signum, frame):
sys.stdout.write("handling signal: %s\n" % signum)
sys.stdout.flush()
for thread in GlobalInterruptableThreadHandler.threads:
thread.stop()
GlobalInterruptableThreadHandler.threads = []
class InterruptableThread:
def __init__(self, target=None):
self.stop_requested = threading.Event()
self.t = threading.Thread(target=target, args=[self]) if target else threading.Thread(target=self.run)
def run(self):
pass
def start(self):
GlobalInterruptableThreadHandler.add_thread(self)
self.t.start()
def stop(self):
self.stop_requested.set()
def is_stop_requested(self):
return self.stop_requested.is_set()
def join(self):
try:
while self.t.is_alive():
self.t.join(timeout=1)
except (KeyboardInterrupt, SystemExit):
self.stop_requested.set()
self.t.join()
sys.stdout.write("join completed\n")
sys.stdout.flush()
Run Code Online (Sandbox Code Playgroud)
该类可以使用两种不同的方式.你可以分类InterruptableThread:
import time
import sys
from interruptable_thread import InterruptableThread
class Foo(InterruptableThread):
def __init__(self):
InterruptableThread.__init__(self)
def run(self):
sys.stdout.write("run started\n")
sys.stdout.flush()
while not self.is_stop_requested():
time.sleep(2)
sys.stdout.write("run exited\n")
sys.stdout.flush()
sys.stdout.write("all exited\n")
sys.stdout.flush()
foo = Foo()
foo2 = Foo()
foo.start()
foo2.start()
foo.join()
foo2.join()
Run Code Online (Sandbox Code Playgroud)
或者你可以使用它更像threading.thread的工作方式.run方法必须将InterruptableThread对象作为参数.
import time
import sys
from interruptable_thread import InterruptableThread
def run(t):
sys.stdout.write("run started\n")
sys.stdout.flush()
while not t.is_stop_requested():
time.sleep(2)
sys.stdout.write("run exited\n")
sys.stdout.flush()
t1 = InterruptableThread(run)
t2 = InterruptableThread(run)
t1.start()
t2.start()
t1.join()
t2.join()
sys.stdout.write("all exited\n")
sys.stdout.flush()
Run Code Online (Sandbox Code Playgroud)
用它做你想做的事.
| 归档时间: |
|
| 查看次数: |
20224 次 |
| 最近记录: |