Paramiko无法下载> 1GB的大文件

the*_*och 14 python paramiko

def download():
if os.path.exists( dst_dir_path ) == False:
    logger.error( "Cannot access destination folder %s. Please check path and permissions. " % ( dst_dir_path ))
    return 1
elif os.path.isdir( dst_dir_path ) == False:
    logger.error( "%s is not a folder. Please check path. " % ( dst_dir_path ))
    return 1

file_list = None
#transport = paramiko.Transport(( hostname, port)) 
paramiko.util.log_to_file('paramiko.log')
ssh = paramiko.SSHClient() 
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 
#transport
try:
    ssh.connect( hostname, username=username, password=password, timeout=5.0) 
    #transport.connect(username=username, password=password ) 
except Exception, err:
    logger.error( "Failed to connect to the remote server. Reason: %s" % ( str(err) ) )
    return 1

try:
    #sftp = paramiko.SFTPClient.from_transport(transport)
    sftp = ssh.open_sftp() 
except Exception, err:
    logger.error( "Failed to start SFTP session from connection to %s. Check that SFTP service is running and available. Reason: %s" % ( hostname, str(err) ))
    return 1

try:    
    sftp.chdir(src_dir_path)
    #file_list = sftp.listdir(path="%s" % ( src_dir_path ) )
    file_list = sftp.listdir()

except Exception, err:
    logger.error( "Failed to list files in folder %s. Please check path and permissions. Reason: %s" % ( src_dir_path, str(err) ))
    return 1
match_text = re.compile( file_mask )
download_count = 0
for file in file_list:         
    # Here is an item name... but is it a file or directory?         
    #logger.info( "Downloading file %s." % ( file ) )
    if not re.match( file_mask, file ):
        continue
    else:
        logger.info( "File \"%s\" name matched file mask \"%s\". matches %s.Processing file..." % ( file, file_mask, (match_text.match( file_mask ) ) ) )
    src_file_path = "./%s" % ( file )
    dst_file_path = "/".join( [ dst_dir_path, file]   )
    retry_count = 0
    while True:
        try:
            logger.info( "Downloading file %s to %s."  % ( file, dst_file_path ) )
            #sftp.get( file, dst_file_path, callback=printTotals ) #sftp.get( remote file, local file )
            sftp.get( file, dst_file_path) #sftp.get( remote file, local file )
            logger.info( "Successfully downloaded file %s to %s."  % ( file, dst_file_path ) )
            download_count += 1
            break
        except Exception, err:
            if retry_count == retry_threshold:
                logger.error( "Failed to download %s to %s. Reason: %s." % ( file, dst_file_path, str(err) ) )
                sftp.close() 
                #transport.close()
                return 1
            else:
                logger.error( "Failed to download %s to %s. Reason: %s." % ( file, dst_file_path, str(err) ) )
                retry_count +=1

sftp.close() 
transport.close() 
logger.info( "%d files downloaded." % ( download_count ) )
return 0
Run Code Online (Sandbox Code Playgroud)

当我运行以下功能时,它会下载源文件大约3分钟,然后关闭会话,即使已经下载了只有38-41MB(不同)的1-1.6GB文件.

从Paramiko日志文件中,当SFTP会话关闭时,看起来SSh连接保持打开状态:

DEB [20120913-10:05:00.894] thr = 1 paramiko.transport:切换到新密钥... DEB [20120913-10:05:06.953] thr = 1 paramiko.transport:Rekeying(收到401个数据包,收到1053444个字节) )DEB [20120913-10:05:07.391] thr = 1 paramiko.transport:kex algos:['diffie-hellman-group1-sha1','diffie-hellman-group-exchange-sha1']服务器密钥:['ssh -dss']客户端加密:['aes256-ctr','aes192-ctr','aes128-ctr','aes256-cbc','aes192-cbc','aes128-cbc','twofish-cbc', 'blowfish-cbc','3des-cbc','arcfour']服务器加密:['aes256-ctr','aes192-ctr','aes128-ctr','aes256-cbc','aes192-cbc', 'aes128-cbc','twofish-cbc','blowfish-cbc','3des-cbc','arcfour']客户端mac:['hmac-sha1','hmac-sha1-96','hmac-md5 ','hmac-md5-96','umac-64 @openssh.com']服务器mac:['hmac-sha1','hmac-sha1-96','hmac-md5','hmac-md5-96 ','umac-64 @openssh.com']客户端压缩:['zlib @ openssh.com','zlib','none']服务器压缩:['zlib @ openssh.com','zlib','none ']客户端lang:['']服务器lang:[''] kex跟随?错误DEB [20120913- 10:05:07.421] thr = 1 paramiko.transport:Ciphers同意:local = aes128-ctr,remote = aes128-ctr DEB [20120913-10:05:07.421] thr = 1 paramiko.transport:使用kex diffie-hellman-组1-SHA1; 服务器密钥类型ssh-dss; 密码:本地aes128-ctr,远程aes128-ctr; mac:local hmac-sha1,remote hmac-sha1; compression:local none,remote none DEB [20120913-10:05:07.625] thr = 1 paramiko.transport:切换到新密钥... INF [20120913-10:05:10.374] thr = 2 paramiko.transport.sftp: [chan 1] sftp会议结束.DEB [20120913-10:05:10.388] thr = 2 paramiko.transport:[chan 1] EOF发送(1)

