tcp*_*008 · 81 · python, multiprocessing, python-2.7
I've tried to read the documentation at http://docs.python.org/dev/library/multiprocessing.html, but I'm still struggling with multiprocessing Queue, Pool and Lock. For now I was able to build the example below.
Regarding Queue and Pool, I'm not sure if I understood the concept the right way, so correct me if I'm wrong. What I'm trying to achieve is to process 2 requests at a time (the data list has 8 in this example). So, what should I use? A Pool that creates 2 processes, each of which can handle a different queue (2 max), or should I just use Queue to process 2 inputs each time? The Lock would be for printing the output correctly.
import multiprocessing
import time

data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'],
        )

def mp_handler(var1):
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()

def mp_worker(inputs, the_time):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

if __name__ == '__main__':
    mp_handler(data)
Vel*_*ker · 121
The best solution for your problem is to utilize a Pool. Using Queues and having a separate "queue feeding" functionality is probably overkill.
Here's a slightly rearranged version of your program, this time with only 2 processes corralled in a Pool. I believe it's the easiest way to go, with minimal changes to the original code:
import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'],
)

def mp_worker((inputs, the_time)):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

def mp_handler():
    p = multiprocessing.Pool(2)
    p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()
Note that the mp_worker() function now accepts a single argument (a tuple of the two previous arguments), because map() hands each element of your input data (here, each two-item sublist) to the worker function as a single argument.
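One portability note that is not part of the original answer: the tuple-unpacking signature def mp_worker((inputs, the_time)): is Python 2 only; PEP 3113 removed tuple parameters in Python 3. Unpacking inside the body works the same way on 2.7 and is the only change the worker's signature needs when porting (the print statements would also need parentheses on Python 3):

def mp_worker(args):
    # Unpack the (inputs, the_time) tuple in the body rather than
    # in the signature (tuple parameters were removed by PEP 3113).
    inputs, the_time = args
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs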
Output:
Process a  Waiting 2 seconds
Process b  Waiting 4 seconds
Process a  DONE
Process c  Waiting 6 seconds
Process b  DONE
Process d  Waiting 8 seconds
Process c  DONE
Process e  Waiting 1 seconds
Process e  DONE
Process f  Waiting 3 seconds
Process d  DONE
Process g  Waiting 5 seconds
Process f  DONE
Process h  Waiting 7 seconds
Process g  DONE
Process h  DONE
Edit, in response to @Thales's comment below:
If you want "a lock for each pool limit", so that your processes run in tandem pairs, ala:

A waiting, B waiting | A done, B done | C waiting, D waiting | C done, D done | ...

then change the handler function to launch a pool (of 2 processes) for each pair of data:
def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))
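A side note of mine, not part of the original answer: p.map() already blocks until both tasks finish, which is what serializes the pairs, but each loop iteration leaves a live Pool and its two worker processes behind. A variant that releases them explicitly before the next pair starts:

def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))  # blocks until both tasks finish
        p.close()  # no more work for this pool
        p.join()   # reap the two worker processes before the next iteration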
Now your output is:
Process a  Waiting 2 seconds
Process b  Waiting 4 seconds
Process a  DONE
Process b  DONE
Process c  Waiting 6 seconds
Process d  Waiting 8 seconds
Process c  DONE
Process d  DONE
Process e  Waiting 1 seconds
Process f  Waiting 3 seconds
Process e  DONE
Process f  DONE
Process g  Waiting 5 seconds
Process h  Waiting 7 seconds
Process g  DONE
Process h  DONE
This might not be 100% related to the question, but on my search for an example of multiprocessing with a queue, this shows up first on Google.
This is a basic example class that you can instantiate and put items into a queue, and you can wait until the queue is finished. That's all I needed.
from multiprocessing import JoinableQueue, Process

class Renderer:
    queue = None

    def __init__(self, nb_workers=2):
        self.queue = JoinableQueue()
        # Start nb_workers processes, all consuming from the same queue.
        self.processes = [Process(target=self.upload) for i in range(nb_workers)]
        for p in self.processes:
            p.start()

    def render(self, item):
        self.queue.put(item)

    def upload(self):
        while True:
            item = self.queue.get()
            if item is None:
                break

            # process your item here

            self.queue.task_done()

    def terminate(self):
        """ wait until queue is empty and terminate processes """
        self.queue.join()
        for p in self.processes:
            p.terminate()
r = Renderer()
r.render(item1)
r.render(item2)
r.terminate()
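One quirk worth noticing here (my observation, not the answer author's): terminate() never puts None on the queue, so the "if item is None: break" branch never actually fires; the workers are simply killed once the queue drains. If you wanted the workers to exit on their own, a sketch of a gentler shutdown method you could add to Renderer would be:

    def stop(self):
        """Hypothetical graceful alternative to terminate(): send one
        None sentinel per worker so the `item is None` branch fires,
        then wait for the processes to exit on their own."""
        for _ in self.processes:
            self.queue.put(None)
        for p in self.processes:
            p.join()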
Here's my personal go-to for this topic:
Gist here (pull requests welcome!): https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec
import multiprocessing
import sys

THREADS = 3

# Used to prevent multiple processes from mixing their output
GLOBALLOCK = multiprocessing.Lock()


def func_worker(args):
    """This function will be called by each worker process.
    This function can not be a class method.
    """
    # Expand list of args into named args.
    str1, str2 = args
    del args

    # Work
    # ...

    # Serial-only Portion
    GLOBALLOCK.acquire()
    print(str1)
    print(str2)
    GLOBALLOCK.release()


def main(argp=None):
    """Multiprocessing Spawn Example
    """
    # Create the number of worker processes you want
    pool = multiprocessing.Pool(THREADS)

    # Define two jobs, each with two args.
    func_args = [
        ('Hello', 'World',),
        ('Goodbye', 'World',),
    ]

    try:
        # map_async().get() with a very long timeout, rather than a bare
        # map(): in Python 2 an untimed wait can't be interrupted, so the
        # timeout is what keeps ^C working while the pool runs.
        pool.map_async(func_worker, func_args).get(9999999)
    except KeyboardInterrupt:
        # Allow ^C to interrupt from any process.
        sys.stdout.write('\033[0m')
        sys.stdout.write('User Interrupt\n')
    pool.close()

if __name__ == '__main__':
    main()
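A small stylistic note from me rather than from the gist: multiprocessing.Lock supports the context-manager protocol, so the serial-only portion can be written so the lock is released even if a print raises:

# Equivalent to the acquire()/release() pair in func_worker above.
with GLOBALLOCK:
    print(str1)
    print(str2)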