Python:读取12位二进制文件

squ*_*lle 3 python binary unpack python-3.x

我试图使用Python 3读取包含图像(视频)的12位二进制文件.

要读取类似的文件但以16位编码,以下内容非常有效:

import numpy as np
# Map the 16-bit video file read-only as a (frames, rows, columns) uint16 array
# without loading it into memory up front.
images = np.memmap(
    filename_video,
    dtype=np.uint16,
    mode='r',
    shape=(nb_frames, height, width),
)
Run Code Online (Sandbox Code Playgroud)

其中 filename_video 是视频文件的路径,nb_frames、height 和 width 是可以从另一个文件读取的视频特征(帧数、高度和宽度)。这里说的"非常有效"是指速度快:在我的计算机上读取 140 帧的 640x256 视频大约需要 1 毫秒。

据我所知,当文件以12位编码时我不能使用它,因为没有uint12类型.所以我要做的是读取一个12位文件并将其存储在一个16位的uint数组中.以下内容取自(Python:读取12位打包二进制图像),有效:

# Read the whole packed file into memory as raw bytes.
with open(filename_video, 'rb') as f:
    data = f.read()

# Three packed bytes carry two 12-bit samples, so allocate 2/3 samples per byte.
images = np.zeros(int(2 * len(data) / 3), dtype=np.uint16)

# Decode one 3-byte group at a time: each 24-bit group is split into two
# unsigned 12-bit integers that land in consecutive output slots.
for pair_index, offset in enumerate(range(0, len(data) - 2, 3)):
    triplet = bitstring.Bits(bytes=data[offset:offset + 3], length=24)
    first_sample, second_sample = triplet.unpack('uint:12,uint:12')
    images[2 * pair_index] = first_sample
    images[2 * pair_index + 1] = second_sample

# Fold the flat sample stream into (frames, rows, columns).
images = np.reshape(images, (nb_frames, height, width))
Run Code Online (Sandbox Code Playgroud)

然而,这非常慢:读取一个只有5帧的640x256视频,在我的机器上大约需要11.5秒.理想情况下,我希望能够像使用memmap读取8位或16位文件一样高效地读取12位文件,或者至少不要慢 10^5 倍.我怎样才能加快速度呢?

这是一个文件示例: http://s000.tinyupload.com/index.php?file_id=26973488795334213426 (nb_frames = 5,height = 256,width = 640).

max*_*111 8

要让 numpy 向量化方法更快,一种办法是避免为临时数据进行昂贵的内存分配、更有效地利用缓存,并利用并行化。这可以很容易地使用 Numba、Cython 或 C 来完成。请注意,并行化并不总是有益的。如果要转换的数组太小,请使用单线程版本(parallel=False)。

Cyril Gaudefroy 答案的 Numba 版本(带临时内存分配)

import numba as nb
import numpy as np
@nb.njit(nb.uint16[::1](nb.uint8[::1]),fastmath=True,parallel=True)
def nb_read_uint12(data_chunk):
    """Unpack packed 12-bit samples into a freshly allocated uint16 array.

    data_chunk is a contiguous 1D array of uint8 data, e.g.
    data_chunk = np.frombuffer(data_chunk, dtype=np.uint8), whose length
    must be a multiple of 3.

    Each 3-byte group (b0, b1, b2) produces two output values:
    (b0 << 4) | (b1 >> 4) and ((b1 & 0x0F) << 8) | b2.
    The returned array holds two entries per 3-byte group.
    """
    n_bytes = data_chunk.shape[0]

    # Two 12-bit values occupy exactly three bytes.
    assert n_bytes % 3 == 0

    n_groups = n_bytes // 3
    out = np.empty(n_groups * 2, dtype=np.uint16)

    for group in nb.prange(n_groups):
        src = group * 3
        # Widen to uint16 before shifting so the high bits are not lost.
        high = np.uint16(data_chunk[src])
        shared = np.uint16(data_chunk[src + 1])
        low = np.uint16(data_chunk[src + 2])

        dst = group * 2
        # The bit ranges being combined never overlap, so OR equals addition.
        out[dst] = (high << 4) | (shared >> 4)
        out[dst + 1] = ((shared & 15) << 8) | low

    return out
