kal*_*gne 5 python gzip crc python-multiprocessing
我有一个包含大量文本文件的文件夹。每一个都被压缩并重几个千兆字节。我写了一段代码来拆分每个 gzip 文件的内容:每个 gzip 文件都用 打开gzip
,然后读取每个指定的行块并将其写入一个新的 gzip 文件。
这是文件中的代码file_compression.py
:
import sys, os, file_manipulation as fm
import gzip
def splitGzipFile(fileName, dest=None, chunkPerSplit=100, linePerChunk=4, file_field_separator="_", zfill=3
, verbose=False, file_permission=None, execute=True):
"""
Splits a gz file into chunk files.
:param fileName:
:param chunkPerSplit:
:param linePerChunk:
:return:
"""
absPath = os.path.abspath(fileName)
baseName = os.path.basename(absPath)
dirName = os.path.dirname(absPath)
destFolder = dirName if dest is None else dest
## Compute file fields
rawBaseName, extensions = baseName.split(os.extsep, 1)
if not str(extensions).startswith("."):
extensions = "." + extensions
file_fields = str(rawBaseName).split(file_field_separator)
first_fields = file_fields[:-1] if file_fields.__len__() > 1 else file_fields
first_file_part = file_field_separator.join(first_fields)
last_file_field = file_fields[-1] if file_fields.__len__() > 1 else ""
current_chunk = getCurrentChunkNumber(last_file_field)
if current_chunk is None or current_chunk < 0:
first_file_part = rawBaseName
## Initialize chunk variables
linePerSplit = chunkPerSplit * linePerChunk
# chunkCounter = 0
chunkCounter = 0 if current_chunk is None else current_chunk-1
for chunk in getFileChunks(fileName, linePerSplit):
print "writing " + str(str(chunk).__len__()) + " ..."
chunkCounter += 1
oFile = fm.buildPath(destFolder) + first_file_part + file_field_separator + str(chunkCounter).zfill(zfill) + extensions
if execute:
writeGzipFile(oFile, chunk, file_permission)
if verbose:
print "Splitting: created file ", oFile
def getCurrentChunkNumber(chunk_field):
"""
Tries to guess an integer from a string.
:param chunk_field:
:return: an integer, None if failure.
"""
try:
return int(chunk_field)
except ValueError:
return None
def getFileChunks(fileName, linePerSplit):
with gzip.open(fileName, 'rb') as f:
print "gzip open"
lineCounter = 0
currentChunk = ""
for line in f:
currentChunk += line
lineCounter += 1
if lineCounter >= linePerSplit:
yield currentChunk
currentChunk = ""
lineCounter = 0
if not currentChunk == '':
yield currentChunk
def writeGzipFile(file_name, content, file_permission=None):
import gzip
with gzip.open(file_name, 'wb') as f:
if not content == '':
f.write(content)
if file_permission is not None and type(file_permission) == int:
os.chmod(file_name, file_permission)
Run Code Online (Sandbox Code Playgroud)
此任务是多进程的,在拆分之前为每个文件创建一个进程。每个文件只打开和拆分一次,在被删除之前,我通过将它们记录在列表中来确保这一点:
from tools.file_utils import file_compression as fc, file_manipulation as fm
import multiprocessing
from multiprocessing import Process, Queue, Manager
manager = Manager()
split_seen = manager.list()
files = [...] # list is full of gzip files.
processList = []
sampleDir = "sample/dir/"
for file in files:
fielPath = sampleDir + str(file)
p = Process(target=processFile, args=(filePath, sampleDir, True))
p.start()
processList.append(p)
## Join the processes
for p in processList:
p.join()
def processFile(filePath, destFolder, verbose=True):
global split_seen
if filePath in split_seen:
print "Duplicate file processed: " + str(filePath)
time.sleep(3)
print "adding", filePath, split_seen.__len__()
split_seen.append(filePath)
fc.splitGzipFile(filePath, dest=destFolder, chunkPerSplit=4000000\
, linePerChunk=4
, verbose=True
, file_permission=0770
, zfill=3
)
os.remove(filePath)
Run Code Online (Sandbox Code Playgroud)
到目前为止,代码一直运行良好。但是今天我遇到了 gzip 文件的 CRC 损坏问题:
Process Process-3:72:
Traceback (most recent call last):
...
File "/.../tools/file_utils/file_compression.py", line 43, in splitGzipFile
for chunk in getFileChunks(fileName, linePerSplit):
File "/.../tools/file_utils/file_compression.py", line 70, in getFileChunks
for line in f:
File "/.../python2.7/lib/python2.7/gzip.py", line 450, in readline
c = self.read(readsize)
File "/.../python2.7/lib/python2.7/gzip.py", line 256, in read
self._read(readsize)
File "/.../python2.7/lib/python2.7/gzip.py", line 320, in _read
self._read_eof()
File "/.../python2.7/lib/python2.7/gzip.py", line 342, in _read_eof
hex(self.crc)))
IOError: CRC check failed 0xddbb6045 != 0x34fd5580L
Run Code Online (Sandbox Code Playgroud)
这个问题的根源是什么?我必须再次声明,到目前为止它一直有效,文件夹和文件始终具有相同的结构。这种情况下的不同之处可能是我的脚本处理的 gzip 文件比平时多,可能是平时的两倍。
这可能是同时访问相同文件的问题吗?但我严重怀疑,我通过注册我的 split_seen 列表中访问的每个文件来确保情况并非如此。
我会接受任何提示,因为我没有更多线索可以看哪里。
编辑 1
也许其他人或其他程序访问了一些打开的文件?我不能要求和依赖推荐。因此,首先,如果我放一个multiprocess.Lock
,它会阻止任何其他线程、进程、程序、用户等修改文件吗?还是仅限于Python?我找不到任何关于此的文档。