在此之后,脚本退出此异常(来自sftp.get()try/except块)

没有足够的资源来完成请求

系统本身有几千兆字节的磁盘空间,所以这不是问题.

使用FileZilla以及我多年前写的用于进行SFTP传输的Java应用程序,parakmiko失败的同样转移失败.所以我觉得它与paramiko有问题.

这是在Windows XP和Windows Server 2003上运行的.

我已经尝试修补Paramko 1.17,以便更频繁地刷新密钥,但转移仍然会引发异常.Python 2.7.3 Paramiko 1.7,带有补丁Windows 2003 Sevfer

想法?

附加信息:它在Windows XP SP3和Windows 2003服务器上失败,完全相同的行为和错误消息.sys.version信息Window XP工作站:'2.7.3(默认,2012年4月10日,23:31:26)[MSC v.1500 32位(英特尔)]'Windows 2003 Server:'2.7.3(默认,4月10日) 2012,23:31:26)[MSC v.1500 32 bit(Intel)]'我修补了packet.py文件以减少密钥续订之间的时间.它对sftp.get()的行为没有影响.

rpr*_*rpr 13

我在使用 pysftp 通过 SFTP 下载大文件(> 1 GB)时遇到了问题。底层库是Paramiko。谷歌搜索这个问题把我带到这里,有很好的解决方案。尽管如此,许多帖子都相对较旧,我想随着时间的推移,这些问题中的大部分都已得到解决。它对我的问题没有帮助。

即:Paramiko 在 sftp_file.py 中预取期间加载块时遇到内存错误。列表增长超出限制,内存错误不知何故不会阻止执行。它可能在堆栈上以某种方式默默消耗。仅在发生此错误时下载才会失败,并且它们在单独的线程中运行。

无论如何,控制列表大小的方法是设置MAX_REQUEST_SIZE:

paramiko.sftp_file.SFTPFile.MAX_REQUEST_SIZE = pow(2, 22) # 4MB per chunk
Run Code Online (Sandbox Code Playgroud)

如果超过 16MB,你会遇到一个新问题:paramiko.sftp.SFTPError: Garbage packet received。原来在 _read_packet 方法中检查了 sftp.py:

# most sftp servers won't accept packets larger than about 32k, so
# anything with the high byte set (> 16MB) is just garbage.
if byte_ord(x[0]):
    raise SFTPError("Garbage packet received")
Run Code Online (Sandbox Code Playgroud)

因此,如果块大于 16MB,则会引发此错误。我不想摆弄 Paramiko 库本身,所以我必须将我的块大小保持在 4MB 的“可接受的最大值”。

这样我就可以下载大于 30GB 的文件。希望这可以帮助人们。


小智 10

SFTP协议没有办法传输文件数据; 相反,它具有从打开文件中的特定偏移量请求数据块的方法.下载文件的天真方法是请求第一个块,将其写入磁盘,然后请求第二个块,依此类推.这很可靠,但速度很慢.

相反,Paramiko有一个它使用的性能技巧:当你调用.get()它时立即发送一个请求文件中的每个块,并记住它们应该被写入的偏移量.然后,当每个响应到达时,它确保它被写入磁盘上的正确偏移量.有关更多信息,请参阅Paramiko文档中的SFTPFile.prefetch()SFTPFile.readv()方法.我怀疑它在下载1GB文件时存储的簿记信息可能导致资源耗尽,产生"资源不足"的消息.

