added dumping of collected data

fuck_this_bullshit
Avril 3 years ago
parent 9c8707fc44
commit 9f4a0faa68
Signed by: flanchan
GPG Key ID: 284488987C31F630

1 .gitignore vendored

@@ -1,2 +1,3 @@
/target
*~
*.dump

87 Cargo.lock generated

@@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.14.1"
@@ -15,6 +17,19 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e"
[[package]]
name = "async-compression"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b72c1f1154e234325b50864a349b9c8e56939e266a4c307c0f159812df2f9537"
dependencies = [
"bzip2",
"futures-core",
"memchr",
"pin-project-lite 0.2.4",
"tokio",
]
[[package]]
name = "autocfg"
version = "1.0.1"
@@ -47,6 +62,33 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "bzip2"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "275d84fe348b838dc49477d39770682839b3e73e21a3eadc07b12924f1a9fcbe"
dependencies = [
"bzip2-sys",
"libc",
]
[[package]]
name = "bzip2-sys"
version = "0.1.10+1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17fa3d1ac1ca21c5c4e36a97f3c3eb25084576f6fc47bf0139c1123434216c6c"
dependencies = [
"cc",
"libc",
"pkg-config",
]
[[package]]
name = "cc"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c0496836a84f8d0495758516b8621a622beb77c0fed418570e50764093ced48"
[[package]]
name = "cfg-if"
version = "0.1.10"
@@ -76,11 +118,14 @@ dependencies = [
name = "dirstat"
version = "0.1.0"
dependencies = [
"async-compression",
"color-eyre",
"futures",
"lazy_static",
"num_cpus",
"pin-project",
"serde",
"serde_cbor",
"tokio",
]
@@ -217,6 +262,12 @@ version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6503fe142514ca4799d4c26297c4248239fe8838d827db6bd6065c6ed29a6ce"
[[package]]
name = "half"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62aca2aba2d62b4a7f5b33f3712cb1b0692779a56fb510499d5c0aa594daeaf3"
[[package]]
name = "hermit-abi"
version = "0.1.18"
@@ -429,6 +480,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c"
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@@ -465,6 +522,36 @@ version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e3bad0ee36814ca07d7968269dd4b7ec89ec2da10c4bb613928d3077083c232"
[[package]]
name = "serde"
version = "1.0.123"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92d5161132722baa40d802cc70b15262b98258453e85e5d1d365c757c73869ae"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_cbor"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622"
dependencies = [
"half",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.123"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9391c295d64fc0abb2c556bad848f33cb8296276b1ad2677d1ae1ace4f258f31"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "signal-hook-registry"
version = "1.3.0"

@@ -7,15 +7,24 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["splash"]
default = ["splash", "inspect", "defer-drop"]
# Allow saving and loading of gathered data for later inspection
inspect = ["serde", "serde_cbor", "async-compression"]
# Enable dropping of certain large objects on background threads.
defer-drop = []
# Show splash screen
splash = []
[dependencies]
async-compression = {version = "0.3", features=["tokio-02", "bzip2"], optional=true}
color-eyre = {version = "0.5.10", default-features=false}
futures = "0.3.12"
lazy_static = "1.4.0"
num_cpus = "1.13.0"
pin-project = "1.0.5"
serde = {version = "1.0.123", features=["derive"], optional=true}
serde_cbor = {version = "0.11.1", optional=true}
tokio = {version = "0.2", features=["full"]}

@@ -38,6 +38,8 @@ OPTIONS:
--threads <number> Limit the maximum number of tasks allowed to process concurrently (Set to 0 for unlimited.)
-M Set number of parallel running tasks to unlimited. (Same as `--threads 0`). (default).
-m Limit number of parallel tasks to the number of active CPU processors.
--save <file> Dump the collected data to this file for further inspection (only available when compiled with feature `inspect`)
-D Dump the collected data to `stdout` (see `--save`) (only available when compiled with feature `inspect`)
- Stop parsing arguments, treat all the rest as paths.
--help Print this message and exit.
@@ -98,7 +100,14 @@ fn parse<I: IntoIterator<Item=String>>(args: I) -> eyre::Result<Mode>
"-m" => {
cfg.max_tasks = config::max_tasks_cpus();
},
#[cfg(feature="inspect")] "-D" => {
cfg.serialise_output = Some(None);
},
#[cfg(feature="inspect")] "--save" => {
let file = args.next().ok_or(eyre!("`--save` expects a filename parameter"))
.with_suggestion(suggestion_intended_arg.clone())?;
cfg.serialise_output = Some(Some(file.into()));
},
"--recursive" => {
let max = args.next().ok_or(eyre!("`--recursive` expects a parameter"))
.with_suggestion(suggestion_intended_arg.clone())?;
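
For reference, the two new flags documented above might be exercised like so (hypothetical invocations; `dirstat` is the package name from Cargo.lock, the paths are placeholders):

dirstat --save sizes.dump /home/user     # write the compressed dump to `sizes.dump`
dirstat -D /home/user > sizes.dump       # dump to stdout; progress and stats move to stderr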

@@ -53,6 +53,27 @@ pub struct Config
pub paths: Vec<PathBuf>,
pub recursive: Recursion,
pub max_tasks: Option<NonZeroUsize>,
#[cfg(feature="inspect")]
pub serialise_output: Option<Option<PathBuf>>, // Some(None) means dump to `stdout`
}
impl Config
{
/// Are we expected to dump data to `stdout`?
#[inline] pub fn is_using_stdout(&self) -> bool
{
#[cfg(feature="inspect")] {
return if let Some(None) = self.serialise_output {
true
} else {
false
};
}
#[cfg(not(feature="inspect"))] {
false
}
}
}
/// The default `max_tasks`
@@ -73,6 +94,8 @@ impl Default for Config
paths: Vec::new(),
recursive: Default::default(),
max_tasks: None, //max_tasks_cpus(),
#[cfg(feature="inspect")]
serialise_output: None,
}
}
}
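
A minimal sketch of how the new `serialise_output` field is meant to be read (illustrative only; `describe` is a hypothetical helper, not part of this commit):

#[cfg(feature="inspect")]
fn describe(cfg: &Config) -> &'static str
{
    match &cfg.serialise_output {
        None => "no dump requested",
        Some(None) => "dump to stdout (`-D`)",
        Some(Some(_file)) => "dump to the given file (`--save <file>`)",
    }
}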

@@ -80,6 +80,13 @@ impl Cache
/// If there are shared references to the `Cache` other than this one, the future will return this object as `Err`.
pub fn try_complete(self) -> impl futures::Future<Output=Result<HashMap<INode, FsInfo>, Self>> + 'static
{
#[inline(never)]
#[cold]
fn only_one() -> !
{
unreachable!("Arc should have only 1 strong reference now. This should never happen")
}
async move {
if Arc::strong_count(&self.0) > 2 {
return Err(self); // something besides the background task is still holding a shared cache reference
@@ -101,7 +108,7 @@ impl Cache
Ok(inner) => {
Ok(inner.cache.into_inner())
},
#[cold] Err(_) => unreachable!("Arc should have only 1 strong reference now. This should never happen"),
Err(_) => only_one(),
}
}
}

@@ -53,6 +53,7 @@ impl INodeInfoGraph
{
self.inodes.get(node.borrow())
}
/*
/// An iterator over top-level children of this node
pub fn children_of(&self, node: impl Borrow<INode>) -> !//Children<'_>
{
@@ -66,7 +67,7 @@
pub fn directories(&self) -> !//Directories<'_>
{
todo!()//Directories(self, self.children.keys())
}
}*/
/// Convert into a hierarchical representation
pub fn into_hierarchical(self) -> HierarchicalINodeGraph
@@ -82,6 +83,7 @@ impl INodeInfoGraph
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature="inspect", derive(serde::Serialize, serde::Deserialize))]
enum NodeKind
{
Directory(Vec<PathBuf>, Option<u64>),
@@ -100,6 +102,7 @@ impl NodeKind
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature="inspect", derive(serde::Serialize, serde::Deserialize))]
struct HierarchicalNode
{
kind: NodeKind,
@@ -128,6 +131,7 @@ impl HierarchicalNode
/// A hierarchical graph of node sizes
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature="inspect", derive(serde::Serialize, serde::Deserialize))]
pub struct HierarchicalINodeGraph
{
table: HashMap<PathBuf, HierarchicalNode>

@@ -25,6 +25,7 @@ pub use graph::{
/// Usually created from the `.inode()` extension method on `fs::Metadata` found in the prelude.
/// Can also be created with `new()` from a `fs::Metadata` reference, or created unsafely from an arbitrary `u64` with `new_unchecked`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Copy)]
#[cfg_attr(feature="inspect", derive(serde::Serialize, serde::Deserialize))]
#[repr(transparent)]
pub struct INode(u64);

@@ -0,0 +1,39 @@
//! Mechanism to defer dropping of large objects to background threads
use futures::{
prelude::*,
future::OptionFuture,
};
pub const DEFER_DROP_VEC_SIZE_FLOOR: usize = 1024 * 1024; // 1 MB
/// Drop a `Vec<T>` that is `Send` and `'static`.
///
/// This will move the object to a background task if it is deemed necessary.
/// # Note
/// This *must* be `await`ed to work properly. If you are not in an async context, use `drop_vec_sync`.
pub fn drop_vec<T>(vec: Vec<T>) -> impl Future<Output = ()> + 'static
where T: Send + 'static
{
let len_bytes = vec.len() * std::mem::size_of::<T>();
OptionFuture::from(if len_bytes > DEFER_DROP_VEC_SIZE_FLOOR {
eprintln!("Size of vector ({} bytes, {} elements of {:?}) exceeds defer drop size floor {}. Moving vector to a seperate thread for de-allocation", len_bytes, vec.len(), std::any::type_name::<T>(), DEFER_DROP_VEC_SIZE_FLOOR);
Some(async move {
tokio::task::spawn_blocking(move || drop(vec)).await.expect("Child panic while dropping vector");
})
} else {
None
}).map(|_| ())
}
/// Drop a `Vec<T>` that is `Send` and `'static`.
///
/// This will move the object to a background task if it is deemed necessary.
pub fn drop_vec_sync<T>(vec: Vec<T>)
where T: Send + 'static
{
let len_bytes = vec.len() * std::mem::size_of::<T>();
if len_bytes > DEFER_DROP_VEC_SIZE_FLOOR {
eprintln!("Size of vector ({} bytes, {} elements of {:?}) exceeds defer drop size floor {}. Moving vector to a seperate thread for de-allocation", len_bytes, vec.len(), std::any::type_name::<T>(), DEFER_DROP_VEC_SIZE_FLOOR);
tokio::task::spawn_blocking(move || drop(vec));
}
}
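
A brief usage sketch for the helpers above (illustrative; `example` is a hypothetical function and the sizes are arbitrary):

async fn example()
{
    // 2 MB of zeroes: above the 1 MB floor, so deallocation is deferred to a blocking task
    let big: Vec<u8> = vec![0u8; 2 * 1024 * 1024];
    crate::defer_drop::drop_vec(big).await;

    // Small vectors fall below the floor and are dropped inline
    crate::defer_drop::drop_vec_sync(vec![1u8, 2, 3]);
}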

@@ -282,3 +282,39 @@ mod tests
assert_eq!((0..100).sum::<i32>(),sum);
}
}
/// Explicitly drop this item.
///
/// If the `defer-drop` feature is enabled, this may move the object to the background collector thread.
///
/// # Specialisations
/// `Vec<T>` types get special handling this way.
/// ```
/// let large_vec = vec![String::from("Hello world"); 1000];
/// drop!(vec large_vec);
/// ```
/// It also has an `async` variant that lets you await the background dropping task.
/// ```
/// let large_vec = vec![String::from("Hello world"); 1000];
/// drop!(async vec large_vec);
/// ```
#[macro_export] macro_rules! drop {
(async vec $item:expr) => {
#[cfg(feature="defer-drop")] {
$crate::defer_drop::drop_vec($item).await;
}
#[cfg(not(feature="defer-drop"))] {
drop($item);
}
()
};
(vec $item:expr) => {
#[cfg(feature="defer-drop")] {
$crate::defer_drop::drop_vec_sync($item);
}
#[cfg(not(feature="defer-drop"))] {
drop($item);
}
()
}
}

@@ -4,6 +4,8 @@
#[macro_use] extern crate pin_project;
#[macro_use] extern crate lazy_static;
#[cfg(feature="inspect")] use serde::{Serialize, Deserialize};
use color_eyre::{
eyre::{
self,
@@ -24,44 +26,92 @@ mod config;
mod state;
mod arg;
mod work;
#[cfg(feature="inspect")] mod serial;
#[cfg(feature="defer-drop")] mod defer_drop;
async fn normal(cfg: config::Config) -> eyre::Result<()>
{
let state = state::State::new(cfg
.validate()
.wrap_err(eyre!("Invalid config"))
.with_suggestion(|| "Try running `--help`")?);
let (graph, cfg) = tokio::select!{
x = work::work_on_all(state) => {x}
_ = tokio::signal::ctrl_c() => {
return Err(eyre!("Interrupt signalled, exiting"));
}
};
let graph = tokio::task::spawn_blocking(move || {
let mut graph = graph.into_hierarchical();
graph.compute_recursive_sizes();
graph
}).await.expect("Failed to compute hierarchy from graph");
#[cfg(debug_assertions)] eprintln!("{:?}", graph);
if cfg.is_using_stdout() {
eprintln!("Max size file: {:?}", graph.path_max_size_for(data::FsKind::File));
eprintln!("Max size dir: {:?}", graph.path_max_size_for(data::FsKind::Directory));
eprintln!("Max size all: {:?}", graph.path_max_size());
} else {
println!("Max size file: {:?}", graph.path_max_size_for(data::FsKind::File));
println!("Max size dir: {:?}", graph.path_max_size_for(data::FsKind::Directory));
println!("Max size all: {:?}", graph.path_max_size());
}
#[cfg(feature="inspect")]
match cfg.serialise_output.as_ref().map(|ser_out| {
type BoxedWrite = Box<dyn tokio::io::AsyncWrite + Unpin>;
use futures::FutureExt;
match ser_out {
Some(output_file) => {
use tokio::fs::OpenOptions;
async move {
let stream = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(output_file).await
.wrap_err(eyre!("Failed to open file for writing"))
.with_section(|| format!("{:?}", output_file).header("File was"))?;
Ok::<BoxedWrite, eyre::Report>(Box::new(stream))
}.boxed()
},
None => async move { Ok::<BoxedWrite, _>(Box::new(tokio::io::stdout())) }.boxed(),
}
}) {
Some(stream_fut) => {
let stream = stream_fut.await?;
serial::write_async(stream, &graph).await
.wrap_err(eyre!("Failed to serialise graph to stream"))?;
},
_ => (),
}
Ok(())
}
async fn read_config() -> eyre::Result<config::Config>
async fn parse_mode() -> eyre::Result<()>
{
match arg::parse_args().wrap_err(eyre!("Failed to parse args"))?
{
arg::Mode::Normal(cfg) => {
#[cfg(debug_assertions)] eprintln!("Parsed config: {:#?}\n", cfg);
Ok(cfg)
normal(cfg).await
},
arg::Mode::Help => arg::help(),
}
}
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
color_eyre::install()?;
let state = state::State::new(read_config().await
.wrap_err(eyre!("Failed to load config"))?
.validate()
.wrap_err(eyre!("Invalid config"))
.with_suggestion(|| "Try running `--help`")?);
let graph = tokio::select!{
x = work::work_on_all(state) => {x}
_ = tokio::signal::ctrl_c() => {
return Err(eyre!("Interrupt signalled, exiting"));
}
};
let mut graph = graph.into_hierarchical();
graph.compute_recursive_sizes();
println!("{:?}", graph);
parse_mode().await
.wrap_err(eyre!("Fatal error"))?;
println!("Max size file: {:?}", graph.path_max_size_for(data::FsKind::File));
println!("Max size dir: {:?}", graph.path_max_size_for(data::FsKind::Directory));
println!("Max size all: {:?}", graph.path_max_size());
/* let max_size = graph.directories().map(|x| x.size()).max();
println!("Max size: {:?}", max_size);*/
//println!("{:?}", graph);

@@ -0,0 +1,59 @@
//! Serialising collected data (CBOR, compressed with Bzip2)
use super::*;
use tokio::prelude::*;
use async_compression::tokio_02::write::{
BzEncoder,
BzDecoder,
};
type Compressor<T> = BzEncoder<T>;
type Decompressor<T> = BzDecoder<T>;
const DEFER_DROP_SIZE_FLOOR: usize = 1024 * 1024; // 1 MB
/// Serialise this object asynchronously
pub async fn write_async<T: Serialize, W>(mut to: W, item: &T) -> eyre::Result<()>
where W: AsyncWrite + Unpin
{
let sect_type_name = || std::any::type_name::<T>().header("Type trying to serialise was");
let sect_stream_type_name = || std::any::type_name::<W>().header("Stream type was");
let vec = tokio::task::block_in_place(|| serde_cbor::to_vec(item))
.wrap_err(eyre!("Failed to serialise item"))
.with_section(sect_stream_type_name.clone())
.with_section(sect_type_name.clone())?;
{
let mut stream = Compressor::new(&mut to);
eprintln!("Writing {} bytes of type {:?} to stream of type {:?}", vec.len(), std::any::type_name::<T>(), std::any::type_name::<W>());
stream.write_all(&vec[..])
.await
.wrap_err(eyre!("Failed to write serialised memory to stream"))
.with_section(|| vec.len().to_string().header("Size of the serialised object was"))
.with_section(sect_stream_type_name.clone())
.with_section(sect_type_name.clone())?;
stream.flush().await.wrap_err(eyre!("Failed to flush output compression stream"))?;
stream.shutdown().await.wrap_err(eyre!("Failed to shutdown output compression stream"))?;
}
// Extremely overcomplicated concurrent flush+drop.
use futures::FutureExt;
let flush_fut = async {
to.flush().await.wrap_err(eyre!("Failed to flush output backing stream"))?;
to.shutdown().await.wrap_err(eyre!("Failed to shutdown output backing stream"))?;
Ok::<(), eyre::Report>(())
}.fuse();
tokio::pin!(flush_fut);
tokio::select!{
res = &mut flush_fut => {
return res;
}
_ = async move { drop!(async vec vec); } => {}
}
flush_fut.await
}
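
Only the write path lands in this commit; a matching reader is not included. A hypothetical counterpart, assuming `async_compression` also exposes a `bufread` module under the same `tokio-02` feature, could look roughly like this:

pub async fn read_async<T, R>(from: R) -> eyre::Result<T>
where T: serde::de::DeserializeOwned,
      R: AsyncRead + Unpin
{
    use async_compression::tokio_02::bufread::BzDecoder;
    // Decompress the whole stream into memory, then CBOR-decode it
    let mut stream = BzDecoder::new(tokio::io::BufReader::new(from));
    let mut buf = Vec::new();
    stream.read_to_end(&mut buf).await
        .wrap_err(eyre!("Failed to read decompressed stream"))?;
    tokio::task::block_in_place(|| serde_cbor::from_slice(&buf))
        .wrap_err(eyre!("Failed to deserialise item"))
}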

@@ -93,11 +93,11 @@ impl State
/// Try to consume the state into the cache.
///
/// Fails if there are other references of this state alive.
pub fn try_into_cache(self) -> Result<Cache, Self>
pub fn try_into_cache(self) -> Result<(Cache, Config), Self>
{
match Arc::try_unwrap(self.config)
{
Ok(_) => Ok(self.cache),
Ok(cfg) => Ok((self.cache, cfg)),
Err(config) => Err(Self{config, ..self}),
}
}

@@ -14,6 +14,7 @@ use data::INode
use data::FsInfo;
use state::State;
use data::INodeInfoGraph;
use config::Config;
async fn process_entry(entry: &tokio::fs::DirEntry, parent: INode) -> io::Result<FsInfo>
{
@@ -34,7 +35,7 @@ async fn process_entry(entry: &tokio::fs::DirEntry, parent: INode) -> io::Result
///
/// # Panics
/// If there are any more held references to `state`.
pub async fn work_on_all(state: State) -> INodeInfoGraph
pub async fn work_on_all(state: State) -> (INodeInfoGraph, Config)
{
let comp_children = join_all(state.config().paths.iter().map(|path| {
let path = path.clone();
@@ -54,9 +55,9 @@ pub async fn work_on_all(state: State) -> INodeInfoGraph
})).await;
// All children have completed here. Unwrap cache
let ino_map = {
let cache = state.try_into_cache().unwrap();
cache.try_complete().await.unwrap()
let (ino_map, cfg) = {
let (cache, cfg) = state.try_into_cache().unwrap();
(cache.try_complete().await.unwrap(), cfg)
};
let mut output = HashMap::with_capacity(ino_map.len());
@@ -65,14 +66,13 @@ pub async fn work_on_all(state: State) -> INodeInfoGraph
if let Some(res) = path_comp
{
output.extend(res);
}
}
INodeInfoGraph::new(
(INodeInfoGraph::new(
ino_map,
output,
)
), cfg)
}
/// Walk this directory.
@@ -88,7 +88,11 @@ fn walk(state: State, root: PathBuf, root_ino: INode) -> BoxFuture<'static, Hash
async move {
{
let _guard = state.lock().enter().await;
println!(" -> {:?}", root);
if state.config().is_using_stdout() {
eprintln!(" -> {:?}", root);
} else {
println!(" -> {:?}", root);
}
match fs::read_dir(&root).await
{
Ok(mut dir) => {
