Shr*_*ut1 1 compression gzip python-multiprocessing
我正在尝试使用 python 将一个大的压缩文件(.gz)复制到另一个压缩文件(.gz)。我将对代码示例中不存在的数据执行中间处理。我希望能够使用带有锁的多处理来从多个进程并行写入新的 gzip,但我在输出 gz 文件上收到无效格式错误。
我认为这是因为锁不足以支持并行写入 gzip。由于压缩数据需要“了解”之前的数据,以便将正确的条目写入存档中,因此我认为 python 默认情况下无法处理此问题。我猜想每个进程都会保持自己对 gzip 输出的感知,并且这种状态在第一次写入后会有所不同。
如果我在不使用 gzip 的情况下打开脚本中的目标文件,那么这一切都有效。我还可以写入多个 gzip 并将它们合并,但如果可能的话更愿意避免这种情况。
这是我的源代码:
#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue, Lock
def reader(infile, data_queue, coordinator_queue, chunk_size):
print("Reader Started.")
while True:
data_chunk = list(islice(infile, chunk_size))
data_queue.put(data_chunk)
coordinator_queue.put('CHUNK_READ')
if not data_chunk:
coordinator_queue.put('READ_DONE')
#Process exit
break
def writer(outfile, data_queue, coordinator_queue, write_lock, ID):
print("Writer Started.")
while True:
queue_message = data_queue.get()
if (queue_message == 'DONE'):
outfile.flush()
coordinator_queue.put('WRITE_DONE')
#Process exit
break
else:
print("Writer",ID,"-","Write Lock:",write_lock)
write_lock.acquire()
print("Writer",ID,"-","Write Lock:",write_lock)
for line in queue_message:
print("Line write:",line)
outfile.write(line)
write_lock.release()
print("Writer",ID,"-","Write Lock:",write_lock)
def coordinator(reader_procs, writer_procs, coordinator_queue, data_queue):
print("Coordinator Started.")
active_readers=reader_procs
active_writers=writer_procs
while True:
queue_message = coordinator_queue.get()
if queue_message=='READ_DONE':
active_readers = active_readers-1
if active_readers == 0:
while not data_queue.qsize() == 0:
continue
[data_queue.put('DONE') for x in range(writer_procs)]
if queue_message=='WRITE_DONE':
active_writers = active_writers-1
if active_writers == 0:
break
def main():
reader_procs=1
writer_procs=2
chunk_size=1
queue_size=96
data_queue = Queue(queue_size)
coordinator_queue=Queue()
write_lock=Lock()
infile_path='/directory/input_records.json.gz'
infile = gzip.open(infile_path, 'rt')
outfile_path='/directory/output_records.json.gz'
outfile = gzip.open(outfile_path, 'wt')
#Works when it is uncompressed
#outfile=open(outfile_path, 'w')
readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
writers = [Process(target=writer, args=(outfile, data_queue, coordinator_queue, write_lock, x)) for x in range(writer_procs)]
coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, coordinator_queue, data_queue))
coordinator_p.start()
for process in readers:
process.start()
for process in writers:
process.start()
for process in readers:
process.join()
for process in writers:
process.join()
coordinator_p.join()
outfile.flush()
outfile.close()
main()
Run Code Online (Sandbox Code Playgroud)
关于代码的注释:
我想我需要一个可以以某种方式协调不同进程之间的压缩写入的库。显然,这指的是使用单个进程来执行写入(如协调器进程),但这可能会引入瓶颈。
堆栈上有一些相关的帖子,但似乎没有一个专门解决我想要做的事情。我还看到像“ mgzip ”、“ pigz ”和“ migz ”这样的实用程序可以并行压缩,但我不认为它们适用于这个用例。mgzip 在我的测试中不起作用(0 大小的文件),pigz 似乎消耗整个文件作为命令行上的输入,而 migz 是一个 java 库,所以我不确定如何将它集成到 python 中。
如果无法完成,那就这样吧,但任何答案将不胜感激!
-------- 更新和工作代码:
在 Mark Adler 的帮助下,我能够创建一个多处理脚本,该脚本可以并行压缩数据,并有一个编写器进程将其添加到目标 gz 文件中。凭借现代 NVME 驱动器上的吞吐量,这降低了在成为 I/O 限制之前因压缩而成为 CPU 限制的可能性。
为了使该代码正常工作,需要进行的最大更改如下:
gzip.compress(bytes(string, 'utf-8'),compresslevel=9)需要压缩单个“块”或“流”:file = open(outfile, 'wb')需要打开可以成为目标 gzip 的未编码的二进制输出文件。file.write()操作必须从单个进程中发生,因为它必须串行执行。值得注意的是,这并不是并行写入文件,而是并行处理压缩。无论如何,压缩是这个过程中最繁重的部分。
更新的代码(经过测试并按原样工作):
#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue
def reader(infile, data_queue, coordinator_queue, chunk_size):
print("Reader Started.")
while True:
data_chunk = list(islice(infile, chunk_size))
data_queue.put(data_chunk)
coordinator_queue.put('CHUNK_READ')
if not data_chunk:
coordinator_queue.put('READ_DONE')
#Process exit
break
def compressor(data_queue, compressed_queue, coordinator_queue):
print("Compressor Started.")
while True:
chunk = ''
queue_message = data_queue.get()
if (queue_message == 'DONE'):
#Notify coordinator process of task completion
coordinator_queue.put('COMPRESS_DONE')
#Process exit
break
else:
for line in queue_message:
#Assemble concatenated string from list
chunk += line
#Encode the string as binary so that it can be compressed
#Setting gzip compression level to 9 (highest)
compressed_chunk=gzip.compress(bytes(chunk,'utf-8'),compresslevel=9)
compressed_queue.put(compressed_chunk)
def writer(outfile, compressed_queue, coordinator_queue):
print("Writer Started.")
while True:
queue_message = compressed_queue.get()
if (queue_message == 'DONE'):
#Notify coordinator process of task completion
coordinator_queue.put('WRITE_DONE')
#Process exit
break
else:
outfile.write(queue_message)
def coordinator(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue):
print("Coordinator Started.")
active_readers=reader_procs
active_compressors=compressor_procs
active_writers=writer_procs
while True:
queue_message = coordinator_queue.get()
if queue_message=='READ_DONE':
active_readers = active_readers-1
if active_readers == 0:
while not data_queue.qsize() == 0:
continue
[data_queue.put('DONE') for x in range(compressor_procs)]
if queue_message=='COMPRESS_DONE':
active_compressors = active_compressors-1
if active_compressors == 0:
while not compressed_queue.qsize() == 0:
continue
[compressed_queue.put('DONE') for x in range(writer_procs)]
if queue_message=='WRITE_DONE':
active_writers = active_writers-1
if active_writers == 0:
break
def main():
reader_procs=1
compressor_procs=2
#writer_procs really needs to stay as 1 since writing must be done serially
#This could probably be written out...
writer_procs=1
chunk_size=600
queue_size=96
data_queue = Queue(queue_size)
compressed_queue=Queue(queue_size)
coordinator_queue=Queue()
infile_path='/directory/input_records.json.gz'
infile = gzip.open(infile_path, 'rt')
outfile_path='/directory/output_records.json.gz'
outfile=open(outfile_path, 'wb')
readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
compressors = [Process(target=compressor, args=(data_queue, compressed_queue, coordinator_queue)) for x in range(compressor_procs)]
writers = [Process(target=writer, args=(outfile, compressed_queue, coordinator_queue)) for x in range(writer_procs)]
coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue))
coordinator_p.start()
for process in readers:
process.start()
for process in compressors:
process.start()
for process in writers:
process.start()
for process in compressors:
process.join()
for process in readers:
process.join()
for process in writers:
process.join()
coordinator_p.join()
outfile.flush()
outfile.close()
main()
Run Code Online (Sandbox Code Playgroud)