akh*_*ilc 3 python multiprocessing python-3.x
我正在尝试将多处理(4 个内核/进程)的结果写入文件。由于 CPU 内核同时工作,我想制作 4 个文件0.txt、1.txt、2.txt和3.txt并将其保存在multiprocessing.Manager().list(). 但我收到错误,TypeError: cannot serialize '_io.TextIOWrapper' object.
def run_solver(total, proc_id, result, fouts):
for i in range(10)):
fouts[proc_id].write('hi\n')
if __name__ == '__main__':
processes = []
fouts = Manager().list((open('0.txt', 'w'), open('1.txt', 'w'), open('2.txt', 'w'), open('3.txt', 'w')))
for proc_id in range(os.cpu_count()):
processes.append(Process(target=run_solver, args=(int(total/os.cpu_count()), proc_id, result, fouts)))
for process in processes:
process.start()
for process in processes:
process.join()
for i in range(len(fouts)):
fouts[i].close()
Run Code Online (Sandbox Code Playgroud)
我也尝试在函数内部使用文件句柄填充列表,如下所示。
def run_solver(total, proc_id, result, fouts):
fout[proc_id] = open(str(proc_id)+'.txt', 'w')
for i in range(10)):
fouts[proc_id].write('hi\n')
fout[proc_id].close()
if __name__ == '__main__':
processes = []
fouts = Manager().list([0]*os.cpu_count())
Run Code Online (Sandbox Code Playgroud)
两者都不起作用,我知道有一些与无法序列化或不可腌制有关的东西。但我不知道如何解决这个问题。有人可以提出解决方案吗?
打开每个进程中的文件。不要在管理器中打开它们,您不能将打开的文件从管理器进程发送到执行器进程。
def run_solver(total, proc_id, result, fouts):
with open(fouts[proc_id], 'w') as openfile:
for i in range(10)):
openfile.write('hi\n')
if __name__ == '__main__':
processes = []
with Manager() as manager:
fouts = manager.list(['0.txt', '1.txt', '2.txt', '3.txt'])
for proc_id in range(os.cpu_count()):
processes.append(Process(
target=run_solver, args=(
int(total/os.cpu_count()), proc_id, result, fouts)
))
Run Code Online (Sandbox Code Playgroud)
如果你在进程之间共享文件名,你想在写入这些文件时防止竞争条件,你真的想为每个文件使用一个锁:
def run_solver(total, proc_id, result, fouts, locks):
with open(fouts[proc_id], 'a') as openfile:
for i in range(10)):
with locks[proc_id]:
openfile.write('hi\n')
openfile.flush()
if __name__ == '__main__':
processes = []
with Manager() as manager:
fouts = manager.list(['0.txt', '1.txt', '2.txt', '3.txt'])
locks = manager.list([Lock() for fout in fouts])
for proc_id in range(os.cpu_count()):
processes.append(Process(
target=run_solver, args=(
int(total/os.cpu_count()), proc_id, result, fouts, locks
)
))
Run Code Online (Sandbox Code Playgroud)
因为文件with每次打开时都会自动关闭,并且它们以追加模式打开,因此不同的进程不会相互干扰。您确实需要记住在再次解锁之前刷新写入缓冲区。
顺便说一句,您可能想查看进程池而不是自己手动池化。
| 归档时间: |
|
| 查看次数: |
4954 次 |
| 最近记录: |