move write_graph to Arced background task

arg-parsing-better
Avril 4 years ago
parent 0c58a9d44b
commit 0ca0146739
Signed by: flanchan
GPG Key ID: 284488987C31F630

1
Cargo.lock generated

@ -119,6 +119,7 @@ name = "dirstat"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-compression", "async-compression",
"cfg-if 1.0.0",
"color-eyre", "color-eyre",
"futures", "futures",
"jemallocator", "jemallocator",

@ -33,6 +33,7 @@ splash = []
[dependencies] [dependencies]
async-compression = {version = "0.3", features=["tokio-02", "bzip2"], optional=true} async-compression = {version = "0.3", features=["tokio-02", "bzip2"], optional=true}
cfg-if = "1.0.0"
color-eyre = {version = "0.5.10", default-features=false} color-eyre = {version = "0.5.10", default-features=false}
futures = "0.3.12" futures = "0.3.12"
jemallocator = {version = "0.3.2", optional = true} jemallocator = {version = "0.3.2", optional = true}

@ -38,6 +38,14 @@ const OPTIONS_NORMAL: &'static [&'static str] = &[
"- Stop parsing arguments, treat all the rest as paths.", "- Stop parsing arguments, treat all the rest as paths.",
]; ];
const NOTES: &'static [&'static str] = &[
"The first time a non-option argument is encountered, the program stops parsing arguments and assumes the rest of the arguments are paths.",
"If parallelism is set to unlimited, there can be a huge syscall overhead. It is recommended to use `-m` in large runs.",
"",
"Symlinks are ignored while collecting stat data. They will fail with message 'Unknown file type'. Symlinks are generally very small in the actual data they contain themselves, so this is *usually* unimportant.",
#[cfg(feature="inspect")] "\nThe save formats of `--save` (`-D`) and `--save-raw` (`-Dr`) are incompatible. The former is bzip2 compressed the latter is uncompressed.",
];
fn get_opt_normal() -> impl fmt::Display fn get_opt_normal() -> impl fmt::Display
{ {
#[derive(Debug)] #[derive(Debug)]

@ -3,6 +3,7 @@
#[macro_use] extern crate pin_project; #[macro_use] extern crate pin_project;
#[macro_use] extern crate lazy_static; #[macro_use] extern crate lazy_static;
#[macro_use] extern crate cfg_if;
#[cfg(feature="inspect")] use serde::{Serialize, Deserialize}; #[cfg(feature="inspect")] use serde::{Serialize, Deserialize};
@ -28,6 +29,7 @@ use color_eyre::{
#[macro_use] mod ext; #[macro_use] mod ext;
pub use ext::prelude::*; pub use ext::prelude::*;
use std::sync::Arc;
mod bytes; mod bytes;
mod data; mod data;
@ -38,39 +40,14 @@ mod work;
#[cfg(feature="inspect")] mod serial; #[cfg(feature="inspect")] mod serial;
#[cfg(feature="defer-drop")] mod defer_drop; #[cfg(feature="defer-drop")] mod defer_drop;
async fn normal(cfg: config::Config) -> eyre::Result<()>
{
let state = state::State::new(cfg
.validate()
.wrap_err(eyre!("Invalid config"))
.with_suggestion(|| "Try running `--help`")?);
let (graph, cfg) = tokio::select!{
x = work::work_on_all(state) => {x}
_ = tokio::signal::ctrl_c() => {
return Err(eyre!("Interrupt signalled, exiting"));
}
};
let cfg = cfg.make_global();
let graph = tokio::task::spawn_blocking(move || {
cfg_println!(Verbose; cfg, "Computing hierarchy...");
let mut graph = graph.into_hierarchical();
cfg_println!(Verbose; cfg, "Computing sizes...");
graph.compute_recursive_sizes();
graph
}).await.expect("Failed to compute hierarchy from graph");
#[cfg(debug_assertions)] cfg_eprintln!(Verbose; cfg, "{:?}", graph);
cfg_println!(Quiet; cfg, "Max size file: {:?}", graph.path_max_size_for(data::FsKind::File));
cfg_println!(Quiet; cfg, "Max size dir: {:?}", graph.path_max_size_for(data::FsKind::Directory));
cfg_println!(Quiet; cfg, "Max size all: {:?}", graph.path_max_size());
#[cfg(feature="inspect")] #[cfg(feature="inspect")]
async fn write_graph(graph: Arc<data::HierarchicalINodeGraph>) -> eyre::Result<()>
{
let cfg = config::get_global();
match cfg.serialise_output.as_ref().map(|ser_out| { match cfg.serialise_output.as_ref().map(|ser_out| {
type BoxedWrite = Box<dyn tokio::io::AsyncWrite + Unpin>; cfg_eprintln!(Verbose; cfg, "Writing graph to stream...");
type BoxedWrite = Box<dyn tokio::io::AsyncWrite + Unpin + Send + Sync>;
use futures::FutureExt; use futures::FutureExt;
use config::OutputSerialisationMode; use config::OutputSerialisationMode;
let should_comp = ser_out.should_compress(); let should_comp = ser_out.should_compress();
@ -105,7 +82,7 @@ async fn normal(cfg: config::Config) -> eyre::Result<()>
// `.2` indicates if we should compress while in normal write mode. // `.2` indicates if we should compress while in normal write mode.
Some((stream_fut, None, compress)) => { Some((stream_fut, None, compress)) => {
let stream = stream_fut.await?; let stream = stream_fut.await?;
serial::write_async(stream, &graph, compress).await serial::write_async(stream, graph.as_ref(), compress).await
.wrap_err(eyre!("Failed to serialise graph to stream"))?; .wrap_err(eyre!("Failed to serialise graph to stream"))?;
}, },
#[cfg(feature="prealloc")] Some((_task_fut, Some(output_file), _)) => { #[cfg(feature="prealloc")] Some((_task_fut, Some(output_file), _)) => {
@ -120,7 +97,7 @@ async fn normal(cfg: config::Config) -> eyre::Result<()>
.with_section(|| format!("{:?}", output_file).header("File was"))?; .with_section(|| format!("{:?}", output_file).header("File was"))?;
let mut file = file.into_std().await; let mut file = file.into_std().await;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
serial::write_sync_map(&mut file, &graph) serial::write_sync_map(&mut file, graph.as_ref())
}).await.wrap_err(eyre!("Prealloc panicked while dumping")) }).await.wrap_err(eyre!("Prealloc panicked while dumping"))
.with_section(|| format!("{:?}", output_file).header("File was"))? .with_section(|| format!("{:?}", output_file).header("File was"))?
.wrap_err(eyre!("Prealloc failed to dump graph to file")) .wrap_err(eyre!("Prealloc failed to dump graph to file"))
@ -128,6 +105,60 @@ async fn normal(cfg: config::Config) -> eyre::Result<()>
}, },
_ => (), _ => (),
} }
Ok(())
}
async fn normal(cfg: config::Config) -> eyre::Result<()>
{
let state = state::State::new(cfg
.validate()
.wrap_err(eyre!("Invalid config"))
.with_suggestion(|| "Try running `--help`")?);
let (graph, cfg) = tokio::select!{
x = work::work_on_all(state) => {x}
_ = tokio::signal::ctrl_c() => {
return Err(eyre!("Interrupt signalled, exiting"));
}
};
let cfg = cfg.make_global();
let graph = tokio::task::spawn_blocking(move || {
cfg_println!(Verbose; cfg, "Computing hierarchy...");
let mut graph = graph.into_hierarchical();
cfg_println!(Verbose; cfg, "Computing sizes...");
graph.compute_recursive_sizes();
graph
}).await.expect("Failed to compute hierarchy from graph");
//#[cfg(debug_assertions)] cfg_eprintln!(Verbose; cfg, "{:?}", graph);
use futures::future::OptionFuture;
let (graph, writer) = {
cfg_if!{
if #[cfg(feature="inspect")] {
let graph = Arc::new(graph);
(Arc::clone(&graph), OptionFuture::from(Some(tokio::spawn(write_graph(graph)))))
} else {
(graph, OptionFuture::from(Option::<futures::future::BoxFuture<'static, ()>>::None))
}
}
};
cfg_println!(Quiet; cfg, "Max size file: {:?}", graph.path_max_size_for(data::FsKind::File));
cfg_println!(Quiet; cfg, "Max size dir: {:?}", graph.path_max_size_for(data::FsKind::Directory));
cfg_println!(Quiet; cfg, "Max size all: {:?}", graph.path_max_size());
#[cfg(feature="inspect")]
match writer.await {
None => (),
Some(Ok(Ok(_))) => cfg_eprintln!(Verbose; cfg, "Written successfully"),
Some(Ok(error)) => return error.wrap_err(eyre!("Failed to write graph to output stream")),
Some(Err(_)) => cfg_eprintln!(Silent; cfg, "Panic while writing graph to stream"),
}
#[cfg(not(feature="inspect"))] drop(writer);
Ok(()) Ok(())
} }