.get()如果您只是调用.open()获取SFTPFile实例,而不是使用,则调用.read()该对象,或者只是将其传递给Python标准库函数shutil.copyfileobj()以下载内容.这应该避免Paramiko预取缓存,并允许您下载文件,即使它不是那么快.

即:

 def lazy_loading_ftp_file(sftp_host_conn, filename):
    """
        Lazy loading ftp file when exception simple sftp.get call
        :param sftp_host_conn: sftp host
        :param filename: filename to be downloaded
        :return: None, file will be downloaded current directory
    """
    import shutil
    try:
        with sftp_host_conn() as host:
            sftp_file_instance = host.open(filename, 'r')
            with open(filename, 'wb') as out_file:
                shutil.copyfileobj(sftp_file_instance, out_file)
            return {"status": "sucess", "msg": "sucessfully downloaded file: {}".format(filename)}
    except Exception as ex:
        return {"status": "failed", "msg": "Exception in Lazy reading too: {}".format(ex)}
Run Code Online (Sandbox Code Playgroud)


Dr.*_*ass 9

我有一个非常类似的问题,在我的情况下,文件只有大约400 MB但下载约35 MB左右后会一直失败.它并不总是在完全相同的字节数下载时失败,但在大约35 - 40 MB的范围内,文件将停止传输,大约一分钟后,我会得到"没有足够的资源来完成请求"错误.

通过WinSCP或PSFTP下载文件工作正常.

我尝试了Screwtape的方法,它确实有效,但速度很慢.我的400 MB文件开始花费4个小时下载,这对于这个特定的应用来说是一个不可接受的时间框架.

此外,有一次,当我们第一次设置它时,一切正常.但是服务器管理员对SFTP服务器进行了一些更改,而且当事情发生时就会发生变化.我不确定这些更改是什么,但是因为它仍然可以正常使用WinSCP /其他SFTP方法,我认为从服务器端尝试攻击它并不会有成效.

我不会假装理解为什么,但这里最终为我工作的是:

  1. 我下载并安装了当前版本的Paramiko(此时为1.11.1).最初这没有任何区别,但我想我会提到它,以防万一它是解决方案的一部分.

  2. 异常的堆栈跟踪是:

    File "C:\Python26\lib\site-packages\paramiko\sftp_client.py", line 676, in get
        size = self.getfo(remotepath, fl, callback)
    File "C:\Python26\lib\site-packages\paramiko\sftp_client.py", line 645, in getfo
        data = fr.read(32768)
    File "C:\Python26\lib\site-packages\paramiko\file.py", line 153, in read
        new_data = self._read(read_size)
    File "C:\Python26\lib\site-packages\paramiko\sftp_file.py", line 157, in _read
        data = self._read_prefetch(size)
    File "C:\Python26\lib\site-packages\paramiko\sftp_file.py", line 138, in _read_prefetch
        self._check_exception()
    File "C:\Python26\lib\site-packages\paramiko\sftp_file.py", line 483, in _check_exception
        raise x
    
    Run Code Online (Sandbox Code Playgroud)
  3. 在sftp_file.py中找了一下,我注意到了这一点(当前版本中的第43-45行):

    # Some sftp servers will choke if you send read/write requests larger than
    # this size.
    MAX_REQUEST_SIZE = 32768
    
    Run Code Online (Sandbox Code Playgroud)
  4. 一时兴起,我尝试将MAX_REQUEST_SIZE更改为1024,瞧,我能够下载整个文件!

  5. 通过将MAX_REQUEST_SIZE更改为1024来实现它之后,我尝试了1024和32768之间的一堆其他值来查看它是否会影响性能或任何事情.但是,当值明显大于1024(1025是好的,但1048最终失败)时,我迟早会得到错误.


小智 5

除了Screwtape的回答之外,还值得一提的是你应该用块来限制块的大小 .read([block size in bytes])

请参阅lazy方法以读取大文件

file.read()在2.4中没有块大小的情况下遇到了实际问题,但2.7可能确定了正确的块大小.


uos*_*ead 5

