当我放入或从中取出时,我是否应该打扰锁定队列?

x.w*_*ang 7 python multithreading

我已经在python3中学习了关于多线程和队列的教程.正如官方教程所说,"此模块中的Queue类实现了所有必需的锁定语义".但是在另一个教程中,我看到了一个如下例子:

import queue
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
   def __init__(self, threadID, name, q):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.q = q
   def run(self):
      print ("Starting " + self.name)
      process_data(self.name, self.q)
      print ("Exiting " + self.name)

def process_data(threadName, q):
   while not exitFlag:
      queueLock.acquire()
      if not workQueue.empty():
         data = q.get()
         queueLock.release()
         print ("%s processing %s" % (threadName, data))
      else:
         queueLock.release()
         time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1

# Create new threads
for tName in threadList:
   thread = myThread(threadID, tName, workQueue)
   thread.start()
   threads.append(thread)
   threadID += 1

# Fill the queue
queueLock.acquire()
for word in nameList:
   workQueue.put(word)
queueLock.release()

# Wait for queue to empty
while not workQueue.empty():
   pass

# Notify threads it's time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
   t.join()
print ("Exiting Main Thread")
Run Code Online (Sandbox Code Playgroud)

Mat*_*Rav 13

我相信你所关注的教程是如何使用Python的线程安全队列的一个不好的例子.特别是,本教程使用线程安全队列的方式不幸需要额外的锁定.实际上,这个额外的锁意味着教程中的线程安全队列可以用基于简单列表的老式非线程安全队列替换.

需要锁定的原因由以下文档暗示Queue.empty():

如果empty()返回False,则不保证对get()的后续调用不会阻塞.

问题是另一个线程可能在对empty()的调用和对get()的调用之间运行,窃取了否则报告存在的empty()项.本教程可能使用锁来确保线程从调用empty()到调用get()之前具有对队列的独占访问权.如果没有这个锁,两个线程可以进入if语句,并且都发出一个get()调用,这意味着其中一个可以阻塞,等待一个永远不会被推送的项.


让我向您展示如何正确使用线程安全队列.而不是先检查empty(),而是直接依赖get()的阻塞行为:

def process_data(threadName, q):
    while True:
        data = q.get()
        if exitFlag:
            break
        print("%s processing %s" % (threadName, data))
Run Code Online (Sandbox Code Playgroud)

队列的内部锁定将确保两个线程在调用get()期间不会干扰,并且不需要queueLock.请注意,教程的原始代码将每隔1秒定期检查exitFlag,而此修改后的队列要求您在将exitFlag设置为之后将虚拟对象推入队列True- 否则,将永远不会检查该标志.

控制器代码的最后一部分需要修改如下:

# Notify threads it's time to exit
exitFlag = 1
for _ in range(len(threadList)):
    # Push a dummy element causing a single thread to wake-up and stop.
    workQueue.put(None)
# Wait for all threads to exit
for t in threads:
    t.join()
Run Code Online (Sandbox Code Playgroud)

本教程使用threadsafe队列还有另一个问题,即在等待队列清空时主线程中使用busy-loop:

# Wait for queue to empty
while not workQueue.empty():
    pass
Run Code Online (Sandbox Code Playgroud)

要等待队列清空,最好在线程中使用Queue.task_done(),然后在主线程中调用Queue.join().在process_data()中循环体的末尾,调用q.task_done().在主控制器代码中,而不是上面的while循环,调用q.join().

另请参阅队列模块上Python文档页面底部的示例.


或者,您可以保留queueLock并使用普通旧列表替换threadsafe队列,如下所示:

  • 替换workQueue = queue.Queue(10)workQueue = []
  • 替换if not workQueue.empty()if len(workQueue) > 0
  • 替换workQueue.get()workQueue.pop(0)
  • 替换workQueue.put(word)workQueue.append(word)

请注意,这不会保留原始版本中存在的put()的阻止行为.