How to abort threads that pull items from a queue with Ctrl+C in Python

red*_*moc 4 python queue multithreading python-2.7

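I have implemented a threaded application in Python. At runtime I want to catch the Ctrl+C signal (SIGINT) and exit the program. To do this I registered a function called exit_gracefully, which is also responsible for stopping the threads in a more controlled way. However, it doesn't seem to work: the handler never appears to be called.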

Here is the example I'm using:

import Queue
import threading
import signal
import sys
import time

queue = Queue.Queue()
workers = list()

def callback(id, item):
    print("{}: {}".format(id, item))
    time.sleep(1)

def exit_gracefully(signum, frame):
    print("Ctrl+C was pressed. Shutting threads down ...")
    print("Stopping workers ...")
    for worker in workers:
        worker.stop()
    sys.exit(1)


class ThreadedTask(threading.Thread):
    def __init__(self, id, queue, callbacks):
        threading.Thread.__init__(self)
        self._stop_event = threading.Event()
        self.id = str(id)
        self.queue = queue
        self.callbacks = callbacks
        self._stopped = False

    def run(self):
        while not self.stopped():
            item = self.queue.get()
            for callback in self.callbacks:
                callback(self.id, item)
            self.queue.task_done()

    def stop(self):
        self._stop_event.set()
        self._stopped = True

    def stopped(self):
        return self._stop_event.is_set() or self._stopped


def main(input_file, thread_count, callbacks):
    print("Initializing queue ...")
    queue = Queue.Queue()

    print("Parsing '{}' ...".format(input_file))
    with open(input_file) as f:
        for line in f:
            queue.put(line.replace("\n", ""))

    print("Initializing {} threads ...".format(thread_count))
    for id in range(thread_count):
        worker = ThreadedTask(id, queue, callbacks)
        worker.setDaemon(True)
        workers.append(worker)

    print("Starting {} threads ...".format(thread_count))
    for worker in workers:
        worker.start()

    queue.join()


if __name__ == '__main__':
    signal.signal(signal.SIGINT, exit_gracefully)
    print("Starting main ...")
    input_file = "list.txt"
    thread_count = 10
    callbacks = [
        callback
    ]
    main(input_file, thread_count, callbacks)

If you want to try the example above, you can generate some test data first:

seq 1 10000 > list.txt
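If seq is not available (for example on Windows), a small Python helper can produce the same file. write_test_data is a hypothetical name used only for this sketch:

```python
def write_test_data(path="list.txt", count=10000):
    """Write the numbers 1..count, one per line, like `seq 1 count > path`."""
    with open(path, "w") as f:
        for n in range(1, count + 1):
            f.write("{}\n".format(n))
```

Calling write_test_data() with no arguments writes list.txt in the current directory, which is the file both scripts read.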

Any help is appreciated!

小智 5

Here is a solution that seems to work.

One problem is that Queue.get() ignores SIGINT unless a timeout is set. This is documented here: https://bugs.python.org/issue1360
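A minimal sketch of the timeout workaround in isolation, assuming the consumer is told to stop through a threading.Event. The names drain and poll_interval are hypothetical, and the import fallback is only there so the sketch runs on both Python 2.7 and 3.x:

```python
import threading

try:
    import Queue as queue  # Python 2.7 module name
except ImportError:
    import queue  # Python 3 module name


def drain(q, stop_event, poll_interval=0.1):
    """Consume items from q until stop_event is set, then return them.

    q.get() is given a timeout so the blocking call returns at least every
    poll_interval seconds; that lets the loop re-check stop_event and, on
    Python 2.7, gives the interpreter a chance to run a SIGINT handler.
    """
    seen = []
    while not stop_event.is_set():
        try:
            item = q.get(timeout=poll_interval)
        except queue.Empty:
            continue  # nothing arrived in time; loop back and check the flag
        seen.append(item)
        q.task_done()  # mark the item finished so q.join() can complete
    return seen
```

The cost is a wake-up every poll_interval seconds per idle thread, which is usually negligible next to the work each callback does.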

Another problem is that Queue.join() also seems to ignore SIGINT. I worked around this by polling the queue in a loop to check whether it is empty.
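Note that polling empty() returns as soon as the last item has been taken off the queue, not when it has been processed; the code below covers that by joining the workers afterwards. If you want real join() semantics without parking the main thread in an uninterruptible wait, one alternative (not part of the original answer) is to run q.join() in a helper thread and wait on that thread with a timeout. join_interruptibly is a hypothetical name. The idea relies on the timed Thread.join() returning to the interpreter loop every poll_interval seconds, which is where a pending SIGINT handler gets to run:

```python
import threading


def join_interruptibly(q, poll_interval=0.1):
    """Wait until every item in q has been task_done(), like q.join().

    The blocking q.join() runs in a daemon helper thread. The calling thread
    only ever waits on that helper with a short timeout, so it regularly
    returns to the interpreter, where a SIGINT handler can run.
    """
    joiner = threading.Thread(target=q.join)
    joiner.daemon = True  # don't keep the process alive if we exit early
    joiner.start()
    while joiner.is_alive():
        joiner.join(poll_interval)
```

In the answer's main(), this would take the place of the while not queue.empty() loop; stop() is still needed afterwards to tell the workers to exit.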

These problems appear to have been fixed in Python 3.

I also added a shared event, set in the SIGINT handler, that tells all threads to shut down.
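The shared-event idea can also be reduced to a handler that does nothing but set the flag, leaving the joining to ordinary code in the main thread. This is a variation on the answer's sigint_handler, not a copy of it, and install_shutdown_handler is a hypothetical helper:

```python
import signal
import threading


def install_shutdown_handler(stop_event):
    """Install a SIGINT handler that sets stop_event instead of exiting.

    Workers poll stop_event (for example between q.get() timeouts), so the
    handler only flips the flag and the threads wind down on their own.
    Must be called from the main thread: signal.signal() raises ValueError
    when called from any other thread.
    """
    def handler(signum, frame):
        print("\nShutting down...")
        stop_event.set()

    signal.signal(signal.SIGINT, handler)
    return handler
```

Keeping the handler this small means the main thread's polling loop can notice stop_event, join the workers and return normally, instead of calling sys.exit() from inside the handler.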

import Queue
import signal
import sys
import threading
import time


def callback(id, item):
    print '{}: {}'.format(id, item)
    time.sleep(1)


class ThreadedTask(threading.Thread):

    def __init__(self, id, queue, run_event, callbacks):
        super(ThreadedTask, self).__init__()
        self.id = id
        self.queue = queue
        self.run_event = run_event
        self.callbacks = callbacks

    def run(self):
        queue = self.queue
        while not self.run_event.is_set():
            try:
                item = queue.get(timeout=0.1)
            except Queue.Empty:
                pass
            else:
                for callback in self.callbacks:
                    callback(self.id, item)
                queue.task_done()


def main():
    queue = Queue.Queue()
    run_event = threading.Event()
    workers = []

    def stop():
        run_event.set()
        for worker in workers:
            # Allow worker threads to shut down completely
            worker.join()

    def sigint_handler(signum, frame):
        print '\nShutting down...'
        stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, sigint_handler)

    callbacks = [callback]

    for id in range(1, 11):
        worker = ThreadedTask(id, queue, run_event, callbacks)
        workers.append(worker)

    for worker in workers:
        worker.start()

    with open('list.txt') as fp:
        for line in fp:
            line = line.strip()
            queue.put(line)

    while not queue.empty():
        time.sleep(0.1)

    # Update: Added this to gracefully shut down threads after all
    # items are consumed from the queue.
    stop()


if __name__ == '__main__':
    main()