Run Code Online (Sandbox Code Playgroud)

Cyril Gaudefroy 答案的 Numba 版本(带内存预分配)

如果您在大小相近的数据块上多次调用此函数,可以只预分配一次输出数组并重复使用。

@nb.njit(nb.uint16[::1](nb.uint8[::1],nb.uint16[::1]),fastmath=True,parallel=True,cache=True)
def nb_read_uint12_prealloc(data_chunk,out):
    """Unpack packed 12-bit samples into a caller-supplied uint16 array.

    data_chunk is a contiguous 1D array of uint8 data, e.g.
    data_chunk = np.frombuffer(data_chunk, dtype=np.uint8), whose length
    must be a multiple of 3. out is a contiguous 1D uint16 array with
    exactly two entries per 3-byte group; it is filled in place and returned.

    Each 3-byte group (b0, b1, b2) produces two output values:
    (b0 << 4) | (b1 >> 4) and ((b1 & 0x0F) << 8) | b2.
    """
    n_bytes = data_chunk.shape[0]

    # Two 12-bit values occupy exactly three bytes.
    assert n_bytes % 3 == 0

    n_groups = n_bytes // 3
    # The destination must have room for exactly two values per group.
    assert out.shape[0] == n_groups * 2

    for group in nb.prange(n_groups):
        src = group * 3
        # Widen to uint16 before shifting so the high bits are not lost.
        high = np.uint16(data_chunk[src])
        shared = np.uint16(data_chunk[src + 1])
        low = np.uint16(data_chunk[src + 2])

        dst = group * 2
        # The bit ranges being combined never overlap, so OR equals addition.
        out[dst] = (high << 4) | (shared >> 4)
        out[dst + 1] = ((shared & 15) << 8) | low

    return out
Run Code Online (Sandbox Code Playgroud)

带有临时内存分配的 DGrifffith 答案的 Numba 版本

@nb.njit(nb.uint16[::1](nb.uint8[::1]),fastmath=True,parallel=True,cache=True)
def read_uint12_var_2(data_chunk):
    """Unpack packed 12-bit samples (second bit layout) into a new uint16 array.

    data_chunk is a contiguous 1D array of uint8 data, e.g.
    data_chunk = np.frombuffer(data_chunk, dtype=np.uint8), whose length
    must be a multiple of 3.

    Each 3-byte group (b0, b1, b2) produces two output values:
    (b0 << 4) | (b1 >> 4) and (b2 << 4) | (b1 & 0x0F).
    The returned array holds two entries per 3-byte group.
    """
    n_bytes = data_chunk.shape[0]

    # Two 12-bit values occupy exactly three bytes.
    assert n_bytes % 3 == 0

    n_groups = n_bytes // 3
    out = np.empty(n_groups * 2, dtype=np.uint16)

    for group in nb.prange(n_groups):
        src = group * 3
        # Widen to uint16 before shifting so the high bits are not lost.
        first_byte = np.uint16(data_chunk[src])
        shared = np.uint16(data_chunk[src + 1])
        last_byte = np.uint16(data_chunk[src + 2])

        dst = group * 2
        # The bit ranges being combined never overlap, so OR equals addition.
        out[dst] = (first_byte << 4) | (shared >> 4)
        out[dst + 1] = (last_byte << 4) | (shared & 15)

    return out
Run Code Online (Sandbox Code Playgroud)

带有内存预分配的 DGrifffith 答案的 Numba 版本

