fre*_*rik 14 python multithreading
我有一个应用程序,用来把文件从 src 复制到 dst:
import shutil
from threading import Thread
# Thread.start() returns None, so keep the Thread object itself in `t`
# (it is what t.is_alive() / t.join() must be called on) and start it
# in a separate statement.
t = Thread(target=shutil.copy, args=[src, dst])
t.start()
Run Code Online (Sandbox Code Playgroud)
我希望应用程序每隔 5 秒查询一次复制进度,同时不阻塞应用程序本身。这可行吗?
我的目的是把这个进度显示在一个 QtGui.QLabel 上,以便向用户反馈文件复制的情况。
在线程中使用 shutil 复制文件时,能实现这一点吗?
Mar*_*ers 18
不行,shutil.copy() 不提供任何跟踪进度的选项。你最多只能监视目标文件的大小(对目标文件名使用 os.* 函数)。
另一种方法是实现自己的复制功能.实施非常简单; shutil.copy()
基本上是一个shutil.copyfile()
加号shutil.copymode()
; shutil.copyfile()
反过来将实际工作委托给shutil.copyfileobj()
(链接到Python源代码).
自己实现一个带进度报告的 shutil.copyfileobj() 应该很简单:加入对回调函数的支持,每复制完一个数据块就通知你的程序:
def copyfileobj(fsrc, fdst, callback, length=16*1024):
    """Copy ``fsrc`` into ``fdst`` chunk by chunk, reporting progress.

    Args:
        fsrc: readable file-like object.
        fdst: writable file-like object.
        callback: called after every chunk is written, with the running
            total of units copied so far as its only argument.
        length: maximum size passed to each ``fsrc.read`` call.
    """
    written_so_far = 0
    chunk = fsrc.read(length)
    # An empty read marks end of input.
    while chunk:
        fdst.write(chunk)
        written_so_far += len(chunk)
        callback(written_so_far)
        chunk = fsrc.read(length)
Run Code Online (Sandbox Code Playgroud)
并将copied
大小与文件大小进行比较.
我把 Martijn Pieters 的回答与另一个回答中的进度条代码结合起来,并参考第三个回答做了修改,使其能在 PyCharm 中工作,最终得到下面的代码。函数 copy_with_progress 就是我想要的结果。
import os
import shutil
def progress_percentage(perc, width=None):
    """Render a one-line textual progress bar for ``perc`` percent.

    The result has the shape ``<blocks><space>[<percent>%]``: a run of
    full-block glyphs for the completed part, shade glyphs for the rest,
    and the percentage with two decimals, left-justified in a fixed-width
    field so the bar never changes length.

    Args:
        perc: completion percentage, an int or float in ``[0, 100]``.
        width: total width of the returned string in characters. ``None``
            uses the terminal width reported by
            ``shutil.get_terminal_size()``.

    Returns:
        The progress bar as a ``str`` of exactly ``width`` characters.

    Raises:
        AssertionError: if ``perc`` is not a number in ``[0, 100]`` or the
            width leaves fewer than 10 characters for the blocks.
    """
    # Written as escapes so the glyphs survive encoding round-trips.
    FULL_BLOCK = '\u2588'  # full block
    # Gradient of incompleteness: light, medium and dark shade.
    INCOMPLETE_BLOCK_GRAD = ['\u2591', '\u2592', '\u2593']

    assert isinstance(perc, (int, float))
    assert 0. <= perc <= 100.

    # If width is unset use the full terminal; shutil's helper falls back
    # to a default size instead of raising when there is no terminal.
    if width is None:
        width = shutil.get_terminal_size().columns

    # Layout: blocks widget, separator, percentage widget.
    max_perc_widget = '[100.00%]'  # widest percentage widget possible
    separator = ' '
    blocks_widget_width = width - len(separator) - len(max_perc_widget)
    assert blocks_widget_width >= 10  # not very meaningful if narrower

    perc_per_block = 100.0 / blocks_widget_width
    # epsilon absorbs float rounding when deciding whether a block is full
    epsilon = 1e-6

    # Number of blocks drawn as complete; the rest start as "incomplete".
    full_blocks = min(int((perc + epsilon) / perc_per_block),
                      blocks_widget_width)
    empty_blocks = blocks_widget_width - full_blocks

    blocks_widget = [FULL_BLOCK] * full_blocks
    blocks_widget.extend([INCOMPLETE_BLOCK_GRAD[0]] * empty_blocks)

    # Progress left over after the full blocks shades the first
    # incomplete block proportionally.
    remainder = perc - full_blocks * perc_per_block
    if remainder > epsilon and full_blocks < blocks_widget_width:
        grad_index = int(
            (len(INCOMPLETE_BLOCK_GRAD) * remainder) / perc_per_block)
        # Guard against rounding pushing the index past the last shade.
        grad_index = min(grad_index, len(INCOMPLETE_BLOCK_GRAD) - 1)
        blocks_widget[full_blocks] = INCOMPLETE_BLOCK_GRAD[grad_index]

    str_perc = '%.2f' % perc
    # -3: the two brackets and the percent sign are not part of the number
    perc_widget = '[%s%%]' % str_perc.ljust(len(max_perc_widget) - 3)

    return '%s%s%s' % (''.join(blocks_widget), separator, perc_widget)