我将这种类型的脚本与 paramiko 一起用于较大的文件,您可以使用window_size/packet size来查看什么最适合您,如果您希望它具有更高的性能,您可以运行并行进程以使用并行读取不同的文件块第二种方法(参见http://docs.paramiko.org/en/latest/api/sftp.html#paramiko.sftp_file.SFTPFile.readv

import time, paramiko

MAX_RETRIES = 10

ftp_server = "ftp.someserver.com"
port = 22
sftp_file = "/somefolder/somefile.txt"
local_file = "/somefolder/somewhere/here.txt"
ssh_conn = sftp_client = None
username = "username"
password = "password"

start_time = time.time()

for retry in range(MAX_RETRIES):
    try:
        ssh_conn = paramiko.Transport((ftp_server, port))
        ssh_conn.connect(username=username, password=password)
        # method 1 using sftpfile.get and settings window_size, max_packet_size
        window_size = pow(4, 12)#about ~16MB chunks
        max_packet_size = pow(4, 12)
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn, window_size=window_size, max_packet_size=max_packet_size)
        sftp_client.get(sftp_file, local_file)
        # method 2 breaking up file into chunks to read in parallel
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
        filesize = sftp_client.stat(sftp_file).st_size
        chunksize = pow(4, 12)#<-- adjust this and benchmark speed
        chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
        with sftp_client.open(sftp_file, "rb") as infile:
            with open(local_file, "wb") as outfile:
                for chunk in infile.readv(chunks):
                    outfile.write(chunk)
        break
    except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
        retry += 1
        print("%s %s - > retrying %s..." % (type(x), x, retry))
        time.sleep(abs(retry) * 10)
        # back off in steps of 10, 20.. seconds 
    finally:
        if hasattr(sftp_client, "close") and callable(sftp_client.close):
            sftp_client.close()
        if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
            ssh_conn.close()


print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))
Run Code Online (Sandbox Code Playgroud)

如果你真的很关心性能,你可以运行第二种方法并将文件分成多个进程/线程,这里是一个使用多线程的代码示例,它写入多个文件部分,然后将它们连接到一个文件中

import threading, os, time, paramiko

#you could make the number of threads relative to file size
NUM_THREADS = 4
MAX_RETRIES = 10

def make_filepart_path(file_path, part_number):
    """creates filepart path from filepath"""
    return "%s.filepart.%s" % (file_path, part_number+1)

def write_chunks(chunks, tnum, local_file_part, username, password, ftp_server, max_retries):
    ssh_conn = sftp_client = None
    for retry in range(max_retries):
        try:
            ssh_conn = paramiko.Transport((ftp_server, port))
            ssh_conn.connect(username=username, password=password)
            sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
            with sftp_client.open(sftp_file, "rb") as infile:
                with open(local_file_part, "wb") as outfile:
                    for chunk in infile.readv(chunks):
                        outfile.write(chunk)
            break
        except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
            retry += 1
            print("%s %s Thread %s - > retrying %s..." % (type(x), x, tnum, retry))
            time.sleep(abs(retry) * 10)
        finally:
            if hasattr(sftp_client, "close") and callable(sftp_client.close):
                sftp_client.close()
            if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
                ssh_conn.close()



start_time = time.time()

for retry in range(MAX_RETRIES):
    try:
        ssh_conn = paramiko.Transport((ftp_server, port))
        ssh_conn.connect(username=username, password=password)
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
        # connect to get the file's size in order to calculate chunks
        filesize = sftp_client.stat(sftp_file).st_size
        sftp_client.close()
        ssh_conn.close()
        chunksize = pow(4, 12)
        chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
        thread_chunk_size = (len(chunks) // NUM_THREADS) + 1
        # break the chunks into sub lists to hand off to threads
        thread_chunks = [chunks[i:i+thread_chunk_size] for i in range(0, len(chunks) - 1, thread_chunk_size)]
        threads = []
        fileparts = []
        for thread_num in range(len(thread_chunks)):
            local_file_part = make_filepart_path(local_file, thread_num) 
            args = (thread_chunks[thread_num], thread_num, local_file_part, username, password, ftp_server, MAX_RETRIES)
            threads.append(threading.Thread(target=write_chunks, args=args))
            fileparts.append(local_file_part)
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        # join file parts into one file, remove fileparts
        with open(local_file, "wb") as outfile:
            for filepart in fileparts:
                with open(filepart, "rb") as infile:
                    outfile.write(infile.read())
                os.remove(filepart)
        break
    except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
        retry += 1
        print("%s %s - > retrying %s..." % (type(x), x, retry))
        time.sleep(abs(retry) * 10)
    finally:
       if hasattr(sftp_client, "close") and callable(sftp_client.close):
           sftp_client.close()
       if hasattr(ssh_conn, "close") and callable(ssh_conn.close):
           ssh_conn.close()


print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))
Run Code Online (Sandbox Code Playgroud)