Threadsafe和容错文件写入

geo*_*org 9 python file thread-safety

我有一个长时间运行的进程,它在文件中写了很多东西.结果应该是一切或什么都没有,所以我写一个临时文件并在最后将其重命名为真实姓名.目前,我的代码是这样的:

filename = 'whatever'
tmpname = 'whatever' + str(time.time())

with open(tmpname, 'wb') as fp:
    fp.write(stuff)
    fp.write(more stuff)

if os.path.exists(filename):
    os.unlink(filename)
os.rename(tmpname, filename)
Run Code Online (Sandbox Code Playgroud)

由于以下几个原因,我对此并不满意:

  • 如果发生异常,它不会正确清理
  • 它忽略了并发问题
  • 它不可重复使用(我需要在我的程序中的不同位置)

有什么建议如何改进我的代码?有没有可以帮助我的图书馆?

Aus*_*ips 11

您可以使用Python的tempfile模块为您提供临时文件名.它可以以线程安全的方式创建临时文件,而不是使用一个临时文件,time.time()如果同时在多个线程中使用,则可以返回相同的名称.

正如您对问题的评论中所建议的那样,这可以与上下文管理器的使用相结合.您可以通过查看Python tempfile.py源来获得有关如何实现您想要执行的操作的一些想法.

以下代码段可能会执行您想要的操作.它使用从中返回的对象的一些内部tempfile.

  • 创建临时文件是线程安全的.
  • 成功完成后重命名文件是原子的,至少在Linux上是这样.之间没有单独的检查os.path.exists(),os.rename()这可能会引入竞争条件.对于Linux上的原子重命名,源和目标必须位于同一文件系统上,这就是为什么此代码将临时文件放在与目标文件相同的目录中的原因.
  • RenamedTemporaryFile类应该表现得像个NamedTemporaryFile对于大多数用途时,它使用上下文管理封闭的,除了,该文件被重命名.

样品:

import tempfile
import os

class RenamedTemporaryFile(object):
    """
    A temporary file object which will be renamed to the specified
    path on exit.
    """
    def __init__(self, final_path, **kwargs):
        tmpfile_dir = kwargs.pop('dir', None)

        # Put temporary file in the same directory as the location for the
        # final file so that an atomic move into place can occur.

        if tmpfile_dir is None:
            tmpfile_dir = os.path.dirname(final_path)

        self.tmpfile = tempfile.NamedTemporaryFile(dir=tmpfile_dir, **kwargs)
        self.final_path = final_path

    def __getattr__(self, attr):
        """
        Delegate attribute access to the underlying temporary file object.
        """
        return getattr(self.tmpfile, attr)

    def __enter__(self):
        self.tmpfile.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.tmpfile.delete = False
            result = self.tmpfile.__exit__(exc_type, exc_val, exc_tb)
            os.rename(self.tmpfile.name, self.final_path)
        else:
            result = self.tmpfile.__exit__(exc_type, exc_val, exc_tb)

        return result
Run Code Online (Sandbox Code Playgroud)

然后你可以像这样使用它:

with RenamedTemporaryFile('whatever') as f:
    f.write('stuff')
Run Code Online (Sandbox Code Playgroud)

在写入期间,内容转到临时文件,退出时文件被重命名.这段代码可能需要一些调整,但总体思路应该可以帮助你入门.


jfs*_*jfs 5

要可靠地将全部或全部写入文件:

import os
from contextlib import contextmanager
from tempfile   import NamedTemporaryFile

if not hasattr(os, 'replace'):
    os.replace = os.rename #NOTE: it won't work for existing files on Windows

@contextmanager
def FaultTolerantFile(name):
    dirpath, filename = os.path.split(name)
    # use the same dir for os.rename() to work
    with NamedTemporaryFile(dir=dirpath, prefix=filename, suffix='.tmp') as f:
        yield f
        f.flush()   # libc -> OS
        os.fsync(f) # OS -> disc (note: on OSX it is not enough)
        f.delete = False # don't delete tmp file if `replace()` fails
        f.close()
        os.replace(f.name, name)
Run Code Online (Sandbox Code Playgroud)

另请参阅重命名()没有fsync()安全吗?(@Mihai Stan提到)

用法

with FaultTolerantFile('very_important_file') as file:
    file.write('either all ')
    file.write('or nothing is written')
Run Code Online (Sandbox Code Playgroud)

要实现缺失,os.replace()您可以MoveFileExW(src, dst, MOVEFILE_REPLACE_EXISTING)在Windows上调用(通过win32file或ctypes模块).

如果有多个线程,您可以queue.put(data)从不同的线程调用并在专用线程中写入文件:

 for data in iter(queue.get, None):
     file.write(data)
Run Code Online (Sandbox Code Playgroud)

queue.put(None) 打破了循环.

作为替代方案,您可以使用锁(线程,多处理,文件锁)来同步访问:

def write(self, data):
    with self.lock:
        self.file.write(data)
Run Code Online (Sandbox Code Playgroud)