def copy_progress(copied, total):
    """Redraw a 30-character progress bar for a copy in place.

    The bar is printed with a leading carriage return and no trailing
    newline, so successive calls overwrite the same terminal line.

    Args:
        copied: amount copied so far.
        total: expected total amount; a value of zero or less is shown
            as 100% because there is nothing left to copy.
    """
    if total > 0:
        # Clamp: the source may have grown after its size was measured,
        # and progress_percentage rejects values above 100.
        perc = min(100.0, 100 * copied / total)
    else:
        perc = 100.0
    print('\r' + progress_percentage(perc, width=30), end='')
def copyfile(src, dst, *, follow_symlinks=True):
    """Copy the contents of ``src`` to ``dst`` while printing progress.

    When ``follow_symlinks`` is false and ``src`` is a symbolic link, a
    new symlink with the same target is created at ``dst`` instead of
    copying the file it points to.

    Returns ``dst``. Raises ``shutil.SameFileError`` when both paths are
    the same file and ``shutil.SpecialFileError`` when either path is a
    named pipe.
    """
    if shutil._samefile(src, dst):
        raise shutil.SameFileError("{!r} and {!r} are the same file".format(src, dst))

    for path in (src, dst):
        try:
            mode = os.stat(path).st_mode
        except OSError:
            # File most likely does not exist; nothing to check.
            continue
        # XXX What about other special files? (sockets, devices...)
        if shutil.stat.S_ISFIFO(mode):
            raise shutil.SpecialFileError("`%s` is a named pipe" % path)

    if not follow_symlinks and os.path.islink(src):
        os.symlink(os.readlink(src), dst)
        return dst

    # The size is measured up front so the callback can report a ratio.
    size = os.stat(src).st_size
    with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
        copyfileobj(fsrc, fdst, callback=copy_progress, total=size)
    return dst
def copyfileobj(fsrc, fdst, callback, total, length=16*1024):
    """Copy ``fsrc`` into ``fdst`` chunk by chunk, reporting progress.

    Args:
        fsrc: readable file-like object.
        fdst: writable file-like object.
        callback: called after every chunk as
            ``callback(copied, total=total)``, where ``copied`` is the
            running total written so far.
        total: value forwarded unchanged to ``callback``.
        length: maximum size passed to each ``fsrc.read`` call.
    """
    done = 0
    chunk = fsrc.read(length)
    # An empty read marks end of input.
    while chunk:
        fdst.write(chunk)
        done += len(chunk)
        callback(done, total=total)
        chunk = fsrc.read(length)
def copy_with_progress(src, dst, *, follow_symlinks=True):
    """Copy ``src`` to ``dst`` with a progress bar, then copy mode bits.

    Args:
        src: path of the file to copy.
        dst: destination path; if it is an existing directory the file is
            copied into it under ``src``'s base name.
        follow_symlinks: forwarded to both the data copy and the
            permission copy, so a symlink is handled the same way in
            both steps.

    Returns:
        The path the file was actually copied to.
    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    # Pass follow_symlinks here as well; otherwise copying a symlink
    # would apply the mode through the link to the file it points at.
    shutil.copymode(src, dst, follow_symlinks=follow_symlinks)
    return dst
Run Code Online (Sandbox Code Playgroud)
这可能有点 hacky,但它有效:
"""
Copying a file and checking its progress while it's copying.
"""
import os
import shutil
import threading
import time
# Placeholders: replace with real paths before running. `src` is the
# file that is read and `des` the file that is written.
src = r'<PATH/TO/SOURCE/FILE>'
des = r'<PATH/TO/DESTINATION/FILE>'
def checker(source_path, destination_path):
    """
    Poll the destination file and print the copy progress until its size
    matches the size of the source file.

    Only file sizes are compared, not contents. The function blocks until
    the sizes are equal, so it never returns if the copy fails part-way.

    :type source_path: str
    :param source_path: path to the source file
    :type destination_path: str
    :param destination_path: path to the destination file
    """
    # Wait until the destination file has been created
    while not os.path.exists(destination_path):
        print("not exists")
        time.sleep(.01)
    # Keep checking the file size till it's the same as the source file.
    # Both sizes are read once per iteration so the printed percentage
    # comes from the same values as the loop test.
    while True:
        source_size = os.path.getsize(source_path)
        destination_size = os.path.getsize(destination_path)
        if source_size == destination_size:
            break
        if source_size:
            percentage = int((float(destination_size) / float(source_size)) * 100)
        else:
            # An empty source has no meaningful ratio; avoid dividing by 0
            percentage = 0
        # Single-argument print() behaves the same on Python 2 and 3.
        print("percentage %d" % percentage)
        time.sleep(.01)
    print("percentage 100")
def copying_file(source_path, destination_path):
    """
    Copy a file with ``shutil.copyfile`` and report the outcome on stdout.

    Errors raised by ``shutil.copyfile`` are not caught here and propagate
    to the caller.

    :type source_path: str
    :param source_path: path to the file that needs to be copied
    :type destination_path: str
    :param destination_path: path to where the file is going to be copied
    :rtype: bool
    :return: True if the destination exists after the copy, False otherwise
    """
    # Single-argument print() behaves the same on Python 2 and 3.
    print("Copying....")
    shutil.copyfile(source_path, destination_path)
    if os.path.exists(destination_path):
        print("Done....")
        return True
    print("Failed...")
    return False
# Build both workers up front; constructing a Thread does not run it.
t = threading.Thread(name='copying', target=copying_file, args=(src, des))
b = threading.Thread(name='checking', target=checker, args=(src, des))
# Start the copy first, then the poller that watches the destination file.
t.start()
b.start()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
13440 次 |
最近记录: |