使用Flask生成并流式传输压缩文件

mat*_*ent 12 python zip flask

我能够动态生成和流式传输文本,但无法动态生成和流式传输压缩文件.

from flask import Flask, request, Response,stream_with_context
import zlib
import gzip

app = Flask(__name__)

def generate_text():
    for x in xrange(10000):
        yield "this is my line: {}\n".format(x)

@app.route('/stream_text')
def stream_text():
    response = Response(stream_with_context(generate_text()))
    return response

def generate_zip():
    for x in xrange(10000):
        yield zlib.compress("this is my line: {}\n".format(x))

@app.route('/stream_zip')
def stream_zip():
    response = Response(stream_with_context(generate_zip()), mimetype='application/zip')
    response.headers['Content-Disposition'] = 'attachment; filename=data.gz'
    return response

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000, debug=True)
Run Code Online (Sandbox Code Playgroud)

比使用curl和gunzip:

curl http://127.0.0.1:8000/stream_zip > data.gz

gunzip data.gz
gunzip: data.gz: not in gzip format
Run Code Online (Sandbox Code Playgroud)

我不在乎它是zip,gzip还是任何其他类型的压缩.

generate_text 在我的实际代码中生成超过4 GB的数据,所以我想动态压缩.

将文本保存到文件,压缩,返回zip文件,然后删除不是我想要的解决方案.

我需要在循环中生成一些文本 - >压缩文本 - >流式传输压缩数据,直到我完成.

zip/gzip ...任何东西都可以,只要它有效.

Mar*_*ers 14

您正在生成一系列压缩文档,而不是单个压缩流.不要使用zlib.compress(),它包括标题并形成单个文档.

您需要创建一个zlib.compressobj()对象,并使用该对象上的Compress.compress()方法生成数据流(然后是最终调用Compress.flush()):

def generate_zip():
    compressor = zlib.compressobj()
    for x in xrange(10000):
        chunk = compressor.compress("this is my line: {}\n".format(x))
        if chunk:
            yield chunk
    yield compressor.flush()
Run Code Online (Sandbox Code Playgroud)

当没有足够的数据产生完整的压缩数据块时,压缩器可以产生空块,只有在实际发送任何内容时,上述情况才会产生.由于您的输入数据具有如此高的重复性,因此数据可以被有效压缩,因此只产生3次(一次使用2字节标头,一次使用约21kb的压缩数据覆盖前8288次迭代xrange(),最后使用剩余的4kb对于其余的循环).

总的来说,这会产生与单个zlib.compress()调用相同的数据,并且所有输入都连接在一起.正确的MIME类型此数据格式application/zlib, application/zip.

gzip然而,这种格式不容易解压缩,并非没有一些技巧.那是因为上面还没有生成GZIP文件,它只生成一个原始的zlib压缩流.要使GZIP兼容,您需要正确配置压缩,首先发送标头,并在结尾添加CRC校验和和数据长度值:

import zlib
import struct
import time

def generate_gzip():
    # Yield a gzip file header first.
    yield (
        '\037\213\010\000' + # Gzip file, deflate, no filename
        struct.pack('<L', long(time.time())) +  # compression start time
        '\002\377'  # maximum compression, no OS specified
    )

    # bookkeeping: the compression state, running CRC and total length
    compressor = zlib.compressobj(
        9, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
    crc = zlib.crc32("")
    length = 0

    for x in xrange(10000):
        data = "this is my line: {}\n".format(x)
        chunk = compressor.compress(data)
        if chunk:
            yield chunk
        crc = zlib.crc32(data, crc) & 0xffffffffL
        length += len(data)

    # Finishing off, send remainder of the compressed data, and CRC and length
    yield compressor.flush()
    yield struct.pack("<2L", crc, length & 0xffffffffL)
Run Code Online (Sandbox Code Playgroud)

服务于application/gzip:

@app.route('/stream_gzip')
def stream_gzip():
    response = Response(stream_with_context(generate_gzip()), mimetype='application/gzip')
    response.headers['Content-Disposition'] = 'attachment; filename=data.gz'
    return response
Run Code Online (Sandbox Code Playgroud)

并且结果可以在运行中解压缩:

curl http://127.0.0.1:8000/stream_gzip | gunzip -c | less
Run Code Online (Sandbox Code Playgroud)


Dro*_*man -2

我认为目前您只是发送生成器而不是数据!您可能想做这样的事情(我还没有测试过,所以可能需要一些改变):

def generate_zip():
    import io
    with gzip.GzipFile(fileobj=io.BytesIO(), mode='w') as gfile:
        for x in xrange(10000):
             gfile.write("this is my line: {}\n".format(x))
    return gfile.read()
Run Code Online (Sandbox Code Playgroud)