Writing to multiple files with multiprocessing. Error: "TypeError: cannot serialize '_io.TextIOWrapper' object"

akh*_*ilc 3 python multiprocessing python-3.x

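I'm trying to write the results of a multiprocessing job (4 cores/processes) to files. Since the CPU cores work at the same time, I wanted to create 4 files, 0.txt, 1.txt, 2.txt and 3.txt, and keep them in a multiprocessing.Manager().list(). But I get the error TypeError: cannot serialize '_io.TextIOWrapper' object.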

import os
from multiprocessing import Manager, Process

def run_solver(total, proc_id, result, fouts):
    for i in range(10):
        fouts[proc_id].write('hi\n')

if __name__ == '__main__':
    # total and result are defined elsewhere in my program
    processes = []
    # Raises the TypeError: the open file objects must be pickled to reach the manager process
    fouts = Manager().list((open('0.txt', 'w'), open('1.txt', 'w'), open('2.txt', 'w'), open('3.txt', 'w')))
    for proc_id in range(os.cpu_count()):
        processes.append(Process(target=run_solver, args=(int(total/os.cpu_count()), proc_id, result, fouts)))

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    for i in range(len(fouts)):
        fouts[i].close()

I also tried filling the list with the file handles inside the function, like this:

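def run_solver(total, proc_id, result, fouts):
    # Also fails: storing the file object in the manager list means pickling it
    fouts[proc_id] = open(str(proc_id) + '.txt', 'w')
    for i in range(10):
        fouts[proc_id].write('hi\n')
    fouts[proc_id].close()

if __name__ == '__main__':
    processes = []
    fouts = Manager().list([0] * os.cpu_count())
    # ... process creation, start and join as in the first version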

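Neither version works. I understand it has something to do with objects that can't be serialized (pickled), but I don't know how to get around it. Can anyone suggest a solution?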

Mar*_*ers 5

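Open the files in each worker process instead. Don't open them up front and put them in the manager: open file objects can't be pickled, so they can't be sent from the manager process to the worker processes. Share the file names and let each process open its own file: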

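from multiprocessing import Manager, Process

def run_solver(total, proc_id, result, fouts):
    # Only the file name crosses the process boundary; the file is opened here
    with open(fouts[proc_id], 'w') as openfile:
        for i in range(10):
            openfile.write('hi\n')

if __name__ == '__main__':
    total = 40     # placeholder; comes from the rest of the question's program
    result = None  # placeholder; comes from the rest of the question's program
    processes = []
    with Manager() as manager:
        fouts = manager.list(['0.txt', '1.txt', '2.txt', '3.txt'])
        nprocs = len(fouts)  # one process per file, rather than os.cpu_count()
        for proc_id in range(nprocs):
            processes.append(Process(
                target=run_solver, args=(
                    total // nprocs, proc_id, result, fouts)
            ))
        # Start and join inside the with block, while the manager is still running
        for process in processes:
            process.start()
        for process in processes:
            process.join()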
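To see why the file objects fail while the file names work, here is a small sketch that tries to pickle each one directly. multiprocessing pickles everything it sends to a manager or to another process, so this is the same check in isolation. The helper is_picklable is hypothetical, written only for this illustration.

```python
import os
import pickle
import tempfile

def is_picklable(obj):
    """Return True if obj can be pickled, i.e. sent to a manager or another process."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True

# Work in a temporary directory so the demo leaves no files behind
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, '0.txt')
    with open(path, 'w') as f:
        print(is_picklable(f))   # False: an open file object can't be pickled
    print(is_picklable(path))    # True: a file name is just a str
```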

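If you share file names between processes and several of them may write to the same file, you'll want to prevent race conditions by using one lock per file: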

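from multiprocessing import Manager, Process

def run_solver(total, proc_id, result, fouts, locks):
    # Append mode: each write goes to the current end of the file
    with open(fouts[proc_id], 'a') as openfile:
        for i in range(10):
            with locks[proc_id]:
                openfile.write('hi\n')
                # Flush while still holding the lock, before another process writes
                openfile.flush()


if __name__ == '__main__':
    total = 40     # placeholder; comes from the rest of the question's program
    result = None  # placeholder; comes from the rest of the question's program
    processes = []
    with Manager() as manager:
        fouts = manager.list(['0.txt', '1.txt', '2.txt', '3.txt'])
        nprocs = len(fouts)
        # One lock per file, kept in a plain list of manager lock proxies:
        # multiprocessing.Lock objects can't be pickled into a manager.list()
        locks = [manager.Lock() for _ in range(nprocs)]

        for proc_id in range(nprocs):
            processes.append(Process(
                target=run_solver, args=(
                    total // nprocs, proc_id, result, fouts, locks
                )
            ))
        for process in processes:
            process.start()
        for process in processes:
            process.join()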

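Because each file is opened in a with statement, it is closed automatically when the block ends, and because the files are opened in append mode, processes writing to the same file don't overwrite each other's output. You do need to remember to flush the write buffer before releasing the lock again.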
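In the locking example each process still has a file of its own, so the locks never actually contend. The case they exist for is several processes appending to one shared file. A minimal sketch of that, assuming a single multiprocessing.Lock handed to every worker through the Process arguments (append_lines and write_shared are hypothetical names):

```python
import os
import tempfile
from multiprocessing import Lock, Process

def append_lines(path, lock, proc_id, n):
    # Every worker opens the same file itself, in append mode
    with open(path, 'a') as f:
        for i in range(n):
            with lock:
                f.write(f'proc {proc_id} line {i}\n')
                f.flush()  # hand the buffered line to the OS before releasing the lock

def write_shared(path, nprocs=4, n=10):
    # A multiprocessing.Lock may be passed to child processes through Process args
    lock = Lock()
    procs = [Process(target=append_lines, args=(path, lock, proc_id, n))
             for proc_id in range(nprocs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

if __name__ == '__main__':
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'shared.txt')
        write_shared(path)
        with open(path) as f:
            print(len(f.read().splitlines()))  # 40: 10 complete lines from each process
```

The order in which the processes' lines appear varies from run to run; the lock and flush only guarantee that each line is written whole.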

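As an aside, you may want to look at a process pool (multiprocessing.Pool) instead of managing your own set of processes by hand.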
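A minimal sketch of the pool approach, assuming each task only needs a path and a line count. Here run_solver is a simplified, hypothetical stand-in for the question's function, and write_all is a hypothetical helper. Pool.starmap unpacks each argument tuple into a call on a worker process, so only the path strings and integers are pickled, never a file object.

```python
import os
import tempfile
from multiprocessing import Pool

def run_solver(path, n):
    # Each task opens and closes its own file inside the worker process
    with open(path, 'w') as f:
        for i in range(n):
            f.write('hi\n')
    return path

def write_all(directory, nfiles=4, n=10):
    paths = [os.path.join(directory, f'{i}.txt') for i in range(nfiles)]
    # Pool() starts os.cpu_count() worker processes by default
    with Pool() as pool:
        # starmap blocks until every task is done and keeps the input order
        return pool.starmap(run_solver, [(p, n) for p in paths])

if __name__ == '__main__':
    with tempfile.TemporaryDirectory() as tmp:
        for path in write_all(tmp):
            with open(path) as f:
                print(os.path.basename(path), len(f.readlines()))  # 0.txt 10, 1.txt 10, ...
```

The pool decides which worker runs which task, so the number of files no longer has to match the number of cores.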