red*_*moc 4 python queue multithreading python-2.7
我已经使用 python 实现了一些线程应用程序。在运行时我想捕获 CTRL+C sigcall 并退出程序。为此,我注册了一个名为 exit_graceively 的函数,该函数还负责以更受控的方式停止线程。然而,它似乎不起作用。看来处理程序从未被调用过
这是我正在使用的示例:
import Queue
import threading
import signal
import sys
import time
queue = Queue.Queue()
workers = list()
def callback(id, item):
print("{}: {}".format(id, item))
time.sleep(1)
def exit_gracefully(signum, frame):
print("Ctrl+C was pressed. Shutting threads down ...")
print("Stopping workers ...")
for worker in workers:
worker.stop()
sys.exit(1)
class ThreadedTask(threading.Thread):
def __init__(self, id, queue, callbacks):
threading.Thread.__init__(self)
self._stop_event = threading.Event()
self.id = str(id)
self.queue = queue
self.callbacks = callbacks
self._stopped = False
def run(self):
while not self.stopped():
item = self.queue.get()
for callback in self.callbacks:
callback(self.id, item)
self.queue.task_done()
def stop(self):
self._stop_event.set()
self._stopped = True
def stopped(self):
return self._stop_event.is_set() or self._stopped
def main(input_file, thread_count, callbacks):
print("Initializing queue ...")
queue = Queue.Queue()
print("Parsing '{}' ...".format(input_file))
with open(input_file) as f:
for line in f:
queue.put(line.replace("\n", ""))
print("Initializing {} threads ...".format(thread_count))
for id in range(thread_count):
worker = ThreadedTask(id, queue, callbacks)
worker.setDaemon(True)
workers.append(worker)
print("Starting {} threads ...".format(thread_count))
for worker in workers:
worker.start()
queue.join()
if __name__ == '__main__':
signal.signal(signal.SIGINT, exit_gracefully)
print("Starting main ...")
input_file = "list.txt"
thread_count = 10
callbacks = [
callback
]
main(input_file, thread_count, callbacks)
Run Code Online (Sandbox Code Playgroud)
如果您想尝试上面的示例,您可以先生成一些测试数据:
seq 1 10000 > list.txt
Run Code Online (Sandbox Code Playgroud)
任何帮助表示赞赏!
小智 5
这是一个似乎有效的解决方案。
一个问题是,除非设置了a,否则Queue.get()它将被忽略。此处记录: https: //bugs.python.org/issue1360。SIGINTtimeout
另一个问题似乎Queue.join()也被忽视了SIGINT。我通过循环轮询队列以查看它是否为空来解决这个问题。
这些问题似乎已在 Python 3 中得到修复。
我还添加了一个在处理程序中使用的共享事件SIGINT来告诉所有线程关闭。
import Queue
import signal
import sys
import threading
import time
def callback(id, item):
print '{}: {}'.format(id, item)
time.sleep(1)
class ThreadedTask(threading.Thread):
def __init__(self, id, queue, run_event, callbacks):
super(ThreadedTask, self).__init__()
self.id = id
self.queue = queue
self.run_event = run_event
self.callbacks = callbacks
def run(self):
queue = self.queue
while not self.run_event.is_set():
try:
item = queue.get(timeout=0.1)
except Queue.Empty:
pass
else:
for callback in self.callbacks:
callback(self.id, item)
queue.task_done()
def main():
queue = Queue.Queue()
run_event = threading.Event()
workers = []
def stop():
run_event.set()
for worker in workers:
# Allow worker threads to shut down completely
worker.join()
def sigint_handler(signum, frame):
print '\nShutting down...'
stop()
sys.exit(0)
signal.signal(signal.SIGINT, sigint_handler)
callbacks = [callback]
for id in range(1, 11):
worker = ThreadedTask(id, queue, run_event, callbacks)
workers.append(worker)
for worker in workers:
worker.start()
with open('list.txt') as fp:
for line in fp:
line = line.strip()
queue.put(line)
while not queue.empty():
time.sleep(0.1)
# Update: Added this to gracefully shut down threads after all
# items are consumed from the queue.
stop()
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)