如何使用Python多处理池处理tarfile?

Tim*_*omb 7 python multiprocessing tarfile

我正在尝试使用tar处理tarfile的内容multiprocessing.Pool.我能够在多处理模块中成功使用ThreadPool实现,但是希望能够使用进程而不是线程,因为它可能更快并且消除了对Matplotlib处理多线程环境所做的一些更改.我收到的错误是我怀疑与不共享地址空间的进程有关,但我不确定如何解决它:

Traceback (most recent call last):
  File "test_tarfile.py", line 32, in <module>
    test_multiproc()
  File "test_tarfile.py", line 24, in test_multiproc
    pool.map(read_file, files)
  File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 225, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 522, in get
    raise self._value
ValueError: I/O operation on closed file
Run Code Online (Sandbox Code Playgroud)

实际的程序更复杂,但这是我正在做的再现错误的一个例子:

from multiprocessing.pool import ThreadPool, Pool
import StringIO
import tarfile

def write_tar():
    tar = tarfile.open('test.tar', 'w')
    contents = 'line1'
    info = tarfile.TarInfo('file1.txt')
    info.size = len(contents)
    tar.addfile(info, StringIO.StringIO(contents))
    tar.close()

def test_multithread():
    tar   = tarfile.open('test.tar')
    files = [tar.extractfile(member) for member in tar.getmembers()]
    pool  = ThreadPool(processes=1)
    pool.map(read_file, files)
    tar.close()

def test_multiproc():
    tar   = tarfile.open('test.tar')
    files = [tar.extractfile(member) for member in tar.getmembers()]
    pool  = Pool(processes=1)
    pool.map(read_file, files)
    tar.close()

def read_file(f):
    print f.read()

write_tar()
test_multithread()
test_multiproc()
Run Code Online (Sandbox Code Playgroud)

我怀疑当TarInfo对象传递到另一个进程但是父进程TarFile没有时出错了,但是我不确定如何在多进程情况下修复它.我可以这样做而无需从tarball中提取文件并将其写入磁盘吗?

srg*_*erg 6

您没有将TarInfo对象传递到另一个进程,而是将结果传递tar.extractfile(member)给另一个进程,该进程member是一个TarInfo对象.该extractfile(...)方法返回一个类似文件的对象,该对象除其他外还有一个read()对打开的原始tar文件进行操作的方法tar = tarfile.open('test.tar').

但是,您不能在另一个进程中使用来自一个进程的打开文件,您必须重新打开该文件.我test_multiproc()用这个代替了你:

def test_multiproc():
    tar   = tarfile.open('test.tar')
    files = [name for name in tar.getnames()]
    pool  = Pool(processes=1)
    result = pool.map(read_file2, files)
    tar.close()
Run Code Online (Sandbox Code Playgroud)

并补充说:

def read_file2(name):
    t2 = tarfile.open('test.tar')
    print t2.extractfile(name).read()
    t2.close()
Run Code Online (Sandbox Code Playgroud)

并且能够使您的代码正常工作.