Tib*_*ibo 16 python queue pool multiprocessing
I have this problem in Python:
So far, I have managed to implement this "manually" like this:
    while 1:
        self.updateQueue()
        while not self.mainUrlQueue.empty():
            domain = self.mainUrlQueue.get()
            # If we have not yet launched the maximum number of processes, start one
            if len(self.jobs) < maxprocess:
                self.startJob(domain)
                #time.sleep(1)
            else:
                # If all processes are already started, clear a finished one out of the pool and start a new one
                jobdone = 0
                # Cycle through the processes until we find a free one; only then leave the loop
                while jobdone == 0:
                    for p in self.jobs:
                        #print "entering loop"
                        # if the process has finished
                        if not p.is_alive() and jobdone == 0:
                            #print str(p.pid) + " job dead, starting new one"
                            self.jobs.remove(p)
                            self.startJob(domain)
                            jobdone = 1
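However, this leads to a lot of problems and errors. I am wondering whether I would be better off using a pool of processes. What is the right way to do this?
However, my queue is empty much of the time, and it can be filled with 300 items within a second, so I am not sure how to handle that here.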
Syl*_*oux 39
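You could use the blocking capabilities of the queue to spawn several processes at startup (using multiprocessing.Pool) and let them sleep until some data is available on the queue to process. If you are not familiar with that, you could try to "play" with this simple program: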
    import multiprocessing
    import os
    import time

    the_queue = multiprocessing.Queue()

    def worker_main(queue):
        print os.getpid(),"working"
        while True:
            item = queue.get(True)  # blocking call: sleeps until an item is available
            print os.getpid(), "got", item
            time.sleep(1) # simulate a "long" operation

    the_pool = multiprocessing.Pool(3, worker_main,(the_queue,))
    # don't forget the comma here                            ^

    for i in range(5):
        the_queue.put("hello")
        the_queue.put("world")

    time.sleep(10)
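Tested with Python 2.7.3 on Linux.
This will spawn 3 processes (in addition to the parent process). Each child executes the worker_main function. It is a simple loop that gets a new item from the queue on each iteration. A worker blocks if nothing is ready to process.
At startup, all 3 processes sleep until the queue is fed with some data. When data becomes available, one of the waiting workers gets the item and starts to process it. After that, it tries to get another item from the queue, and waits again if nothing is available...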
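The bursty case from the question (a queue that is often empty and then receives many items at once) could be sketched roughly as follows. This is a minimal Python 3 sketch, not the code from the answer: it starts the workers with multiprocessing.Process instead of a Pool initializer, and `handle_item`, `result_queue`, the `domain-…` strings and the burst sizes are hypothetical names and values chosen only for illustration.

```python
import multiprocessing
import os
import time


def handle_item(item):
    """Hypothetical stand-in for the real work (e.g. checking one domain)."""
    return item.upper()


def worker_main(task_queue, result_queue):
    """Loop forever: sleep inside get() while the queue is empty, then work."""
    while True:
        item = task_queue.get()  # blocking call, no busy-waiting
        result_queue.put((os.getpid(), handle_item(item)))


def main(bursts=2, burst_size=5, num_workers=3):
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    for _ in range(num_workers):
        # daemon=True: the workers are terminated when the parent exits
        multiprocessing.Process(
            target=worker_main, args=(task_queue, result_queue), daemon=True
        ).start()

    results = []
    for burst in range(bursts):
        # A burst of items arrives at once...
        for i in range(burst_size):
            task_queue.put("domain-%d-%d" % (burst, i))
        # ...and is shared among whichever workers are free.
        for _ in range(burst_size):
            results.append(result_queue.get())
        time.sleep(0.2)  # quiet period: the queue is empty, workers block in get()
    return results


if __name__ == "__main__":
    for pid, value in main():
        print(pid, "handled", value)
```

The workers never poll: during the quiet period they are blocked inside `get()`, so an empty queue costs nothing, and a sudden burst is simply picked up by whichever workers are waiting.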
Cha*_*zig 11
Added some code (submitting "None" to the queue) to shut down the worker processes cleanly, and added code to close and join the_queue and the_pool:
    import multiprocessing
    import os
    import time

    NUM_PROCESSES = 20
    NUM_QUEUE_ITEMS = 20  # so really 40, because hello and world are processed separately


    def worker_main(queue):
        print(os.getpid(),"working")
        while True:
            item = queue.get(block=True) #block=True means make a blocking call to wait for items in queue
            if item is None:
                break  # None is the shutdown signal

            print(os.getpid(), "got", item)
            time.sleep(1) # simulate a "long" operation


    def main():
        the_queue = multiprocessing.Queue()
        the_pool = multiprocessing.Pool(NUM_PROCESSES, worker_main,(the_queue,))

        for i in range(NUM_QUEUE_ITEMS):
            the_queue.put("hello")
            the_queue.put("world")

        # one None per worker, so that every worker leaves its loop
        for i in range(NUM_PROCESSES):
            the_queue.put(None)

        # prevent adding anything more to the queue and wait for the buffered items to be flushed
        the_queue.close()
        the_queue.join_thread()

        # prevent adding anything more to the process pool and wait for all processes to finish
        the_pool.close()
        the_pool.join()

    if __name__ == '__main__':
        main()
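To see why one None is submitted per worker, here is a small sketch under some assumptions: it uses plain multiprocessing.Process workers rather than the Pool initializer used above, and `done_queue`, `NUM_WORKERS` and `NUM_ITEMS` are hypothetical names introduced only for this illustration. Each worker consumes exactly one None, reports how many items it handled, and exits; the parent then checks that nothing was lost.

```python
import multiprocessing
import os

NUM_WORKERS = 4  # hypothetical values for this sketch
NUM_ITEMS = 10


def worker_main(task_queue, done_queue):
    """Consume items until the None sentinel, then report the number handled."""
    handled = 0
    while True:
        item = task_queue.get()  # blocking call
        if item is None:
            break  # this worker has consumed its one sentinel
        handled += 1
    done_queue.put((os.getpid(), handled))


def main():
    task_queue = multiprocessing.Queue()
    done_queue = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=worker_main, args=(task_queue, done_queue))
        for _ in range(NUM_WORKERS)
    ]
    for worker in workers:
        worker.start()

    for i in range(NUM_ITEMS):
        task_queue.put("item %d" % i)
    # The sentinels are queued after all the items, so every item is taken
    # before any worker sees a None.
    for _ in range(NUM_WORKERS):
        task_queue.put(None)

    # Read the reports before joining the workers.
    reports = [done_queue.get() for _ in range(NUM_WORKERS)]
    for worker in workers:
        worker.join()

    total = sum(handled for _, handled in reports)
    print("handled", total, "of", NUM_ITEMS, "items")
    return total


if __name__ == "__main__":
    main()
```

With fewer sentinels than workers, the remaining workers would stay blocked in `get()` and the final `join()` would never return.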