How to decompress and unpack a tar.gz archive during download?
I need to decompress and unpack large .tar.gz files (e.g. ~5 GB) while they are downloading, without saving the archive file to disk. I use the reqwest crate for downloading files, the flate2 crate for decompressing and the tar crate for unpacking. I am trying to do this with the tar.gz format, but the zip and tar.bz2 formats are also available. (Which one is easier to work with?) It seems that I managed to implement this, but unpacking unexpectedly ended with an error:
thread 'main' panicked at 'Cannot unpack archive: Custom { kind: UnexpectedEof, error: TarError { desc: "failed to unpack `/home/ruut/Projects/GreatWar/launcher/gamedata/gamedata-master/.vscode/settings.json`", io: Custom { kind: UnexpectedEof, error: TarError { desc: "failed to unpack `gamedata-master/.vscode/settings.json` into `/home/ruut/Projects/GreatWar/launcher/gamedata/gamedata-master/.vscode/settings.json`", io: Kind(UnexpectedEof) } } } }', /home/ruut/Projects/GreatWar/launcher/src/gitlab.rs:87:38
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
My code:
let full_url = format!("{}/{}/{}", HOST, repo_info.url, repo_info.download_url);
let mut response;
match self.client.get(&full_url).send().await {
    Ok(res) => response = res,
    Err(error) => {
        return Err(Error::new(ErrorKind::InvalidData, error));
    }
};
if response.status() == reqwest::StatusCode::OK {
    let mut stream = response.bytes_stream();
    while let Some(item) = stream.next().await {
        let chunk = item
            .or(Err(format!("Error while downloading file")))
            .unwrap();
        let b: &[u8] = &chunk.to_vec();
        let gz = GzDecoder::new(b);
        let mut archive = Archive::new(gz);
        archive.unpack("./gamedata").expect("Cannot unpack archive");
    }
}
The archive.unpack call throws an error after getting the first chunk.
What am I doing wrong?
Solution 1:[1]
The comment by kmdreko explains why your code fails: .next() returns only the first chunk, and you must feed all chunks to the gzip reader. The other answer shows how to do it using the blocking reqwest API.
If you want to keep using the non-blocking API, then you can start the decoder in a separate thread and feed it data via a channel. For example, you can use flume channels, which support both a sync and an async interface. You will also need to convert the channel into something that implements Read, as expected by GzDecoder. For example (compiles, but otherwise untested):
use std::io::{self, Read};
use flate2::read::GzDecoder;
use futures_lite::StreamExt;
use tar::Archive;
async fn download() -> io::Result<()> {
    let client = reqwest::Client::new();
    let full_url = "...";
    let response;
    match client.get(full_url).send().await {
        Ok(res) => response = res,
        Err(error) => {
            return Err(io::Error::new(io::ErrorKind::InvalidData, error));
        }
    };
    let (tx, rx) = flume::bounded(0);
    let decoder_thread = std::thread::spawn(move || {
        let input = ChannelRead::new(rx);
        let gz = GzDecoder::new(input);
        let mut archive = Archive::new(gz);
        archive.unpack("./gamedata").unwrap();
    });
    if response.status() == reqwest::StatusCode::OK {
        let mut stream = response.bytes_stream();
        while let Some(item) = stream.next().await {
            let chunk = item
                .or(Err(format!("Error while downloading file")))
                .unwrap();
            tx.send_async(chunk.to_vec()).await.unwrap();
        }
    }
    // Close the channel to signal EOF. This happens outside the `if` so the
    // decoder thread is not left waiting forever when the status is not OK.
    drop(tx);
    tokio::task::spawn_blocking(|| decoder_thread.join())
        .await
        .unwrap()
        .unwrap();
    Ok(())
}
// Wrap a channel into something that impls `io::Read`
struct ChannelRead {
    rx: flume::Receiver<Vec<u8>>,
    current: io::Cursor<Vec<u8>>,
}
impl ChannelRead {
    fn new(rx: flume::Receiver<Vec<u8>>) -> ChannelRead {
        ChannelRead {
            rx,
            current: io::Cursor::new(vec![]),
        }
    }
}
impl Read for ChannelRead {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.current.position() == self.current.get_ref().len() as u64 {
            // We've exhausted the previous chunk, get a new one.
            if let Ok(vec) = self.rx.recv() {
                self.current = io::Cursor::new(vec);
            }
            // If recv() "fails", it means the sender closed its part of
            // the channel, which means EOF. Propagate EOF by allowing
            // a read from the exhausted cursor.
        }
        self.current.read(buf)
    }
}
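The channel-to-`Read` adapter can be exercised on its own, without reqwest, flate2 or tar. The following is a minimal sketch that assumes std's `sync_channel` in place of flume; `ChunkReader` and `pipe_chunks` are hypothetical names, not part of any crate. Unlike the adapter above, it loops past empty chunks, because a `read` that returns 0 bytes is interpreted by the caller as end of input.

```rust
use std::io::{self, Read};
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

// Std-only variant of the adapter: turns a channel of byte chunks
// into something that implements `io::Read`.
struct ChunkReader {
    rx: Receiver<Vec<u8>>,
    current: io::Cursor<Vec<u8>>,
}

impl ChunkReader {
    fn new(rx: Receiver<Vec<u8>>) -> Self {
        ChunkReader { rx, current: io::Cursor::new(Vec::new()) }
    }
}

impl Read for ChunkReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Skip over exhausted (or empty) chunks until there is data or EOF.
        while self.current.position() == self.current.get_ref().len() as u64 {
            match self.rx.recv() {
                Ok(chunk) => self.current = io::Cursor::new(chunk),
                // All senders were dropped: report EOF.
                Err(_) => return Ok(0),
            }
        }
        self.current.read(buf)
    }
}

// Hypothetical helper: send `chunks` from a producer thread and collect
// everything the consumer reads through the adapter.
fn pipe_chunks(chunks: Vec<Vec<u8>>) -> io::Result<Vec<u8>> {
    let (tx, rx) = sync_channel::<Vec<u8>>(0);
    let producer = thread::spawn(move || {
        for chunk in chunks {
            if tx.send(chunk).is_err() {
                break; // the reader went away
            }
        }
        // `tx` is dropped here, which signals EOF to the reader.
    });
    let mut out = Vec::new();
    ChunkReader::new(rx).read_to_end(&mut out)?;
    producer.join().expect("producer thread panicked");
    Ok(out)
}
```

For example, `pipe_chunks(vec![b"hello ".to_vec(), b"world".to_vec()])` returns the bytes of `hello world`: the reader sees one continuous stream regardless of where the chunk boundaries fall, which is what a streaming decoder needs.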
Solution 2:[2]
Rust has a std::io::BufRead trait. It manages an internal buffer and can be filled and consumed, which makes it great for passing data without intermediate collects.
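The fill-and-consume cycle of `BufRead` can be seen in isolation with a small std-only sketch. It assumes an in-memory byte slice as the data source instead of an HTTP response, and `refill_sizes` is a hypothetical helper introduced only for illustration.

```rust
use std::io::{self, BufRead, BufReader};

// Hypothetical helper: drain `data` through a small BufReader and record
// how many bytes each buffer refill made available.
fn refill_sizes(data: &[u8], capacity: usize) -> io::Result<Vec<usize>> {
    let mut reader = BufReader::with_capacity(capacity, data);
    let mut sizes = Vec::new();
    loop {
        // `fill_buf` returns the buffered bytes, reading more from the
        // underlying source only when the buffer is empty.
        let available = reader.fill_buf()?.len();
        if available == 0 {
            break; // EOF
        }
        sizes.push(available);
        // `consume` marks the bytes as used, so the next `fill_buf` refills.
        reader.consume(available);
    }
    Ok(sizes)
}
```

With a 4-byte buffer, `refill_sizes(b"0123456789", 4)` yields `[4, 4, 2]`: the data moves through one small reusable buffer and is never collected into a single allocation.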
As reqwest::blocking::Response implements Read, we can just wrap it in a BufReader and pass it to flate2::bufread::GzDecoder.
Your problem is that GzDecoder::new expects its input to be a complete archive, but a single chunk is not: the first chunk is cut off partway through, and the following chunks are only fragments from the middle of the stream.
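The same kind of failure can be reproduced without any network or compression. In the sketch below, `read_whole` is a hypothetical stand-in for a decoder that needs the entire stream; it is not how flate2 works internally. Handing it only the first chunk produces `UnexpectedEof`, while chaining the chunks into one reader succeeds.

```rust
use std::io::{self, Read};

// Hypothetical stand-in for a decoder: it needs `total` bytes of input.
fn read_whole<R: Read>(mut input: R, total: usize) -> io::Result<Vec<u8>> {
    let mut buf = vec![0u8; total];
    input.read_exact(&mut buf)?;
    Ok(buf)
}

// Returns (result when fed only the first chunk, result when fed both).
fn demo(first: &[u8], second: &[u8]) -> (io::Result<Vec<u8>>, io::Result<Vec<u8>>) {
    let total = first.len() + second.len();
    // Per-chunk decoding: the reader runs dry before `total` bytes arrive.
    let per_chunk = read_whole(first, total);
    // Streaming: both chunks are presented as one continuous reader.
    let streamed = read_whole(Read::chain(first, second), total);
    (per_chunk, streamed)
}
```

Calling `demo(b"abc", b"def")` gives an error of kind `UnexpectedEof` for the per-chunk case and the bytes `abcdef` for the chained case.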
Minimal example (using the blocking API for simplicity):
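use flate2::bufread::GzDecoder;
use std::io::BufReader;
use tar::Archive;
// Placeholder: replace with the URL of the archive to download.
const URL: &str = "...";
fn main() {
    // The blocking response implements `Read`, so it can be streamed.
    let resp = reqwest::blocking::get(URL).unwrap();
    let content_br = BufReader::new(resp);
    let tarfile = GzDecoder::new(content_br);
    let mut archive = Archive::new(tarfile);
    archive.unpack("./gamedata").unwrap();
}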
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | |