From 9d044916377318dbc7de051272adb5251cc360a5 Mon Sep 17 00:00:00 2001 From: Avril Date: Sun, 9 Aug 2020 01:53:45 +0100 Subject: [PATCH] works~ --- .gitignore | 1 + Cargo.toml | 2 +- src/arg.rs | 50 ++++++++++++--- src/dir.rs | 151 +++++++++++++++++++++++++++++++++++++++++++++ src/fixed_stack.rs | 74 ++++++++++++++++++++++ src/leanify.rs | 1 - src/main.rs | 8 +-- src/process.rs | 72 ++++++++++++++++++--- src/work.rs | 74 +++++++++++++++------- 9 files changed, 387 insertions(+), 46 deletions(-) create mode 100644 src/dir.rs create mode 100644 src/fixed_stack.rs diff --git a/.gitignore b/.gitignore index e2a3069..7e27f36 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target *~ +/test diff --git a/Cargo.toml b/Cargo.toml index 411762f..399b05f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ splash = [] [dependencies] lazy_static = "1.4" -tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "io-driver", "io-util", "io-std", "process", "sync", "stream"]} +tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream"]} futures = "0.3" [build-dependencies] diff --git a/src/arg.rs b/src/arg.rs index 782faa4..6f7a8f2 100644 --- a/src/arg.rs +++ b/src/arg.rs @@ -4,6 +4,10 @@ use std::{ NonZeroUsize, ParseIntError, }, + path::{ + PathBuf, + Path, + }, }; use lazy_static::lazy_static; @@ -36,6 +40,8 @@ ENVIRONMENT VARS: pub enum Error { #[cfg(nightly)] BadNumber(std::num::IntErrorKind), #[cfg(not(nightly))] BadNumber(()), + NoExist(PathBuf), + Walking(dir::Error), NoFiles, } impl std::error::Error for Error{} @@ -59,7 +65,9 @@ impl std::fmt::Display for Error }; #[cfg(not(nightly))] Ok(()) }, - Self::NoFiles => write!(f, "need at least one argument") + Self::NoExist(path) => write!(f, "path {:?} does not exist", path), + Self::NoFiles => write!(f, "need at least one argument"), + Self::Walking(dir) => write!(f, "error walking directory structure(s): {}", dir), } } } @@ -78,7 +86,8 @@ impl From for Error pub struct Config { pub max_children: Option, - pub files: Vec, + pub files: Vec, + pub recursive: Option, } impl Default for Config @@ -89,21 +98,22 @@ impl Default for Config Self { max_children: None, files: Vec::new(), + recursive: Some(unsafe{NonZeroUsize::new_unchecked(1)}), } } } /// Parse the `env::args()` -#[inline] pub fn parse_args() -> Result +#[inline] pub async fn parse_args() -> Result { let args = std::env::args(); if args.len() <= 1 { println!("Warning: No arguments specified, try passing `--help`."); } - parse(args.skip(1)) + parse(args.skip(1)).await } -fn parse(args: I) -> Result +async fn parse(args: I) -> Result where I: IntoIterator, T: Into { @@ -111,6 +121,7 @@ where I: IntoIterator, let mut cfg = Config::default(); let mut reading=true; let mut first=true; + while let Some(arg) = args.next() { if reading { let lw_arg = arg.trim().to_lowercase(); @@ -130,7 +141,16 @@ where I: IntoIterator, cfg.max_children = Some(nzi.parse()?); continue; } - } + }, + "--recursive" => { + if let Some(nzi) = args.next() { + cfg.recursive = Some(nzi.parse()?); + continue; + } + }, + "-r" => { + cfg.recursive = None; + }, "-" => { reading= false; continue; @@ -139,10 +159,26 @@ where I: IntoIterator, } } reading = false; - cfg.files.push(arg); + + let path = Path::new(&arg); + if path.is_dir() { + cfg.files.extend(dir::walk(path, cfg.recursive, dir::recommended_max_walkers()).await?); + } else if path.is_file() { + cfg.files.push(path.to_owned()); + } else { + return Err(Error::NoExist(path.to_owned())); + } } if cfg.files.len() == 0 { return Err(Error::NoFiles); } Ok(cfg) } + +impl From for Error +{ + fn from(from: dir::Error) -> Self + { + Self::Walking(from) + } +} diff --git a/src/dir.rs b/src/dir.rs new file mode 100644 index 0000000..0ed309a --- /dev/null +++ b/src/dir.rs @@ -0,0 +1,151 @@ +use std::{ + path::{ + Path, + PathBuf, + }, + sync::Arc, + num::NonZeroUsize, + marker::{ + Send, + Sync, + }, +}; +use tokio::{ + sync::{ + mpsc, + Semaphore, + }, + io, + fs, + task, + stream::StreamExt, +}; +use futures::future::{BoxFuture, join_all, join, FutureExt as _}; + +const MAX_PATHS: usize = 10000; //prevent OOM +const MAX_WALKERS: usize = 24; + +#[inline] pub fn recommended_max_walkers() -> Option +{ + unsafe { + Some(NonZeroUsize::new_unchecked(MAX_WALKERS)) + } +} + +pub async fn walk+Send+Sync>(path: P, recurse: Option, max_walkers: Option) -> Result, Error> +{ + let (tx, rx) = mpsc::channel(max_walkers.as_ref().map(|&num| usize::from(num)).unwrap_or(16)); + let semaphore = max_walkers.map(|x| Arc::new(Semaphore::new(usize::from(x)))); + + let (out, sz) = join(rx + .take(MAX_PATHS) + .collect::>(), + _walk(path, 1, recurse, semaphore, tx)) + .await; + sz?; + Ok(out) +} + +#[inline] fn __walk<'a, P: AsRef+Send+Sync+'a>(path: P, depth: usize, recurse: Option, semaphore: Option>, output: mpsc::Sender) -> BoxFuture<'a,Result> +{ + async move {_walk(path,depth,recurse,semaphore,output).await}.boxed() +} + +async fn _walk+Send+Sync>(path: P, depth: usize, recurse: Option, semaphore: Option>, mut output: mpsc::Sender) -> Result +{ + let path = path.as_ref(); + let can_recurse = || match &recurse { + None => true, + &Some(nzu) => depth < usize::from(nzu), + }; + + if path.is_dir() { + let _lock = semaphore.as_ref().map(|x| x.acquire()); + let mut children = Vec::new(); + let mut dir = fs::read_dir(path).await?; + let mut files=0usize; + while let Some(edir) = dir.next_entry().await? { + let dir = edir.path(); + if dir.is_file() { + output.send(dir).await?; + files+=1; + } else if dir.is_dir() && can_recurse() { + let sem = semaphore.clone(); + let output = output.clone(); + children.push({ + let child = tokio::spawn(async move { + (__walk(&dir, depth+1, recurse, sem, output).await, dir) + }.boxed()); + task::yield_now().await; + child + }); + } + } + Ok(join_all(children).await.into_iter() + .filter_map(|x| match x { + Ok(v) => Some(v), + Err(err)=> { + eprintln!("Child panic: {}", err); + None } + }) + .filter_map(|(x, name)| { + match x { + Ok(v) => Some(v), + Err(e) => { + eprintln!("Failed to parse path {:?}: {}", name, e); + None + }, + } + }).sum::() + files) + } else if path.is_file() { + output.send(path.to_owned()).await?; + Ok(1) + } else { + Err(Error::FileNotFound(path.to_owned())) + } + +} + +#[derive(Debug)] +pub enum Error { + IO(io::Error), + Send, + FileNotFound(PathBuf), +} +impl std::error::Error for Error +{ + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> + { + Some(match &self { + Self::IO(io) => io, + _ => return None, + }) + } +} +impl std::fmt::Display for Error +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result + { + match self { + Self::IO(io) => write!(f, "i/o error: {}", io), + Self::Send => write!(f, "mpsc error: this usually means we tried to take too many files"), + Self::FileNotFound(path) => write!(f, "path {:?} does not exist", path), + } + } +} + +impl From for Error +{ + fn from(from: io::Error) -> Self + { + Self::IO(from) + } +} + +impl From> for Error +{ + #[inline] fn from(_: mpsc::error::SendError) -> Self + { + Self::Send + } +} diff --git a/src/fixed_stack.rs b/src/fixed_stack.rs new file mode 100644 index 0000000..512c710 --- /dev/null +++ b/src/fixed_stack.rs @@ -0,0 +1,74 @@ +use std::{ + collections::{ + LinkedList, + linked_list::{ + IntoIter, + Iter, + IterMut, + }, + }, + iter::Rev, +}; + +pub struct FixedStack(LinkedList, usize); + +impl FixedStack +{ + #[inline] pub fn new(sz: usize) -> Self + { + Self ( + LinkedList::new(), + sz + ) + } + + /// Try to push an item, returns Ok(()) if successful + pub fn push(&mut self, item: T) + { + if self.0.len() >= self.1 { + self.0.pop_back(); + } + self.0.push_front(item); + } + + pub fn clear(&mut self) + { + self.0.clear() + } + + pub fn pop(&mut self) -> Option + { + self.0.pop_back() + } + + pub fn len(&self) -> usize + { + self.0.len() + } + + pub fn cap(&self) -> usize + { + self.1 + } + + pub fn iter(&self) -> Rev> + { + self.0.iter().rev() + } + + pub fn iter_mut(&mut self)-> Rev> + { + self.0.iter_mut().rev() + } +} + +impl IntoIterator for FixedStack +{ + type Item=T; + type IntoIter = Rev>; + + fn into_iter(self)-> Self::IntoIter + { + self.0.into_iter().rev() + } +} diff --git a/src/leanify.rs b/src/leanify.rs index 8c75763..5e50953 100644 --- a/src/leanify.rs +++ b/src/leanify.rs @@ -1,4 +1,3 @@ -use super::*; use std::{ fmt, borrow::Cow, diff --git a/src/main.rs b/src/main.rs index 394c115..291e92f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,17 +12,17 @@ pub use error::{ErrorExt as _, ResultExt as _}; mod stage; mod leanify; +mod dir; +mod fixed_stack; mod process; mod work; async fn work() -> Result<(), Box> { - let args = arg::parse_args().with_prefix("failed to parse args")?; + let args = arg::parse_args().await.with_prefix("failed to parse args")?; let leanify = leanify::find_binary().with_prefix("Couldn't find leanify binary")?; - - - Ok(()) + work::work(leanify, args.files, args.max_children).await } diff --git a/src/process.rs b/src/process.rs index 59396ad..498121c 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,23 +1,75 @@ //! Handles spawning the process +use tokio::{ + process::{ + Command, + }, + sync::{ + mpsc, + }, + io::{ + BufReader, + }, + stream::StreamExt, + prelude::*, + task, +}; +use std::{ + io, + path::Path, +}; /// Spawn the process, and contain its standard output. /// /// # Notes /// Standard error is printed immediately instead. -pub async fn contained_spawn(process: T, args: U) -> Result -where T: AsRef, +pub async fn contained_spawn(process: T, args: U, mut output_to: mpsc::Sender) -> Result<(), Error> +where T: AsRef, U: IntoIterator, - T: AsRef + V: AsRef { - + let mut child = match Command::new(process.as_ref()) + .args(args) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::inherit()) + .stdin(std::process::Stdio::null()) + .spawn() { + Ok(chi) => chi, + Err(sp) => { + return Err(Error::Spawning(sp)); + } + }; + let stdout = child.stdout.take().unwrap(); + let sender = tokio::spawn(async move { + let mut lines = BufReader::new(stdout).lines(); + + while let Some(Ok(line)) = lines.next().await { + if let Err(_) = output_to.send(line).await { + break; + } + } + }); + task::yield_now().await; + match child.await { + Ok(exit) => { + if exit.success() { + sender.await.expect("Child panic"); + Ok(()) + } else { + Err(Error::Process(exit.code())) + } + }, + Err(err) => Err(Error::Transmission(err)), + } } #[derive(Debug)] pub enum Error { /// There was an error spawning the process - Spawning, + Spawning(io::Error), /// Process exited with non-zero error code. - Process, + Process(Option), + /// Error communicating with process. + Transmission(io::Error), } impl std::error::Error for Error{} impl std::fmt::Display for Error @@ -25,10 +77,10 @@ impl std::fmt::Display for Error fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Spawning => write!(f, "there was an error spawning the process"), - Self::Process => write!(f, "process exited with non-zero code"), + Self::Spawning(io) => write!(f, "there was an error spawning the process: {}", io), + Self::Transmission(io) => write!(f, "there was an error waiting on child process: {}", io), + Self::Process(Some(sig)) => write!(f, "process exited with non-zero code: {}", sig), + Self::Process(None) => write!(f, "process teminated by signal"), } } } - - diff --git a/src/work.rs b/src/work.rs index 0a38253..470fd84 100644 --- a/src/work.rs +++ b/src/work.rs @@ -2,24 +2,27 @@ use super::*; use std::{ num::NonZeroUsize, sync::Arc, - pin::Pin, marker::{ Send, + Sync, }, + path::{Path,PathBuf,}, + ffi::OsStr, }; use tokio::{ prelude::*, - stream::StreamExt, sync::{ Semaphore, + mpsc, }, + io, }; use futures::future::{ join_all, Future, }; -pub async fn maybe_await(from: Option) -> Option<::Output> +async fn maybe_await(from: Option) -> Option<::Output> where T: Future { if let Some(v) = from { @@ -29,42 +32,67 @@ where T: Future } } -pub async fn do_work(process: impl AsRef, file: impl AsRef) +async fn do_work(process: impl AsRef, file: impl AsRef) -> Result, process::Error> { let process = process.as_ref(); let file = file.as_ref(); - match process::contained_spawn(process, std::iter::once(file)).await { - Ok(output) => { - - }, - Err(process::Error::Spawning) => { - - }, - Err(process::Error::Process) => { - - }, + let (tx, mut rx) = mpsc::channel(16); + println!("[p]: Processing {:?}", file); + let collector = tokio::spawn(async move { + let mut stack = fixed_stack::FixedStack::new(100); + while let Some(value) = rx.recv().await { + stack.push(value); + } + stack + }); + tokio::task::yield_now().await; + match process::contained_spawn(process, std::iter::once(file), tx).await { + Ok(_) => Ok(collector.await.expect("Child panic").into_iter().collect()), + Err(error) => Err(error), } - } -pub async fn work(process: String, files: I, children: Option) -> Result<(), Box> +pub async fn work(process: U, files: I, children: Option) -> Result<(), Box> where I: IntoIterator, - T: AsRef + Send + 'static + T: AsRef + Send + Sync + 'static, + U: Into { - //let mut stage: stage::Stage = files.into_iter().map(|x| x.into()).collect(); + let (tx,mut rx) = mpsc::channel::<(T, Vec)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16)); let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into()))); - let process = Arc::new(process); + let process = Arc::new(process.into()); + + let display = tokio::spawn(async move { + let mut stdout = io::stdout(); + while let Some((file, values)) = rx.recv().await { + for line in values.into_iter() + { + let line = format!("[i] {:?}: {}\n", file.as_ref(), line); + let _ = stdout.write_all(line.as_bytes()).await; + + } + } + }); - join_all(files.into_iter() + for failed in join_all(files.into_iter() .map(|filename| { let semaphore = semaphore.clone(); let process = Arc::clone(&process); + let mut tx = tx.clone(); tokio::spawn(async move { let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await; - do_work(&process[..], filename).await; + match do_work(process.as_ref(), &filename).await { + Ok(strings) => tx.send((filename, strings)).await.map_err(|_| "Child panic").unwrap(), + Err(error) => eprintln!("[!] {:?}: {}", filename.as_ref(), error), + } }) - })).await; - + })).await + .into_iter() + .filter_map(|x| x.err()) + { + eprintln!("Child panic: {}", failed); + } + drop(tx); + display.await?; Ok(()) }