从shutil文件复制线程获取进度

fre*_*rik 14 python multithreading

我有一个应用程序,它将文件从 src 复制到 dst:

import shutil
from threading import Thread

# Thread.start() returns None, so keep the Thread object itself in `t`
# (needed for a later t.join() or t.is_alive()) and start it separately.
t = Thread(target=shutil.copy, args=[ src, dst ])
t.start()
Run Code Online (Sandbox Code Playgroud)

我希望应用程序每隔5秒查询一次副本的进度,而不会锁定应用程序本身.这可能吗?

我的目的是把这个进度显示在一个 QtGui.QLabel 上,以便向用户反馈文件复制的进展.

在线程中使用 shutil 复制文件时,可以实现这一点吗?

Mar*_*ers 18

不,shutil.copy()不提供任何跟踪进度的选项.你最多只能监视目标文件的大小(对目标文件名使用os.*函数).

另一种方法是实现自己的复制函数.实现起来非常简单; shutil.copy()基本上就是shutil.copyfile()加上shutil.copymode(); 而shutil.copyfile()又将实际工作委托给shutil.copyfileobj()(以上链接均指向Python源代码).

实现一个带进度报告的自定义shutil.copyfileobj()应该很简单; 只需加入对回调函数的支持,以便每复制完一个块就通知您的程序:

def copyfileobj(fsrc, fdst, callback, length=16*1024):
    """Copy fsrc to fdst chunk by chunk, reporting progress after each chunk.

    fsrc and fdst are file-like objects. Each read requests at most
    ``length`` units; after every chunk is written, ``callback`` is called
    with the running total of ``len(chunk)`` copied so far. The callback is
    never invoked for an empty source, and nothing is returned.
    """
    transferred = 0
    chunk = fsrc.read(length)
    # An empty read marks end of input.
    while chunk:
        fdst.write(chunk)
        transferred += len(chunk)
        callback(transferred)
        chunk = fsrc.read(length)
Run Code Online (Sandbox Code Playgroud)

然后将copied的值与文件大小进行比较,即可得出复制进度.


flu*_*ak7 8

我将 Martijn Pieters 的回答与这个回答中的一些进度条代码结合起来,并参照这个回答做了修改,使其能在 PyCharm 中正常工作,最终得到了以下代码。函数 copy_with_progress 就是我想要的结果。

import os
import shutil


def progress_percentage(perc, width=None):
    """Render a one-line text progress bar for a percentage.

    The bar has the form ``<blocks> [NN.NN %]``: a run of block characters,
    one separator space and a fixed-width percentage field.

    Args:
        perc: Completion percentage, an int or float within 0..100.
        width: Total width of the returned string in characters. When None,
            the current terminal width is used.

    Returns:
        The progress bar as a string exactly ``width`` characters long.

    Raises:
        TypeError: If ``perc`` is not an int or float.
        ValueError: If ``perc`` is outside 0..100, or ``width`` leaves fewer
            than 10 characters for the block area.
    """
    # Block glyphs are spelled as escapes so they survive re-encoding of
    # this file (they had previously degraded to '?').
    FULL_BLOCK = '\u2588'  # full block
    # this is a gradient of incompleteness: light, medium, dark shade
    INCOMPLETE_BLOCK_GRAD = ['\u2591', '\u2592', '\u2593']

    # Validate with real exceptions: asserts are stripped under ``python -O``.
    if not isinstance(perc, (int, float)):
        raise TypeError('perc must be an int or float, got %r' % (perc,))
    if not 0. <= perc <= 100.:
        raise ValueError('perc must be within 0..100, got %r' % (perc,))
    # if width unset use full terminal; shutil's helper falls back to a
    # default size instead of raising when stdout is not a terminal
    if width is None:
        width = shutil.get_terminal_size().columns
    # progress bar is block_widget separator perc_widget : ####### 30%
    max_perc_widget = '[100.00%]'  # 100% is max
    separator = ' '
    blocks_widget_width = width - len(separator) - len(max_perc_widget)
    if blocks_widget_width < 10:  # not very meaningful if narrower
        raise ValueError('width %r is too small for a progress bar' % (width,))
    perc_per_block = 100.0 / blocks_widget_width
    # epsilon is the sensitivity of rendering a gradient block
    epsilon = 1e-6
    # number of blocks that should be represented as complete
    full_blocks = int((perc + epsilon) / perc_per_block)
    # the rest are "incomplete"
    empty_blocks = blocks_widget_width - full_blocks

    # build blocks widget
    blocks_widget = [FULL_BLOCK] * full_blocks
    blocks_widget.extend([INCOMPLETE_BLOCK_GRAD[0]] * empty_blocks)
    # marginal case - remainder due to how granular our blocks are
    remainder = perc - full_blocks * perc_per_block
    # epsilon absorbs rounding errors (an exact check would be != 0.);
    # the remainder selects the shading of the first incomplete block
    if remainder > epsilon:
        grad_index = int((len(INCOMPLETE_BLOCK_GRAD) * remainder) / perc_per_block)
        # guard against float rounding pushing the index past the last shade
        grad_index = min(grad_index, len(INCOMPLETE_BLOCK_GRAD) - 1)
        blocks_widget[full_blocks] = INCOMPLETE_BLOCK_GRAD[grad_index]

    # build perc widget
    str_perc = '%.2f' % perc
    # -3 because '[', '%' and ']' are added around the padded number
    perc_widget = '[%s%%]' % str_perc.ljust(len(max_perc_widget) - 3)

    # form progressbar and return it as a string
    return '%s%s%s' % (''.join(blocks_widget), separator, perc_widget)


