Python队列在获取下一个项目之前等待线程

Cal*_*zyk 2 python queue multithreading

我有一个队列,在添加项目时总是需要准备好处理项目.在队列中的每个项目上运行的函数创建并启动线程以在后台执行操作,以便程序可以执行其他操作.

但是,我调用队列中每个项目的函数只是启动线程然后完成执行,无论它开始完成的线程是什么.因此,在程序处理完最后一项之前,循环将移动到队列中的下一项.

这是代码,以更好地展示我想要做的事情:

queue = Queue.Queue()
t = threading.Thread(target=worker)
t.start()

def addTask():
    queue.put(SomeObject())

def worker():
    while True:
        try:
            # If an item is put onto the queue, immediately execute it (unless 
            # an item on the queue is still being processed, in which case wait 
            # for it to complete before moving on to the next item in the queue)
            item = queue.get()
            runTests(item)
            # I want to wait for 'runTests' to complete before moving past this point
        except Queue.Empty, err:
            # If the queue is empty, just keep running the loop until something 
            # is put on top of it.
            pass

def runTests(args):
    op_thread = SomeThread(args)
    op_thread.start()
    # My problem is once this last line 't.start()' starts the thread, 
    # the 'runTests' function completes operation, but the operation executed
    # by some thread is not yet done executing because it is still running in
    # the background. I do not want the 'runTests' function to actually complete
    # execution until the operation in thread t is done executing.
    """t.join()"""
    # I tried putting this line after 't.start()', but that did not solve anything.
    # I have commented it out because it is not necessary to demonstrate what 
    # I am trying to do, but I just wanted to show that I tried it.
Run Code Online (Sandbox Code Playgroud)

一些说明:

这都在PyGTK应用程序中运行.一旦'SomeThread'操作完成,它就会向GUI发送回调以显示操作结果.

我不知道这对我所遇到的问题有多大影响,但我认为这可能很重要.

Eli*_*sky 7

Python线程的一个基本问题是你不能只是杀死他们 - 他们不得不同意死.

你应该做的是:

  1. 将线程实现为类
  2. 添加方法清除的threading.Event成员,join并且线程的主循环偶尔会检查.如果它看到它被清除,它会返回.对于此覆盖threading.Thread.join,检查事件然后Thread.join自行调用
  3. 要允许(2),Queue使用一些小的超时从块中读取.这样你的线程对kill请求的"响应时间"就是超时,OTOH没有CPU窒息

以下是我所拥有的套接字客户端线程中的一些代码与队列阻塞相同的问题:

class SocketClientThread(threading.Thread):
    """ Implements the threading.Thread interface (start, join, etc.) and
        can be controlled via the cmd_q Queue attribute. Replies are placed in
        the reply_q Queue attribute.
    """
    def __init__(self, cmd_q=Queue.Queue(), reply_q=Queue.Queue()):
        super(SocketClientThread, self).__init__()
        self.cmd_q = cmd_q
        self.reply_q = reply_q
        self.alive = threading.Event()
        self.alive.set()
        self.socket = None

        self.handlers = {
            ClientCommand.CONNECT: self._handle_CONNECT,
            ClientCommand.CLOSE: self._handle_CLOSE,
            ClientCommand.SEND: self._handle_SEND,
            ClientCommand.RECEIVE: self._handle_RECEIVE,
        }

    def run(self):
        while self.alive.isSet():
            try:
                # Queue.get with timeout to allow checking self.alive
                cmd = self.cmd_q.get(True, 0.1)
                self.handlers[cmd.type](cmd)
            except Queue.Empty as e:
                continue

    def join(self, timeout=None):
        self.alive.clear()
        threading.Thread.join(self, timeout)
Run Code Online (Sandbox Code Playgroud)

注意self.alive和循环run.