From 0f8bec6e03d67b289d36912eb4be642aa2277e29 Mon Sep 17 00:00:00 2001 From: Avril Date: Sat, 25 Jul 2020 09:17:06 +0100 Subject: [PATCH] cache is now compressed --- Cargo.lock | 24 +++++++++++++++++++++++ Cargo.toml | 7 ++++--- src/container.rs | 51 +++++++++++++++++++++++++++++++++++++++--------- src/main.rs | 10 +++++++++- 4 files changed, 79 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a8ef6e4..e6595fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -70,6 +70,9 @@ name = "cc" version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9a06fb2e53271d7c279ec1efea6ab691c35a2ae67ec0d91d7acec0caf13b518" +dependencies = [ + "jobserver", +] [[package]] name = "cfg-if" @@ -310,6 +313,15 @@ dependencies = [ "libc", ] +[[package]] +name = "jobserver" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2" +dependencies = [ + "libc", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -352,6 +364,17 @@ dependencies = [ "scoped-tls", ] +[[package]] +name = "lzzzz" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "502f79acb09d64a2fe638e057dacade375e3f2197587b97e0397fb95dee60aaa" +dependencies = [ + "cc", + "pin-project", + "tokio", +] + [[package]] name = "memchr" version = "2.3.3" @@ -560,6 +583,7 @@ dependencies = [ "chrono", "futures", "lazy_static", + "lzzzz", "sha2", "shellexpand", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 2ef388c..9d1e8c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmdupe" -version = "1.2.1" +version = "2.0.0" authors = ["Avril "] edition = "2018" @@ -12,7 +12,7 @@ lto = "fat" codegen-units = 1 [features] -threads = ["tokio", "futures"] +threads = ["tokio", "futures", "lzzzz/tokio-io"] [dev-dependencies] tokio-test = "0.2" @@ -24,4 +24,5 @@ futures = { version = "0.3", optional = true } lazy_static = "1.4" chrono = "0.4" shellexpand = "1.1" -# cfg_if = "0.1" \ No newline at end of file +lzzzz = "0.2" +# cfg_if = "0.1" diff --git a/src/container.rs b/src/container.rs index 11490a5..3d35bb7 100644 --- a/src/container.rs +++ b/src/container.rs @@ -180,6 +180,14 @@ impl DupeMap /// Save this list to a file pub fn save(&self, to: &mut W) -> io::Result { + use lzzzz::{ + lz4f::{ + PreferencesBuilder, + WriteCompressor, + CLEVEL_MAX, + }, + }; + let mut to = WriteCompressor::new(to, PreferencesBuilder::new().compression_level(CLEVEL_MAX).build())?; let mut done=0; for (path, (hash, _)) in self.table.iter() { @@ -201,18 +209,29 @@ impl DupeMap { use tokio::prelude::*; - let mut done=0; + let mut done=0usize; + + use lzzzz::{ + lz4f::{ + PreferencesBuilder, + AsyncWriteCompressor, + CLEVEL_MAX, + }, + }; + let mut to = AsyncWriteCompressor::new(to, PreferencesBuilder::new().compression_level(CLEVEL_MAX).build())?; for (path, (hash, _)) in self.table.iter() { let path = path_bytes(path.as_ref()); let hash: &[u8] = hash.as_ref(); - to.write(ENTRY_HEADER).await?; - to.write(bytes::reinterpret(&path.len())).await?; - to.write(path).await?; - to.write(hash).await?; + to.write_all(ENTRY_HEADER).await?; + to.write_all(bytes::reinterpret(&path.len())).await?; ////ASD OASDI AJOSID OAISNDO I + to.write_all(path).await?; + to.write_all(hash).await?; done+=1; } + to.flush().await?; + to.shutdown().await?; Ok(done) } @@ -223,7 +242,9 @@ impl DupeMap let mut read; let mut header_buffer = [0u8; ENTRY_HEADER.len() + std::mem::size_of::()]; let mut hash_buffer = [0u8; hash::SHA256_SIZE]; - + let mut from = lzzzz::lz4f::ReadDecompressor::new(from)?; + + //XXX: Change to read_exact while {read = from.read(&mut header_buffer[..])?; read == header_buffer.len() && &header_buffer[..ENTRY_HEADER.len()] == ENTRY_HEADER} { let sz = *bytes::reinterpret_back(&header_buffer[ENTRY_HEADER.len()..]); @@ -258,14 +279,26 @@ impl DupeMap let mut header_buffer = [0u8; ENTRY_HEADER.len() + std::mem::size_of::()]; let mut hash_buffer = [0u8; hash::SHA256_SIZE]; - while {read = from.read(&mut header_buffer[..]).await?; read == header_buffer.len() && &header_buffer[..ENTRY_HEADER.len()] == ENTRY_HEADER} + let mut from = lzzzz::lz4f::AsyncReadDecompressor::new(from)?; + while {read = match from.read_exact(&mut header_buffer[..]).await { + Ok(v) => Ok(v), + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { + if let Some(re) = e.get_ref() { + if format!("{}", re) == "early eof" { // Is there a better way to compare these? `Any` trait? Is it worth it? Don't care, it's an error anyway. + return Ok(done); // This is fine + } + } + Err(e) + }, + v => v, + }?; read == header_buffer.len() && &header_buffer[..ENTRY_HEADER.len()] == ENTRY_HEADER} { let sz = *bytes::reinterpret_back(&header_buffer[ENTRY_HEADER.len()..]); if sz > 0 { let mut path = vec![0u8; sz]; - if from.read(&mut path[..]).await? == sz { + if from.read_exact(&mut path[..]).await? == sz { let path = bytes_path(&path[..]); - if from.read(&mut hash_buffer[..]).await? == hash::SHA256_SIZE + if from.read_exact(&mut hash_buffer[..]).await? == hash::SHA256_SIZE { if !trans && self.cache(path, hash::Sha256Hash::new(hash_buffer)) { done +=1; diff --git a/src/main.rs b/src/main.rs index 2b5f25e..6542624 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,8 @@ mod config; mod arg; mod proc; +//#[cfg(feature="threads")] +//mod pipe; //No longer needed #[cfg(test)] mod test { @@ -199,7 +201,10 @@ async fn rebase(config: config::Rebase) -> Result<(), Box Err(e) => println!("Warning: Failed to write to output {:?}: ignoring: {}", file, e), _ => (), }; - + match file.sync_data().await { + Err(e) => println!("Warning: Failed to sync output {:?}: ignoring: {}", file, e), + _ => (), + } } Ok(()) @@ -315,6 +320,9 @@ async fn main() -> Result<(), Box> .open(save).await.log_and_forget(lmode, log::Level::Warning)? { args.mode.error_mode.handle(hashes.save_async(&mut file).await).log_and_forget(lmode, log::Level::Warning)?; + use tokio::prelude::*; + file.shutdown().await.log_and_forget(lmode, log::Level::Warning)?; //One of these is probably redundant. + file.sync_data().await.log_and_forget(lmode, log::Level::Warning)?; } } },