def copy_progress(copied, total):
    """Redraw the copy progress bar on the current terminal line.

    Args:
        copied: Amount copied so far, in the same unit as ``total``.
        total: Expected total amount to copy.
    """
    if total > 0:
        # Clamp to 0..100: the amount copied can exceed a total that was
        # measured before copying began, and progress_percentage rejects
        # values outside that range.
        perc = max(0.0, min(100.0, 100 * copied / total))
    else:
        # Nothing to copy; report completion instead of dividing by zero.
        perc = 100.0
    # '\r' returns to the start of the line so each bar overwrites the last;
    # flush explicitly because no newline is written to trigger
    # line-buffered output.
    print('\r' + progress_percentage(perc, width=30), end='', flush=True)


def copyfile(src, dst, *, follow_symlinks=True):
    """Copy the contents of src to dst, reporting progress via copy_progress.

    When follow_symlinks is false and src is a symbolic link, dst is created
    as a new symlink with the same target rather than as a copy of the
    linked file. Returns dst.

    Raises shutil.SameFileError when src and dst are the same file, and
    shutil.SpecialFileError when either path is a named pipe.
    """
    if shutil._samefile(src, dst):
        message = "{!r} and {!r} are the same file".format(src, dst)
        raise shutil.SameFileError(message)

    for path in (src, dst):
        try:
            info = os.stat(path)
        except OSError:
            # File most likely does not exist; nothing to inspect.
            continue
        # XXX What about other special files? (sockets, devices...)
        if shutil.stat.S_ISFIFO(info.st_mode):
            raise shutil.SpecialFileError("`%s` is a named pipe" % path)

    if not follow_symlinks and os.path.islink(src):
        # Recreate the link itself instead of copying what it points to.
        os.symlink(os.readlink(src), dst)
        return dst

    # Source size is measured once, up front, as the progress total.
    size = os.stat(src).st_size
    with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
        copyfileobj(fsrc, fdst, callback=copy_progress, total=size)
    return dst


def copyfileobj(fsrc, fdst, callback, total, length=16*1024):
    """Copy fsrc to fdst in chunks, notifying callback after each chunk.

    fsrc and fdst are file-like objects; each read requests at most
    ``length`` units. After a chunk is written, ``callback`` is invoked as
    ``callback(copied_so_far, total=total)``. ``total`` is passed through
    untouched. Returns None.
    """
    done = 0
    while True:
        chunk = fsrc.read(length)
        if chunk:
            fdst.write(chunk)
            done += len(chunk)
            callback(done, total=total)
        else:
            # Empty read: the source is exhausted.
            return


def copy_with_progress(src, dst, *, follow_symlinks=True):
    """Copy file src to dst via copyfile, then copy its permission bits.

    Args:
        src: Path of the file to copy.
        dst: Destination path. If it is an existing directory, the file is
            copied into it under src's base name.
        follow_symlinks: Passed on to both copyfile and shutil.copymode.

    Returns:
        The path of the newly created file.
    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    # Forward follow_symlinks (as shutil.copy does) so that, when a symlink
    # was recreated rather than followed, the mode is applied to the link
    # itself and not to the file it points to.
    shutil.copymode(src, dst, follow_symlinks=follow_symlinks)
    return dst
Run Code Online (Sandbox Code Playgroud)


ali*_*oli 6

这可能有点 hacky,但它有效:

"""
Copying a file and checking its progress while it's copying.
"""

import os
import shutil
import threading
import time

# Placeholders to replace with real paths. `src` is handed to the threads
# below as the source path and `des` as the destination path, so each
# placeholder must describe the role its variable actually plays.
src = r'<PATH/TO/SOURCE/FILE>'
des = r'<PATH/TO/DESTINATION/FILE>'


def checker(source_path, destination_path):
    """
    Poll the destination file and print the copy progress until its size
    matches the size of the source file.

    :type source_path: str
    :param source_path: path to the source file
    :type destination_path: str
    :param destination_path: path to the destination file
    """

    # Wait until the destination file has been created
    while not os.path.exists(destination_path):
        print("not exists")
        time.sleep(.01)

    # Keep checking the file size till it's the same as source file
    while True:
        # Read each size once per pass so the printed percentage is computed
        # from the same values that were compared.
        source_size = os.path.getsize(source_path)
        destination_size = os.path.getsize(destination_path)
        if source_size == destination_size:
            break
        # Skip the report for a zero-byte source: there is no meaningful
        # ratio and the division would fail.
        if source_size:
            percentage = int(float(destination_size) / float(source_size) * 100)
            # A single formatted argument prints identically whether print
            # is a statement (Python 2) or a function (Python 3).
            print("percentage %d" % percentage)
        time.sleep(.01)

    print("percentage %d" % 100)


def copying_file(source_path, destination_path):
    """
    Copy a file and report whether the copy succeeded.

    :type source_path: str
    :param source_path: path to the file that needs to be copied
    :type destination_path: str
    :param destination_path: path to where the file is going to be copied
    :rtype: bool
    :return: True if the file copied successfully, False otherwise
    """
    print("Copying....")
    try:
        shutil.copyfile(source_path, destination_path)
    except OSError:
        # shutil.copyfile raises on failure, so without this handler the
        # documented False return could never happen. On Python 3, OSError
        # also covers IOError and shutil.SameFileError.
        copied = False
    else:
        copied = os.path.exists(destination_path)

    if copied:
        print("Done....")
        return True

    print("Failed...")
    return False


# Run the copy itself in a dedicated thread.
t = threading.Thread(target=copying_file, args=(src, des), name='copying')
t.start()
# Watch the destination file from a second thread and report progress.
b = threading.Thread(target=checker, args=(src, des), name='checking')
b.start()
Run Code Online (Sandbox Code Playgroud)