@nb.njit(nb.uint16[::1](nb.uint8[::1],nb.uint16[::1]),fastmath=True,parallel=True,cache=True)
def read_uint12_var_2_prealloc(data_chunk,out):
    """Unpack packed 12-bit samples (second bit layout) into a supplied array.

    data_chunk is a contiguous 1D array of uint8 data, e.g.
    data_chunk = np.frombuffer(data_chunk, dtype=np.uint8), whose length
    must be a multiple of 3. out is a contiguous 1D uint16 array with
    exactly two entries per 3-byte group; it is filled in place and returned.

    Each 3-byte group (b0, b1, b2) produces two output values:
    (b0 << 4) | (b1 >> 4) and (b2 << 4) | (b1 & 0x0F).
    """
    n_bytes = data_chunk.shape[0]

    # Two 12-bit values occupy exactly three bytes.
    assert n_bytes % 3 == 0

    n_groups = n_bytes // 3
    # The destination must have room for exactly two values per group.
    assert out.shape[0] == n_groups * 2

    for group in nb.prange(n_groups):
        src = group * 3
        # Widen to uint16 before shifting so the high bits are not lost.
        first_byte = np.uint16(data_chunk[src])
        shared = np.uint16(data_chunk[src + 1])
        last_byte = np.uint16(data_chunk[src + 2])

        dst = group * 2
        # The bit ranges being combined never overlap, so OR equals addition.
        out[dst] = (first_byte << 4) | (shared >> 4)
        out[dst + 1] = (last_byte << 4) | (shared & 15)

    return out
Run Code Online (Sandbox Code Playgroud)

计时结果

num_Frames=10
data_chunk=np.random.randint(low=0,high=255,size=np.int(640*256*1.5*num_Frames),dtype=np.uint8)

%timeit read_uint12_gaud(data_chunk)
#11.3 ms ± 53.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#435 MB/s

%timeit nb_read_uint12(data_chunk)
#939 µs ± 24.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#5235 MB/s

out=np.empty(data_chunk.shape[0]//3*2,dtype=np.uint16)
%timeit nb_read_uint12_prealloc(data_chunk,out)
#407 µs ± 5.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#11759 MB/s

%timeit read_uint12_griff(data_chunk)
#10.2 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#491 MB/s

%timeit read_uint12_var_2(data_chunk)
#928 µs ± 16.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#5297 MB/s
%timeit read_uint12_var_2_prealloc(data_chunk,out)
#403 µs ± 13.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#12227 MB/s
Run Code Online (Sandbox Code Playgroud)


小智 7

我的实现与 @max9111 提出的略有不同,不需要调用 unpackbits.

它把中间字节切成两半,并使用 numpy 的二进制运算,直接从三个连续的 uint8 创建两个 uint12 值.在下文中,假设 data_chunk 是一个二进制字符串,包含任意数量的 12 位整数的信息(因此其长度必须是 3 的倍数).

def read_uint12(data_chunk):
    """Unpack packed 12-bit unsigned integers into a 1D uint16 array.

    data_chunk is a bytes-like buffer whose length must be a multiple of 3
    (otherwise the reshape into 3-byte rows raises ValueError). Each 3-byte
    group (b0, b1, b2) produces two values, in this order:
    (b0 << 4) + (b1 >> 4) and ((b1 % 16) << 8) + b2.
    """
    raw = np.frombuffer(data_chunk, dtype=np.uint8)

    # One row per 3-byte group, widened so the shifts below cannot overflow.
    groups = raw.reshape(raw.shape[0] // 3, 3).astype(np.uint16)
    high_byte = groups[:, 0]
    shared_byte = groups[:, 1]
    low_byte = groups[:, 2]

    # The middle byte contributes its upper nibble to the first value and its
    # lower nibble to the second.
    first_values = (high_byte << 4) + (shared_byte >> 4)
    second_values = ((shared_byte % 16) << 8) + low_byte

    # Pair the values column-wise, then flatten row by row to interleave them.
    return np.stack((first_values, second_values), axis=1).ravel()
Run Code Online (Sandbox Code Playgroud)

我对其他实现进行了基准测试,在约 5 Mb 的输入上,这种方法快了约 4 倍:
read_uint12_unpackbits:每次循环 65.5 ms ± 1.11 ms(7 次运行的平均值 ± 标准偏差,每次 10 个循环);read_uint12:每次循环 14 ms ± 513 µs(7 次运行的平均值 ± 标准偏差,每次 100 个循环)