Raf*_*ini 12 python queue file-io multiprocessing
I've run into the following problem in Python.
I need to run some computations in parallel and write the results sequentially to a file. So I created a function that receives a multiprocessing.Queue and a file handle, does the computation, and prints the result to the file:
import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation

# doCalculation(pars) is a function I must run for many different
# sets of parameters and collect the results in a file

def work(queue, fh):
    while True:
        try:
            parameter = queue.get(block = False)
            result = doCalculation(parameter)
            print >>fh, result
        except:
            break

if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fh = open("foo", "w")
    workQueue = Queue()
    parList = # list of conditions for which I want to run doCalculation()
    for x in parList:
        workQueue.put(x)
    processes = [Process(target = work, args = (workQueue, fh)) for i in range(nthreads)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    fh.close()
But the file ended up empty after the script ran. I tried changing the worker function to:
def work(queue, filename):
    while True:
        try:
            fh = open(filename, "a")
            parameter = queue.get(block = False)
            result = doCalculation(parameter)
            print >>fh, result
            fh.close()
        except:
            break
and passing the filename as a parameter. Then it works as I intended. When I do the same thing sequentially, without multiprocessing, it also works fine.
Why didn't it work in the first version? I can't see the problem.
Also: can I guarantee that two processes won't try to write to the file at the same time?
EDIT:
Thanks. I get it now. This is the working version:
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform

def doCalculation(par):
    t = uniform(0, 2)
    sleep(t)
    return par * par  # just to simulate some calculation

def feed(queue, parlist):
    for par in parlist:
        queue.put(par)

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block = False)
            print "dealing with ", par, ""
            res = doCalculation(par)
            queueOut.put((par, res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
        except:
            break
    fhandle.close()

if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fname = "foo"
    workerQueue = Queue()
    writerQueue = Queue()
    parlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    feedProc = Process(target = feed, args = (workerQueue, parlist))
    calcProc = [Process(target = calc, args = (workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target = write, args = (writerQueue, fname))

    feedProc.start()
    for p in calcProc:
        p.start()
    writProc.start()

    feedProc.join()
    for p in calcProc:
        p.join()
    writProc.join()
S.L*_*ott 17
You really should use two queues and three separate kinds of processing.

1. Put stuff into Queue #1.
2. Get stuff out of Queue #1 and do calculations, putting results into Queue #2. You can have many of these, since they get from one queue and put into another queue safely.
3. Get stuff out of Queue #2 and write it to a file. You must have exactly one of these and no more. It "owns" the file, guarantees atomic access, and absolutely guarantees that the file is written cleanly and consistently.
If anyone is looking for a simple way to do the same thing, this may help you. I don't think there are any disadvantages to doing it this way. If there are, please let me know.
import multiprocessing

def mp_worker(item):
    # Do something; a placeholder computation stands in for the real work
    count = len(item)
    return item, count

def mp_handler():
    cpus = multiprocessing.cpu_count()
    p = multiprocessing.Pool(cpus)

    # The two lines below populate the list. listX will later be processed
    # in parallel; they can be replaced with anything, as long as listX is
    # passed on to the next step.
    with open('ExampleFile.txt') as f:
        listX = [line for line in (l.strip() for l in f) if line]

    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, listX):
            # (item, count) tuples from worker
            f.write('%s: %d\n' % result)

if __name__ == '__main__':
    mp_handler()