@ -52,7 +52,7 @@ impl<'a, T> MaybeCompressor<'a, T>
} }
impl<'a, T> MaybeCompressor<'a, T> impl<'a, T> MaybeCompressor<'a, T>
where T: AsyncWrite + Unpin + 'a where T: AsyncWrite +Send + Unpin + 'a
{ {
pub fn new(raw: &'a mut T, compress: Option<CompKind>) -> Self pub fn new(raw: &'a mut T, compress: Option<CompKind>) -> Self
{ {
@ -65,7 +65,7 @@ where T: AsyncWrite + Unpin + 'a
} }
impl<'a, T> DerefMut for MaybeCompressor<'a, T> impl<'a, T> DerefMut for MaybeCompressor<'a, T>
where T: AsyncWrite + Unpin + 'a where T: AsyncWrite + Send + Unpin + 'a
{ {
fn deref_mut(&mut self) -> &mut Self::Target { fn deref_mut(&mut self) -> &mut Self::Target {
match self { match self {
@ -76,9 +76,9 @@ where T: AsyncWrite + Unpin + 'a
} }
} }
impl<'a, T> Deref for MaybeCompressor<'a, T> impl<'a, T> Deref for MaybeCompressor<'a, T>
where T: AsyncWrite + Unpin + 'a where T: AsyncWrite + Unpin + Send + 'a
{ {
type Target = dyn AsyncWrite + Unpin+ 'a; type Target = dyn AsyncWrite + Send + Unpin+ 'a;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
match self { match self {
@ -95,7 +95,7 @@ where T: AsyncWrite + Unpin + 'a
/// This compresses the output stream. /// This compresses the output stream.
/// It cannot be used by `prealloc` read/write functions, as they do not compress. /// It cannot be used by `prealloc` read/write functions, as they do not compress.
pub async fn write_async<T: Serialize, W>(mut to: W, item: &T, compress: bool) -> eyre::Result<()> pub async fn write_async<T: Serialize, W>(mut to: W, item: &T, compress: bool) -> eyre::Result<()>
where W: AsyncWrite + Unpin where W: AsyncWrite + Unpin + Send
{ {
let sect_type_name = || std::any::type_name::<T>().header("Type trying to serialise was"); let sect_type_name = || std::any::type_name::<T>().header("Type trying to serialise was");
let sect_stream_type_name = || std::any::type_name::<W>().header("Stream type was"); let sect_stream_type_name = || std::any::type_name::<W>().header("Stream type was");
@ -162,6 +162,9 @@ mod prealloc {
.with_section(sect_type_name.clone())?; .with_section(sect_type_name.clone())?;
let fd = file.as_raw_fd(); let fd = file.as_raw_fd();
cfg_eprintln!(Verbose; config::get_global(), "Writing (raw) {} bytes of type {:?} to fd {}", vec.len(), std::any::type_name::<T>(), fd);
unsafe { unsafe {
if libc::fallocate(fd, 0, 0, vec.len().try_into() if libc::fallocate(fd, 0, 0, vec.len().try_into()
.wrap_err(eyre!("Failed to cast buffer size to `off_t`")) .wrap_err(eyre!("Failed to cast buffer size to `off_t`"))

@ -1,5 +1,4 @@
use super::*; use super::*;
use std::sync::Arc;
use tokio::sync::{ use tokio::sync::{
Semaphore, Semaphore,

@ -88,7 +88,9 @@ fn walk(state: State, root: PathBuf, root_ino: INode) -> BoxFuture<'static, Hash
async move { async move {
{ {
let _guard = state.lock().enter().await; let _guard = state.lock().enter().await;
cfg_println!(state.config(), " -> {:?}", root);
//cfg_println!(state.config(), " -> {:?}", root);
match fs::read_dir(&root).await match fs::read_dir(&root).await
{ {
Ok(mut dir) => { Ok(mut dir) => {

Loading…
Cancel
Save