From 9f4a0faa683570504ec449b5b0b4f6e028af8c96 Mon Sep 17 00:00:00 2001 From: Avril Date: Tue, 16 Feb 2021 21:08:55 +0000 Subject: [PATCH] added dumping of collected data --- .gitignore | 1 + Cargo.lock | 87 +++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 11 +++++- src/arg.rs | 11 +++++- src/config.rs | 23 +++++++++++ src/data/cache.rs | 9 ++++- src/data/graph.rs | 6 ++- src/data/mod.rs | 1 + src/defer_drop.rs | 39 +++++++++++++++++++ src/ext.rs | 36 +++++++++++++++++ src/main.rs | 98 +++++++++++++++++++++++++++++++++++------------ src/serial.rs | 59 ++++++++++++++++++++++++++++ src/state.rs | 4 +- src/work.rs | 20 ++++++---- 14 files changed, 367 insertions(+), 38 deletions(-) create mode 100644 src/defer_drop.rs create mode 100644 src/serial.rs diff --git a/.gitignore b/.gitignore index e2a3069..5b36c65 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target *~ +*.dump diff --git a/Cargo.lock b/Cargo.lock index bd5ed05..e2c0ecf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "addr2line" version = "0.14.1" @@ -15,6 +17,19 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e" +[[package]] +name = "async-compression" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b72c1f1154e234325b50864a349b9c8e56939e266a4c307c0f159812df2f9537" +dependencies = [ + "bzip2", + "futures-core", + "memchr", + "pin-project-lite 0.2.4", + "tokio", +] + [[package]] name = "autocfg" version = "1.0.1" @@ -47,6 +62,33 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" +[[package]] +name = "bzip2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "275d84fe348b838dc49477d39770682839b3e73e21a3eadc07b12924f1a9fcbe" +dependencies = [ + "bzip2-sys", + "libc", +] + +[[package]] +name = "bzip2-sys" +version = "0.1.10+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17fa3d1ac1ca21c5c4e36a97f3c3eb25084576f6fc47bf0139c1123434216c6c" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + +[[package]] +name = "cc" +version = "1.0.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c0496836a84f8d0495758516b8621a622beb77c0fed418570e50764093ced48" + [[package]] name = "cfg-if" version = "0.1.10" @@ -76,11 +118,14 @@ dependencies = [ name = "dirstat" version = "0.1.0" dependencies = [ + "async-compression", "color-eyre", "futures", "lazy_static", "num_cpus", "pin-project", + "serde", + "serde_cbor", "tokio", ] @@ -217,6 +262,12 @@ version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6503fe142514ca4799d4c26297c4248239fe8838d827db6bd6065c6ed29a6ce" +[[package]] +name = "half" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62aca2aba2d62b4a7f5b33f3712cb1b0692779a56fb510499d5c0aa594daeaf3" + [[package]] name = "hermit-abi" version = "0.1.18" @@ -429,6 +480,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -465,6 +522,36 @@ version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e3bad0ee36814ca07d7968269dd4b7ec89ec2da10c4bb613928d3077083c232" +[[package]] +name = "serde" +version = "1.0.123" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92d5161132722baa40d802cc70b15262b98258453e85e5d1d365c757c73869ae" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_cbor" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622" +dependencies = [ + "half", + "serde", +] + +[[package]] +name = "serde_derive" +version = "1.0.123" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9391c295d64fc0abb2c556bad848f33cb8296276b1ad2677d1ae1ace4f258f31" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "signal-hook-registry" version = "1.3.0" diff --git a/Cargo.toml b/Cargo.toml index 6eb69ad..da9653a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,15 +7,24 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["splash"] +default = ["splash", "inspect", "defer-drop"] + +# Allow saving and loading of gathered data for later inspection +inspect = ["serde", "serde_cbor", "async-compression"] + +# Enable dropping of certain large objects on background threads. +defer-drop = [] # Show splash screen splash = [] [dependencies] +async-compression = {version = "0.3", features=["tokio-02", "bzip2"], optional=true} color-eyre = {version = "0.5.10", default-features=false} futures = "0.3.12" lazy_static = "1.4.0" num_cpus = "1.13.0" pin-project = "1.0.5" +serde = {version = "1.0.123", features=["derive"], optional=true} +serde_cbor = {version = "0.11.1", optional=true} tokio = {version = "0.2", features=["full"]} diff --git a/src/arg.rs b/src/arg.rs index 8267fc0..31400f0 100644 --- a/src/arg.rs +++ b/src/arg.rs @@ -38,6 +38,8 @@ OPTIONS: --threads Limit the maximum number of tasks allowed to process concurrently (Set to 0 for unlimited.) -M Set number of parallel running tasks to unlimited. (Same as `--threads 0`). (default). -m Limit number of parallel tasks to the number of active CPU processors. + --save Dump the collected data to this file for further inspection (only available when compiled with feature `inspect`) + -D Dump the collected data to `stdout` (see `--save`) (only available when compiled with feature `inspect`) - Stop parsing arguments, treat all the rest as paths. --help Print this message and exit. @@ -98,7 +100,14 @@ fn parse>(args: I) -> eyre::Result "-m" => { cfg.max_tasks = config::max_tasks_cpus(); }, - + #[cfg(feature="inspect")] "-D" => { + cfg.serialise_output = Some(None); + }, + #[cfg(feature="inspect")] "--save" => { + let file = args.next().ok_or(eyre!("`--save` expects a filename parameter")) + .with_suggestion(suggestion_intended_arg.clone())?; + cfg.serialise_output = Some(Some(file.into())); + }, "--recursive" => { let max = args.next().ok_or(eyre!("`--recursive` expects a parameter")) .with_suggestion(suggestion_intended_arg.clone())?; diff --git a/src/config.rs b/src/config.rs index ae0b5a0..ece5ea0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -53,6 +53,27 @@ pub struct Config pub paths: Vec, pub recursive: Recursion, pub max_tasks: Option, + + #[cfg(feature="inspect")] + pub serialise_output: Option>, // Some(None) means dump to `stdout` +} + +impl Config +{ + /// Are we expected to dump data to `stdout`? + #[inline] pub fn is_using_stdout(&self) -> bool + { + #[cfg(feature="inspect")] { + return if let Some(None) = self.serialise_output { + true + } else { + false + }; + } + #[cfg(not(feature="inspect"))] { + false + } + } } /// The default `max_tasks` @@ -73,6 +94,8 @@ impl Default for Config paths: Vec::new(), recursive: Default::default(), max_tasks: None, //max_tasks_cpus(), + #[cfg(feature="inspect")] + serialise_output: None, } } } diff --git a/src/data/cache.rs b/src/data/cache.rs index 9fbeb9e..c71d4c2 100644 --- a/src/data/cache.rs +++ b/src/data/cache.rs @@ -80,6 +80,13 @@ impl Cache /// If there are shared references to the `Cache` other than this one, the future will return this object as `Err`. pub fn try_complete(self) -> impl futures::Future, Self>> + 'static { + #[inline(never)] + #[cold] + fn only_one() -> ! + { + unreachable!("Arc should have only 1 strong reference now. This should never happend") + } + async move { if Arc::strong_count(&self.0) > 2 { return Err(self); // there is another other than the background task holding the shared cache reference @@ -101,7 +108,7 @@ impl Cache Ok(inner) => { Ok(inner.cache.into_inner()) }, - #[cold] Err(_) => unreachable!("Arc should have only 1 strong reference now. This should never happend"), + Err(_) => only_one(), } } } diff --git a/src/data/graph.rs b/src/data/graph.rs index 87f9eee..e2dc052 100644 --- a/src/data/graph.rs +++ b/src/data/graph.rs @@ -53,6 +53,7 @@ impl INodeInfoGraph { self.inodes.get(node.borrow()) } + /* /// An iterator over top-level children of this node pub fn children_of(&self, node: impl Borrow) -> !//Children<'_> { @@ -66,7 +67,7 @@ impl INodeInfoGraph pub fn directories(&self) -> !//Directories<'_> { todo!()//Directories(self, self.children.keys()) - } + }*/ /// Convert into a hierarchical representation pub fn into_hierarchical(self) -> HierarchicalINodeGraph @@ -82,6 +83,7 @@ impl INodeInfoGraph } #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature="inspect", derive(serde::Serialize, serde::Deserialize))] enum NodeKind { Directory(Vec, Option), @@ -100,6 +102,7 @@ impl NodeKind } #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature="inspect", derive(serde::Serialize, serde::Deserialize))] struct HierarchicalNode { kind: NodeKind, @@ -128,6 +131,7 @@ impl HierarchicalNode /// A hierarchical graph of node sizes #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature="inspect", derive(serde::Serialize, serde::Deserialize))] pub struct HierarchicalINodeGraph { table: HashMap diff --git a/src/data/mod.rs b/src/data/mod.rs index 58587f1..8aabf34 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -25,6 +25,7 @@ pub use graph::{ /// Ususally created from the `.inode()` extension method on `fs::Metadata` found in prelude. /// Can also be created with `new()` from a `fs::Metadata` reference, or created unsafely from an arbitrary `u64` with `new_unchecked`. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)] +#[cfg_attr(feature="inspect", derive(serde::Serialize, serde::Deserialize))] #[repr(transparent)] pub struct INode(u64); diff --git a/src/defer_drop.rs b/src/defer_drop.rs new file mode 100644 index 0000000..ed46f5a --- /dev/null +++ b/src/defer_drop.rs @@ -0,0 +1,39 @@ +//! Mechanism to defer dropping of large objects to background threads +use futures::{ + prelude::*, + future::OptionFuture, +}; + +pub const DEFER_DROP_VEC_SIZE_FLOOR: usize = 1024 * 1024; // 1 MB + +/// Drop a `Vec` that is `Send` and `'static`. +/// +/// This will move the object to a background task if it is deemed nessisary. +/// # Note +/// This *must* be `await`ed to work properly. If you are not in an async context, use `drop_vec_sync`. +pub fn drop_vec(vec: Vec) -> impl Future + 'static +where T: Send + 'static +{ + let len_bytes = vec.len() * std::mem::size_of::(); + OptionFuture::from(if len_bytes > DEFER_DROP_VEC_SIZE_FLOOR { + eprintln!("Size of vector ({} bytes, {} elements of {:?}) exceeds defer drop size floor {}. Moving vector to a seperate thread for de-allocation", len_bytes, vec.len(), std::any::type_name::(), DEFER_DROP_VEC_SIZE_FLOOR); + Some(async move { + tokio::task::spawn_blocking(move || drop(vec)).await.expect("Child panic while dropping vector"); + }) + } else { + None + }).map(|_| ()) +} + +/// Drop a `Vec` that is `Send` and `'static`. +/// +/// This will move the object to a background task if it is deemed nessisary. +pub fn drop_vec_sync(vec: Vec) +where T: Send + 'static +{ + let len_bytes = vec.len() * std::mem::size_of::(); + if len_bytes > DEFER_DROP_VEC_SIZE_FLOOR { + eprintln!("Size of vector ({} bytes, {} elements of {:?}) exceeds defer drop size floor {}. Moving vector to a seperate thread for de-allocation", len_bytes, vec.len(), std::any::type_name::(), DEFER_DROP_VEC_SIZE_FLOOR); + tokio::task::spawn_blocking(move || drop(vec)); + } +} diff --git a/src/ext.rs b/src/ext.rs index 4ecdd30..862f118 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -282,3 +282,39 @@ mod tests assert_eq!((0..100).sum::(),sum); } } + +/// Explicitly drop this item. +/// +/// If `defer-drop` feature is enabled, this may move the object to the background collector thread. +/// +/// # Speicialisations +/// There can be special handling for `Vec` types in this way. +/// ``` +/// let large_vec = vec![String::from("Hello world"); 1000]; +/// drop!(vec large_vec); +/// ``` +/// It also has an `async` variant, that lets you await the background dropping task. +/// ``` +/// let large_vec = vec![String::from("Hello world"); 1000]; +/// drop!(async vec large_vec); +/// ``` +#[macro_export] macro_rules! drop { + (async vec $item:expr) => { + #[cfg(feature="defer-drop")] { + $crate::defer_drop::drop_vec($item).await; + } + #[cfg(not(feature="defer-drop"))] { + drop($item); + } + () + }; + (vec $item:expr) => { + #[cfg(feature="defer-drop")] { + $crate::defer_drop::drop_vec_sync($item); + } + #[cfg(not(feature="defer-drop"))] { + drop($item); + } + () + } +} diff --git a/src/main.rs b/src/main.rs index 4de02cc..9becbe9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,8 @@ #[macro_use] extern crate pin_project; #[macro_use] extern crate lazy_static; +#[cfg(feature="inspect")] use serde::{Serialize, Deserialize}; + use color_eyre::{ eyre::{ self, @@ -24,44 +26,92 @@ mod config; mod state; mod arg; mod work; +#[cfg(feature="inspect")] mod serial; +#[cfg(feature="defer-drop")] mod defer_drop; + +async fn normal(cfg: config::Config) -> eyre::Result<()> +{ + let state = state::State::new(cfg + .validate() + .wrap_err(eyre!("Invalid config")) + .with_suggestion(|| "Try running `--help`")?); + + let (graph, cfg) = tokio::select!{ + x = work::work_on_all(state) => {x} + _ = tokio::signal::ctrl_c() => { + return Err(eyre!("Interrupt signalled, exiting")); + } + }; + let graph = tokio::task::spawn_blocking(move || { + let mut graph = graph.into_hierarchical(); + graph.compute_recursive_sizes(); + graph + }).await.expect("Failed to compute hierarchy from graph"); + + #[cfg(debug_assertions)] eprintln!("{:?}", graph); + + if cfg.is_using_stdout() { + eprintln!("Max size file: {:?}", graph.path_max_size_for(data::FsKind::File)); + eprintln!("Max size dir: {:?}", graph.path_max_size_for(data::FsKind::Directory)); + eprintln!("Max size all: {:?}", graph.path_max_size()); + } else { + println!("Max size file: {:?}", graph.path_max_size_for(data::FsKind::File)); + println!("Max size dir: {:?}", graph.path_max_size_for(data::FsKind::Directory)); + println!("Max size all: {:?}", graph.path_max_size()); + } + + #[cfg(feature="inspect")] + match cfg.serialise_output.as_ref().map(|ser_out| { + type BoxedWrite = Box; + use futures::FutureExt; + match ser_out { + Some(output_file) => { + use tokio::fs::OpenOptions; + async move { + let stream = OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(output_file).await + .wrap_err(eyre!("Failed to open file for writing")) + .with_section(|| format!("{:?}", output_file).header("File was"))?; + + Ok::(Box::new(stream)) + }.boxed() + }, + None => async move { Ok::(Box::new(tokio::io::stdout())) }.boxed(), + } + }) { + Some(stream_fut) => { + let stream = stream_fut.await?; + serial::write_async(stream, &graph).await + .wrap_err(eyre!("Failed to serialise graph to stream"))?; + }, + _ => (), + } + + Ok(()) +} -async fn read_config() -> eyre::Result +async fn parse_mode() -> eyre::Result<()> { match arg::parse_args().wrap_err(eyre!("Failed to parse args"))? { arg::Mode::Normal(cfg) => { - #[cfg(debug_assertions)] eprintln!("Parsed config: {:#?}\n", cfg); - Ok(cfg) + normal(cfg).await }, arg::Mode::Help => arg::help(), - } + } } #[tokio::main] async fn main() -> eyre::Result<()> { color_eyre::install()?; - - let state = state::State::new(read_config().await - .wrap_err(eyre!("Failed to load config"))? - .validate() - .wrap_err(eyre!("Invalid config")) - .with_suggestion(|| "Try running `--help`")?); - - let graph = tokio::select!{ - x = work::work_on_all(state) => {x} - _ = tokio::signal::ctrl_c() => { - return Err(eyre!("Interrupt signalled, exiting")); - } - }; - let mut graph = graph.into_hierarchical(); - graph.compute_recursive_sizes(); - println!("{:?}", graph); + parse_mode().await + .wrap_err(eyre!("Fatal error"))?; - println!("Max size file: {:?}", graph.path_max_size_for(data::FsKind::File)); - println!("Max size dir: {:?}", graph.path_max_size_for(data::FsKind::Directory)); - println!("Max size all: {:?}", graph.path_max_size()); -/* let max_size = graph.directories().map(|x| x.size()).max(); + /* let max_size = graph.directories().map(|x| x.size()).max(); println!("Max size: {:?}", max_size);*/ //println!("{:?}", graph); diff --git a/src/serial.rs b/src/serial.rs new file mode 100644 index 0000000..645653d --- /dev/null +++ b/src/serial.rs @@ -0,0 +1,59 @@ +//! For serializing +use super::*; +use tokio::prelude::*; + +use async_compression::tokio_02::write::{ + BzEncoder, + BzDecoder, +}; + +type Compressor = BzEncoder; +type Decompressor = BzDecoder; + +const DEFER_DROP_SIZE_FLOOR: usize = 1024 * 1024; // 1 MB + +/// Serialise this object asynchronously +pub async fn write_async(mut to: W, item: &T) -> eyre::Result<()> +where W: AsyncWrite + Unpin +{ + let sect_type_name = || std::any::type_name::().header("Type trying to serialise was"); + let sect_stream_type_name = || std::any::type_name::().header("Stream type was"); + + let vec = tokio::task::block_in_place(|| serde_cbor::to_vec(item)) + .wrap_err(eyre!("Failed to serialise item")) + .with_section(sect_stream_type_name.clone()) + .with_section(sect_type_name.clone())?; + + { + let mut stream = Compressor::new(&mut to); + + eprintln!("Writing {} bytes of type {:?} to stream of type {:?}", vec.len(), std::any::type_name::(), std::any::type_name::()); + + stream.write_all(&vec[..]) + .await + .wrap_err(eyre!("Failed to write serialised memory to stream")) + .with_section(|| vec.len().to_string().header("Size of the serialised object was")) + .with_section(sect_stream_type_name.clone()) + .with_section(sect_type_name.clone())?; + + stream.flush().await.wrap_err(eyre!("Failed to flush output compression stream"))?; + stream.shutdown().await.wrap_err(eyre!("Failed to shutdown output compression stream"))?; + } + + // Extremely overcomplicated concurrent flush+drop. + use futures::FutureExt; + let flush_fut = async { + to.flush().await.wrap_err(eyre!("Failed to flush output backing stream"))?; + to.shutdown().await.wrap_err(eyre!("Failed to shutdown output backing stream"))?; + Ok::<(), eyre::Report>(()) + }.fuse(); + + tokio::pin!(flush_fut); + tokio::select!{ + res = &mut flush_fut => { + return res; + } + _ = async move { drop!(async vec vec); } => {} + } + flush_fut.await +} diff --git a/src/state.rs b/src/state.rs index a42a88b..d33aa03 100644 --- a/src/state.rs +++ b/src/state.rs @@ -93,11 +93,11 @@ impl State /// Try to consume the state into the cache. /// /// Fails if there are other references of this state alive. - pub fn try_into_cache(self) -> Result + pub fn try_into_cache(self) -> Result<(Cache, Config), Self> { match Arc::try_unwrap(self.config) { - Ok(_) => Ok(self.cache), + Ok(cfg) => Ok((self.cache, cfg)), Err(config) => Err(Self{config, ..self}), } } diff --git a/src/work.rs b/src/work.rs index a789090..f36125b 100644 --- a/src/work.rs +++ b/src/work.rs @@ -14,6 +14,7 @@ use data::INode; use data::FsInfo; use state::State; use data::INodeInfoGraph; +use config::Config; async fn process_entry(entry: &tokio::fs::DirEntry, parent: INode) -> io::Result { @@ -34,7 +35,7 @@ async fn process_entry(entry: &tokio::fs::DirEntry, parent: INode) -> io::Result /// /// # Panics /// If there are any more held references to `state`. -pub async fn work_on_all(state: State) -> INodeInfoGraph +pub async fn work_on_all(state: State) -> (INodeInfoGraph, Config) { let comp_children = join_all(state.config().paths.iter().map(|path| { let path = path.clone(); @@ -54,9 +55,9 @@ pub async fn work_on_all(state: State) -> INodeInfoGraph })).await; // All children have completed here. Unwrap cache - let ino_map = { - let cache = state.try_into_cache().unwrap(); - cache.try_complete().await.unwrap() + let (ino_map, cfg) = { + let (cache, cfg) = state.try_into_cache().unwrap(); + (cache.try_complete().await.unwrap(), cfg) }; let mut output = HashMap::with_capacity(ino_map.len()); @@ -65,14 +66,13 @@ pub async fn work_on_all(state: State) -> INodeInfoGraph if let Some(res) = path_comp { output.extend(res); - } } - INodeInfoGraph::new( + (INodeInfoGraph::new( ino_map, output, - ) + ), cfg) } /// Walk this directory. @@ -88,7 +88,11 @@ fn walk(state: State, root: PathBuf, root_ino: INode) -> BoxFuture<'static, Hash async move { { let _guard = state.lock().enter().await; - println!(" -> {:?}", root); + if state.config().is_using_stdout() { + eprintln!(" -> {:?}", root); + } else { + println!(" -> {:?}", root); + } match fs::read_dir(&root).await { Ok(mut dir) => {