如何在python中安全地将单个hdf5文件中的数据并行写入多个文件?

Jen*_*nsY 8 python multiprocessing h5py

我正在尝试将我的数据(从hdf5格式的单个文件)写入多个文件,并且在串行执行任务时它可以正常工作.现在我想提高效率并使用multiprocessing模块修改代码,但输出有时会出错.这是我的代码的简化版本.

import multiprocessing as mp
import numpy as np
import math, h5py, time
N = 4  # number of processes to use
block_size = 300
data_sz = 678
dataFile = 'mydata.h5'

# fake some data
mydata = np.zeros((data_sz, 1))
for i in range(data_sz):
    mydata[i, 0] = i+1
h5file = h5py.File(dataFile, 'w')
h5file.create_dataset('train', data=mydata)

# fire multiple workers
pool = mp.Pool(processes=N)
total_part = int(math.ceil(1. * data_sz / block_size))
for i in range(total_part):
    pool.apply_async(data_write_func, args=(dataFile, i, ))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

而且data_write_func()结构是:

def data_write_func(h5file_dir, i, block_size=block_size):
    hf = h5py.File(h5file_dir)
    fout = open('data_part_' + str(i), 'w')
    data_part = hf['train'][block_size*i : min(block_size*(i+1), data_sz)]  # np.ndarray
    for line in data_part:
        # do some processing, that takes a while...
        time.sleep(0.01)
        # then write out..
        fout.write(str(line[0]) + '\n')
    fout.close()
Run Code Online (Sandbox Code Playgroud)

当我设置时N=1,它运作良好.但是当我设置N=2或者N=4,结果有时会混乱(不是每次!).例如在data_part_1中我期望输出为:

301,
302,
303,
...
Run Code Online (Sandbox Code Playgroud)

但有时我得到的是

0,
0,
0,
...
Run Code Online (Sandbox Code Playgroud)

有时我会

379,
380,
381,
...
Run Code Online (Sandbox Code Playgroud)

我是多处理模块的新手,发现它很棘手.如果有任何建议,请欣赏它!

han*_*ast 6

固定后fout.write,并mydata=...为舍甫琴科建议您计划按预期工作,因为每个进程写入自己的文件.这些过程无法相互融合.

probaby想做的事就是使用multiprocessing.map()其中削减你的迭代你(所以你不需要做block_size一样的东西),再加上它保证了结果的顺序进行.我重写了你的代码以使用多处理映射:

import multiprocessing
from functools import partial
import pprint

def data_write_func(line):
  i = multiprocessing.current_process()._identity[0]
  line = [i*2 for i in line]
  files[i-1].write(",".join((str(s) for s in line)) + "\n")

N = 4
mydata=[[x+1,x+2,x+3,x+4] for x in range(0,4000*N,4)] # fake some data
files = [open('data_part_'+str(i), 'w') for i in range(N)]

pool = multiprocessing.Pool(processes=N)
pool.map(data_write_func, mydata)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

请注意:

  • 我是从这个过程中取出来的,它是1或2
  • 就像现在data_write_func为每一行调用一样,文件打开需要在父进程中完成.另外:你不需要close()手动执行该文件,操作系统会在退出python程序时为你执行此操作.

现在,我想最后你想要将所有输出都放在一个文件中,而不是放在单独的文件中.如果你的输出线低于4096个字节的Linux(或低于上OSX 512个字节,对于其它操作系统见这里)你是真正安全的,只需打开一个文件(追加模式),并让每一道工序只写入一个文件,如下所述,这些大小保证是Unix的原子.

更新:

"如果数据作为数据集存储在hdf5文件中怎么办?"

根据hdf5 doc,自2.2.0版以来,它开箱即用:

并行HDF5是HDF5库的一种配置,允许您跨多个并行进程共享打开的文件.它使用MPI(消息传递接口)标准进行进程间通信

因此,如果您在代码中执行此操作:

h5file = h5py.File(dataFile, 'w')
dset = h5file.create_dataset('train', data=mydata)
Run Code Online (Sandbox Code Playgroud)

然后你可以从你的进程中访问dset并读取/写入它而不需要采取任何额外的措施.另请参阅h5py中使用多处理的此示例


And*_*kha 2

该问题无法复制。这是我的完整代码:

#!/usr/bin/env python

import multiprocessing

N = 4
mydata=[[x+1,x+2,x+3,x+4] for x in range(0,4000*N,4)] # fake some data

def data_write_func(mydata, i, block_size=1000):
    fout = open('data_part_'+str(i), 'w')
    data_part = mydata[block_size*i: block_size*i+block_size]
    for line in data_part:
        # do some processing, say *2 for each element...
        line = [x*2 for x in line]
        # then write out..
        fout.write(','.join(map(str,line))+'\n')
    fout.close()

pool = multiprocessing.Pool(processes=N)
for i in range(2):
      pool.apply_async(data_write_func, (mydata, i, ))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

示例输出来自data_part_0

2,4,6,8
10,12,14,16
18,20,22,24
26,28,30,32
34,36,38,40
42,44,46,48
50,52,54,56
58,60,62,64
Run Code Online (Sandbox Code Playgroud)