在python中使用多线程队列的方法是否正确?

ram*_*amu 16 python queue python-multithreading

我试图在python中使用Queue,这将是多线程的.我只是想知道我使用的方法是否正确.如果我正在做一些多余的事情或者如果有更好的方法我应该使用.

我试图从表中获取新请求,并使用某些逻辑来安排它们执行某些操作,如运行查询.

所以这里从主线程我为队列生成一个单独的线程.

if __name__=='__main__':

  request_queue = SetQueue(maxsize=-1)
  worker = Thread(target=request_queue.process_queue)
  worker.setDaemon(True)
  worker.start()


  while True:
    try:
      #Connect to the database get all the new requests to be verified
      db = Database(username_testschema, password_testschema, mother_host_testschema, mother_port_testschema, mother_sid_testschema, 0)
      #Get new requests for verification
      verify_these = db.query("SELECT JOB_ID FROM %s.table WHERE     JOB_STATUS='%s' ORDER BY JOB_ID" %
                             (username_testschema, 'INITIATED'))

      #If there are some requests to be verified, put them in the queue.
      if len(verify_these) > 0:
        for row in verify_these:
          print "verifying : %s" % row[0]
          verify_id = row[0]
          request_queue.put(verify_id)
    except Exception as e:
      logger.exception(e)
    finally:
      time.sleep(10)
Run Code Online (Sandbox Code Playgroud)

现在在Setqueue类中,我有一个process_queue函数,用于处理添加到队列的每次运行中的前2个请求.

'''
Overridding the Queue class to use set as all_items instead of list to ensure unique items added and processed all the time,
'''

class SetQueue(Queue.Queue):
  def _init(self, maxsize):
    Queue.Queue._init(self, maxsize)
    self.all_items = set()

  def _put(self, item):
    if item not in self.all_items:
      Queue.Queue._put(self, item)
      self.all_items.add(item)

  '''
  The Multi threaded queue for verification process. Take the top two items, verifies them in a separate thread and sleeps for 10 sec.
  This way max two requests per run will be processed.
  '''
  def process_queue(self):
    while True:
      scheduler_obj = Scheduler()

      try:
        if self.qsize() > 0:
          for i in range(2):
            job_id = self.get()
            t = Thread(target=scheduler_obj.verify_func, args=(job_id,))
            t.start()

          for i in range(2):
            t.join(timeout=1)
            self.task_done()

      except Exception as e:
        logger.exception(
          "QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE")
      finally:
        time.sleep(10)
Run Code Online (Sandbox Code Playgroud)

我想知道我的理解是否正确以及是否存在任何问题.

因此,在主func连接到数据库时运行的主线程同时获取新请求并将其放入队列中.队列的工作线程(守护程序)继续从队列中获取新请求,并且执行处理的fork非守护程序线程,并且由于连接的超时为1,工作线程将继续接收新请求而不会被阻止,并且子线程将继续在后台处理.正确?

因此,如果主进程退出这些不会被杀死,直到他们完成他们的工作,但工作守护程序线程将退出.怀疑:如果父进程是守护进程,而子进程是非守护进程,如果父进程退出,则子进程退出?).


我也在这里读到: - David beazley多处理

通过david beazley使用Pool作为线程协处理器部分,他试图解决类似的问题.我应该遵循他的步骤: - 1.创建一个流程池.2.打开一个像我正在为request_queue做的线程3.在那个线程中

  def process_verification_queue(self):
    while True:
      try:
        if self.qsize() > 0:
          job_id = self.get()
          pool.apply_async(Scheduler.verify_func, args=(job_id,))
      except Exception as e:
        logger.exception("QUEUE EXCEPTION : Exception occured while    processing requests in the VERIFICATION QUEUE")
Run Code Online (Sandbox Code Playgroud)

使用池中的进程并行并行运行verify_func.这会给我带来更多表现吗?

Dan*_*Dan 3

虽然可以为队列创建一个新的独立线程,并按照您正在执行的方式单独处理该数据,但我相信每个独立工作线程将消息发布到它们已经“知道”的队列是更常见的。然后,通过从该队列中拉出消息,从其他线程处理该队列。

设计理念

我设想您的应用程序将是三个线程。主线程和两个工作线程。1 个工作线程将从数据库获取请求并将它们放入队列中。另一个工作线程将处理队列中的数据

主线程只需使用线程函数 .join() 等待其他线程完成

您可以使用互斥体来保护线程可以访问的队列并使其线程安全。我也在其他语言的许多其他设计中看到过这种模式。

建议阅读

Brett Slatkin 的《Effective Python》有一个很好的例子来说明这个问题。

他没有继承 Queue,而是在名为 MyQueue 的类中创建了一个包装器,并添加了 get() 和 put(message) 函数。

他甚至在他的 Github 存储库中提供了源代码

https://github.com/bslatkin/ effectivepython/blob/master/example_code/item_39.py

我不隶属于这本书或其作者,但我强烈推荐它,因为我从中学到了很多东西:)