xiv*_*axy · tags: python, file-io, python-2.7
I have a function that processes binary data from a file using the file.read(len) method. However, my file is huge and has been cut into many smaller files of 50 MB each. Is there some wrapper class that can feed many files into a single buffered stream and provide a read() method?

The class fileinput.FileInput can do something like this, but it only supports line-by-line reading (the readline() method with no arguments) and has no read(len) that takes the number of bytes to read.
Concatenating iterables is very easy with itertools.chain:
```python
from itertools import chain

def read_by_chunks(file_objects, block_size=1024):
    readers = (iter(lambda f=f: f.read(block_size), '') for f in file_objects)
    return chain.from_iterable(readers)
```
Then you can do:
```python
for chunk in read_by_chunks([f1, f2, f3, f4], 4096):
    handle(chunk)
```
to process the files sequentially while reading them in 4096-byte blocks.
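As a quick self-contained check of the idea, here is the same read_by_chunks repeated with io.StringIO objects standing in for real files (the block sizes and contents below are just illustrative):

```python
from io import StringIO
from itertools import chain

def read_by_chunks(file_objects, block_size=1024):
    # iter(callable, sentinel) calls f.read(block_size) until it returns ''
    readers = (iter(lambda f=f: f.read(block_size), '') for f in file_objects)
    return chain.from_iterable(readers)

parts = [StringIO('abcdefgh'), StringIO('ijk')]
chunks = list(read_by_chunks(parts, block_size=4))
print(chunks)  # ['abcd', 'efgh', 'ijk']
```

Note that the last chunk of each file may be shorter than block_size; only the sentinel '' (end of file) moves the iterator on to the next file.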
If you need to provide an object with a read method because some other function expects one, you can write a very simple wrapper:
```python
class ConcatFiles(object):
    def __init__(self, files, block_size):
        self._block_size = block_size
        self._reader = read_by_chunks(files, block_size)

    def __iter__(self):
        return self._reader

    def read(self):
        return next(self._reader, '')
```
However, this only uses a fixed block size. You can support a block_size parameter on read by doing something like:
```python
def read(self, block_size=None):
    block_size = block_size or self._block_size
    total_read = 0
    chunks = []
    for chunk in self._reader:
        chunks.append(chunk)
        total_read += len(chunk)
        if total_read > block_size:
            contents = ''.join(chunks)
            # push the surplus bytes back onto the front of the reader
            self._reader = chain([contents[block_size:]], self._reader)
            return contents[:block_size]
    return ''.join(chunks)
```
Note: if you are reading in binary mode, you should replace the empty strings '' in the code with empty bytes b''.
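Putting the pieces above together in binary mode (b'' as the sentinel, block_size stored on the instance), a complete sketch might look like this, exercised with io.BytesIO in place of real files:

```python
from io import BytesIO
from itertools import chain

def read_by_chunks(file_objects, block_size=1024):
    # b'' is the sentinel because the files are read in binary mode
    readers = (iter(lambda f=f: f.read(block_size), b'') for f in file_objects)
    return chain.from_iterable(readers)

class ConcatFiles(object):
    def __init__(self, files, block_size=1024):
        self._block_size = block_size
        self._reader = read_by_chunks(files, block_size)

    def __iter__(self):
        return self._reader

    def read(self, block_size=None):
        block_size = block_size or self._block_size
        total_read = 0
        chunks = []
        for chunk in self._reader:
            chunks.append(chunk)
            total_read += len(chunk)
            if total_read > block_size:
                contents = b''.join(chunks)
                # push the surplus bytes back onto the front of the reader
                self._reader = chain([contents[block_size:]], self._reader)
                return contents[:block_size]
        return b''.join(chunks)

f = ConcatFiles([BytesIO(b'0123456789'), BytesIO(b'abcde')], block_size=4)
out1 = f.read(6)    # b'012345' - crosses internal chunk boundaries
out2 = f.read(4)    # b'6789'
out3 = f.read(100)  # b'abcde' - a short read at end of data
```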
Instead of converting the list of streams into a generator, as some of the other answers do, you can chain the streams together and then use the usual file interface:
```python
import io

def chain_streams(streams, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Chain an iterable of streams together into a single buffered stream.

    Usage:
        def generate_open_file_streams():
            for file in filenames:
                yield open(file, 'rb')

        f = chain_streams(generate_open_file_streams())
        f.read()
    """
    class ChainStream(io.RawIOBase):
        def __init__(self):
            self.leftover = b''
            self.stream_iter = iter(streams)
            try:
                self.stream = next(self.stream_iter)
            except StopIteration:
                self.stream = None

        def readable(self):
            return True

        def _read_next_chunk(self, max_length):
            # Return 0 or more bytes from the current stream, first returning
            # all leftover bytes. If the stream is closed, return b''.
            if self.leftover:
                return self.leftover
            elif self.stream is not None:
                return self.stream.read(max_length)
            else:
                return b''

        def readinto(self, b):
            buffer_length = len(b)
            chunk = self._read_next_chunk(buffer_length)
            while len(chunk) == 0:
                # Move to the next stream.
                if self.stream is not None:
                    self.stream.close()
                try:
                    self.stream = next(self.stream_iter)
                    chunk = self._read_next_chunk(buffer_length)
                except StopIteration:
                    # No more streams to chain together.
                    self.stream = None
                    return 0  # indicate EOF
            output, self.leftover = chunk[:buffer_length], chunk[buffer_length:]
            b[:len(output)] = output
            return len(output)

    return io.BufferedReader(ChainStream(), buffer_size=buffer_size)
```
Then use it like any other file or stream:
```python
f = chain_streams(open_files_or_chunks)
f.read(len)
```
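A property worth demonstrating: because io.BufferedReader sits on top of the raw ChainStream, read(n) delivers exactly n bytes even across stream boundaries. Here is a compact self-contained run (repeating a condensed chain_streams from above, with io.BytesIO in place of open files):

```python
import io

def chain_streams(streams, buffer_size=io.DEFAULT_BUFFER_SIZE):
    # Condensed version of the chain_streams helper described above.
    class ChainStream(io.RawIOBase):
        def __init__(self):
            self.leftover = b''
            self.stream_iter = iter(streams)
            self.stream = next(self.stream_iter, None)

        def readable(self):
            return True

        def _read_next_chunk(self, max_length):
            if self.leftover:
                return self.leftover
            elif self.stream is not None:
                return self.stream.read(max_length)
            return b''

        def readinto(self, b):
            buffer_length = len(b)
            chunk = self._read_next_chunk(buffer_length)
            while len(chunk) == 0:
                # Current stream exhausted: close it and move to the next one.
                if self.stream is not None:
                    self.stream.close()
                self.stream = next(self.stream_iter, None)
                if self.stream is None:
                    return 0  # EOF
                chunk = self._read_next_chunk(buffer_length)
            output, self.leftover = chunk[:buffer_length], chunk[buffer_length:]
            b[:len(output)] = output
            return len(output)

    return io.BufferedReader(ChainStream(), buffer_size=buffer_size)

f = chain_streams([io.BytesIO(b'hello '), io.BytesIO(b'world')])
out_a = f.read(8)  # b'hello wo' - spans both underlying streams
out_b = f.read()   # b'rld'
```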
I'm not familiar with anything in the standard library that performs that function, so here is one in case there isn't:
```python
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

class ConcatenatedFiles(object):
    def __init__(self, file_objects):
        self.fds = list(reversed(file_objects))

    def read(self, size=None):
        remaining = size
        data = StringIO()
        while self.fds and (remaining is None or remaining > 0):
            data_read = self.fds[-1].read(remaining or -1)
            if remaining is None or len(data_read) < remaining:  # exhausted file
                self.fds.pop()
            if remaining is not None:
                remaining -= len(data_read)
            data.write(data_read)
        return data.getvalue()
```
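For a quick sanity check, here is a Python 3 rendition of the same class (io.StringIO replaces the cStringIO/StringIO fallback; everything else is as above):

```python
from io import StringIO

class ConcatenatedFiles(object):
    def __init__(self, file_objects):
        # Reversed so the next file to read is always at the end of the list.
        self.fds = list(reversed(file_objects))

    def read(self, size=None):
        remaining = size
        data = StringIO()
        while self.fds and (remaining is None or remaining > 0):
            data_read = self.fds[-1].read(remaining or -1)
            if remaining is None or len(data_read) < remaining:  # exhausted file
                self.fds.pop()
            if remaining is not None:
                remaining -= len(data_read)
            data.write(data_read)
        return data.getvalue()

f = ConcatenatedFiles([StringIO('abc'), StringIO('defgh')])
r1 = f.read(4)  # 'abcd' - spans the first two files
r2 = f.read()   # 'efgh' - the rest
```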