How to decompress and unpack a tar.gz archive while downloading it?

IWa*_*Zzz 5 gzip tar rust

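I need to decompress and unpack large .tar.gz files (~5 GB, for example) while they are downloading, without saving the archive to disk. I use the reqwest crate to download the file, the flate2 crate to decompress it and the tar crate to unpack it. I tried doing this with the tar.gz format, but zip and tar.bz2 are also available. (Which one is easier to work with?) I seem to have managed to implement it, but unexpectedly the unpacking ends with an error: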

thread 'main' panicked at 'Cannot unpack archive: Custom { kind: UnexpectedEof, error: TarError { desc: "failed to unpack `/home/ruut/Projects/GreatWar/launcher/gamedata/gamedata-master/.vscode/settings.json`", io: Custom { kind: UnexpectedEof, error: TarError { desc: "failed to unpack `gamedata-master/.vscode/settings.json` into `/home/ruut/Projects/GreatWar/launcher/gamedata/gamedata-master/.vscode/settings.json`", io: Kind(UnexpectedEof) } } } }', /home/ruut/Projects/GreatWar/launcher/src/gitlab.rs:87:38
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

My code:

let full_url = format!("{}/{}/{}", HOST, repo_info.url, repo_info.download_url);
let mut response;

match self.client.get(&full_url).send().await {
  Ok(res) => response = res,
  Err(error) => {
    return Err(Error::new(ErrorKind::InvalidData, error));
  }
};

if response.status() == reqwest::StatusCode::OK {
  let mut stream = response.bytes_stream();

  while let Some(item) = stream.next().await {
    let chunk = item
      .or(Err(format!("Error while downloading file")))
      .unwrap();

    let b: &[u8] = &chunk.to_vec();
    let gz = GzDecoder::new(b);
    let mut archive = Archive::new(gz);

    archive.unpack("./gamedata").expect("Cannot unpack archive");
  }
}

archive.unpack throws the error as soon as the first chunk has been received.
What am I doing wrong?

use*_*342 4

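kmdreko's comment explains why your code fails: each call to .next() returns only one chunk, but your loop builds a fresh GzDecoder and Archive from that single chunk, so unpack sees a truncated archive and fails with UnexpectedEof. All the chunks have to be fed, in order, to one gzip reader. The other answer shows how to do that with the blocking reqwest API.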
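The failure mode can be reproduced without any crates. This is a hypothetical toy format, not tar or gzip: `parse_record` reads a 4-byte length prefix and then that many payload bytes. Like the tar reader, it fails with `UnexpectedEof` when handed only the first chunk, and succeeds once all chunks are chained into a single reader with `Read::chain`.

```rust
use std::io::{self, Read};

/// Hypothetical toy format standing in for a compressed archive: a 4-byte
/// big-endian length followed by that many payload bytes. Like a tar/gzip
/// reader, it fails with UnexpectedEof if the input stops early.
fn parse_record<R: Read>(mut input: R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    input.read_exact(&mut len_buf)?;
    let len = u32::from_be_bytes(len_buf) as usize;
    let mut payload = vec![0u8; len];
    input.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() {
    // One record ("hello world", 11 bytes) split across three "network chunks".
    let mut data = 11u32.to_be_bytes().to_vec();
    data.extend_from_slice(b"hello world");
    let chunks: [&[u8]; 3] = [&data[..6], &data[6..10], &data[10..]];

    // Pattern from the question: a fresh parser over a single chunk.
    let first = parse_record(chunks[0]);
    assert_eq!(first.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);

    // Fix: one parser over all chunks, chained into a single reader.
    let whole = chunks[0].chain(chunks[1]).chain(chunks[2]);
    let payload = parse_record(whole).unwrap();
    assert_eq!(payload, b"hello world");
    println!("{}", String::from_utf8(payload).unwrap());
}
```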

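If you want to keep using the non-blocking API, you can run the decoder on a separate thread and feed it the data through a channel. For example, you can use a Flume channel, which supports both a sync and an async interface. You also need to wrap the channel in something that implements Read, since that is what GzDecoder expects. For example (compiles, but untested):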

use std::io::{self, Read};

use flate2::read::GzDecoder;
use futures_lite::StreamExt; // provides `.next()` on the byte stream
use tar::Archive;

async fn download() -> io::Result<()> {
    let client = reqwest::Client::new();

    let full_url = "...";

    let response = client
        .get(full_url)
        .send()
        .await
        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;

    // Check the status *before* spawning the decoder thread; otherwise the
    // thread would block forever waiting for data that never arrives.
    if response.status() != reqwest::StatusCode::OK {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            format!("unexpected status: {}", response.status()),
        ));
    }

    // Rendezvous channel: each chunk is handed straight to the decoder.
    let (tx, rx) = flume::bounded(0);

    // Decompressing and unpacking are blocking, so run them on their own thread.
    let decoder_thread = std::thread::spawn(move || -> io::Result<()> {
        let input = ChannelRead::new(rx);
        let gz = GzDecoder::new(input);
        let mut archive = Archive::new(gz);
        archive.unpack("./gamedata")
    });

    let mut stream = response.bytes_stream();
    while let Some(item) = stream.next().await {
        let chunk = item.map_err(|error| io::Error::new(io::ErrorKind::Other, error))?;
        if tx.send_async(chunk.to_vec()).await.is_err() {
            // The decoder hung up because it hit an error; stop downloading
            // and report that error when joining the thread below.
            break;
        }
    }
    drop(tx); // close the channel to signal EOF

    // Join the decoder thread without blocking the async runtime.
    tokio::task::spawn_blocking(move || decoder_thread.join())
        .await
        .expect("spawn_blocking task failed")
        .expect("decoder thread panicked")
}

// Wrap a channel into something that impls `io::Read`
struct ChannelRead {
    rx: flume::Receiver<Vec<u8>>,
    current: io::Cursor<Vec<u8>>,
}

impl ChannelRead {
    fn new(rx: flume::Receiver<Vec<u8>>) -> ChannelRead {
        ChannelRead {
            rx,
            current: io::Cursor::new(vec![]),
        }
    }
}

impl Read for ChannelRead {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // We've exhausted the previous chunk, get a new one. Looping also
        // skips empty chunks, which would otherwise be mistaken for EOF.
        while self.current.position() == self.current.get_ref().len() as u64 {
            match self.rx.recv() {
                Ok(vec) => self.current = io::Cursor::new(vec),
                // recv() fails only when the sender has closed its part of
                // the channel, which means EOF.
                Err(_) => return Ok(0),
            }
        }
        self.current.read(buf)
    }
}
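To try the channel-to-Read adapter on its own, here is a std-only sketch. It swaps Flume for `std::sync::mpsc::sync_channel(0)`, a rendezvous channel like `flume::bounded(0)`, and `reassemble` is a hypothetical helper that plays the role of the download loop. The reader sees the chunks, including an empty one, as a single continuous byte stream, which is what `GzDecoder` needs.

```rust
use std::io::{self, Cursor, Read};
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Same adapter idea as `ChannelRead` above, but over std's mpsc channel
/// (swapped in for flume so the sketch needs no external crates).
struct ChannelRead {
    rx: Receiver<Vec<u8>>,
    current: Cursor<Vec<u8>>,
}

impl ChannelRead {
    fn new(rx: Receiver<Vec<u8>>) -> Self {
        ChannelRead { rx, current: Cursor::new(Vec::new()) }
    }
}

impl Read for ChannelRead {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Pull new chunks until one has unread bytes (skips empty chunks).
        while self.current.position() == self.current.get_ref().len() as u64 {
            match self.rx.recv() {
                Ok(chunk) => self.current = Cursor::new(chunk),
                // All senders dropped: the stream is finished, report EOF.
                Err(_) => return Ok(0),
            }
        }
        self.current.read(buf)
    }
}

/// Hypothetical helper: push `chunks` from a producer thread through a
/// rendezvous channel and read them back as one continuous stream.
fn reassemble(chunks: Vec<Vec<u8>>) -> io::Result<Vec<u8>> {
    // Capacity 0 mirrors `flume::bounded(0)`: each send waits for the reader.
    let (tx, rx) = sync_channel(0);
    let producer = thread::spawn(move || {
        for chunk in chunks {
            if tx.send(chunk).is_err() {
                break; // reader hung up
            }
        }
        // `tx` is dropped here, which the reader sees as EOF.
    });
    let mut out = Vec::new();
    ChannelRead::new(rx).read_to_end(&mut out)?;
    producer.join().expect("producer thread panicked");
    Ok(out)
}

fn main() -> io::Result<()> {
    let chunks = vec![b"tar".to_vec(), Vec::new(), b".gz ".to_vec(), b"stream".to_vec()];
    let out = reassemble(chunks)?;
    assert_eq!(out, b"tar.gz stream");
    println!("{}", String::from_utf8_lossy(&out));
    Ok(())
}
```

The zero capacity is a deliberate choice: the producer cannot run ahead of the reader, so only about one chunk is held in memory at a time, regardless of how large the archive is.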