master
Avril 4 years ago
parent 2becd03c6d
commit 9d04491637
Signed by: flanchan
GPG Key ID: 284488987C31F630

1
.gitignore vendored

@ -1,2 +1,3 @@
/target /target
*~ *~
/test

@ -15,7 +15,7 @@ splash = []
[dependencies] [dependencies]
lazy_static = "1.4" lazy_static = "1.4"
tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "io-driver", "io-util", "io-std", "process", "sync", "stream"]} tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream"]}
futures = "0.3" futures = "0.3"
[build-dependencies] [build-dependencies]

@ -4,6 +4,10 @@ use std::{
NonZeroUsize, NonZeroUsize,
ParseIntError, ParseIntError,
}, },
path::{
PathBuf,
Path,
},
}; };
use lazy_static::lazy_static; use lazy_static::lazy_static;
@ -36,6 +40,8 @@ ENVIRONMENT VARS:
pub enum Error { pub enum Error {
#[cfg(nightly)] BadNumber(std::num::IntErrorKind), #[cfg(nightly)] BadNumber(std::num::IntErrorKind),
#[cfg(not(nightly))] BadNumber(()), #[cfg(not(nightly))] BadNumber(()),
NoExist(PathBuf),
Walking(dir::Error),
NoFiles, NoFiles,
} }
impl std::error::Error for Error{} impl std::error::Error for Error{}
@ -59,7 +65,9 @@ impl std::fmt::Display for Error
}; };
#[cfg(not(nightly))] Ok(()) #[cfg(not(nightly))] Ok(())
}, },
Self::NoFiles => write!(f, "need at least one argument") Self::NoExist(path) => write!(f, "path {:?} does not exist", path),
Self::NoFiles => write!(f, "need at least one argument"),
Self::Walking(dir) => write!(f, "error walking directory structure(s): {}", dir),
} }
} }
} }
@ -78,7 +86,8 @@ impl From<ParseIntError> for Error
pub struct Config pub struct Config
{ {
pub max_children: Option<NonZeroUsize>, pub max_children: Option<NonZeroUsize>,
pub files: Vec<String>, pub files: Vec<PathBuf>,
pub recursive: Option<NonZeroUsize>,
} }
impl Default for Config impl Default for Config
@ -89,21 +98,22 @@ impl Default for Config
Self { Self {
max_children: None, max_children: None,
files: Vec::new(), files: Vec::new(),
recursive: Some(unsafe{NonZeroUsize::new_unchecked(1)}),
} }
} }
} }
/// Parse the `env::args()` /// Parse the `env::args()`
#[inline] pub fn parse_args() -> Result<Config, Error> #[inline] pub async fn parse_args() -> Result<Config, Error>
{ {
let args = std::env::args(); let args = std::env::args();
if args.len() <= 1 { if args.len() <= 1 {
println!("Warning: No arguments specified, try passing `--help`."); println!("Warning: No arguments specified, try passing `--help`.");
} }
parse(args.skip(1)) parse(args.skip(1)).await
} }
fn parse<I,T>(args: I) -> Result<Config, Error> async fn parse<I,T>(args: I) -> Result<Config, Error>
where I: IntoIterator<Item=T>, where I: IntoIterator<Item=T>,
T: Into<String> T: Into<String>
{ {
@ -111,6 +121,7 @@ where I: IntoIterator<Item=T>,
let mut cfg = Config::default(); let mut cfg = Config::default();
let mut reading=true; let mut reading=true;
let mut first=true; let mut first=true;
while let Some(arg) = args.next() { while let Some(arg) = args.next() {
if reading { if reading {
let lw_arg = arg.trim().to_lowercase(); let lw_arg = arg.trim().to_lowercase();
@ -130,7 +141,16 @@ where I: IntoIterator<Item=T>,
cfg.max_children = Some(nzi.parse()?); cfg.max_children = Some(nzi.parse()?);
continue; continue;
} }
} },
"--recursive" => {
if let Some(nzi) = args.next() {
cfg.recursive = Some(nzi.parse()?);
continue;
}
},
"-r" => {
cfg.recursive = None;
},
"-" => { "-" => {
reading= false; reading= false;
continue; continue;
@ -139,10 +159,26 @@ where I: IntoIterator<Item=T>,
} }
} }
reading = false; reading = false;
cfg.files.push(arg);
let path = Path::new(&arg);
if path.is_dir() {
cfg.files.extend(dir::walk(path, cfg.recursive, dir::recommended_max_walkers()).await?);
} else if path.is_file() {
cfg.files.push(path.to_owned());
} else {
return Err(Error::NoExist(path.to_owned()));
}
} }
if cfg.files.len() == 0 { if cfg.files.len() == 0 {
return Err(Error::NoFiles); return Err(Error::NoFiles);
} }
Ok(cfg) Ok(cfg)
} }
impl From<dir::Error> for Error
{
fn from(from: dir::Error) -> Self
{
Self::Walking(from)
}
}

@ -0,0 +1,151 @@
use std::{
path::{
Path,
PathBuf,
},
sync::Arc,
num::NonZeroUsize,
marker::{
Send,
Sync,
},
};
use tokio::{
sync::{
mpsc,
Semaphore,
},
io,
fs,
task,
stream::StreamExt,
};
use futures::future::{BoxFuture, join_all, join, FutureExt as _};
/// Upper bound on the number of paths a single `walk` call will collect.
const MAX_PATHS: usize = 10000; //prevent OOM
/// Walker limit handed out by [`recommended_max_walkers`].
const MAX_WALKERS: usize = 24;

/// The suggested `max_walkers` argument for [`walk`].
///
/// Built with the checked constructor rather than `new_unchecked`, so no
/// `unsafe` is needed; should `MAX_WALKERS` ever be set to `0` this yields
/// `None` instead of undefined behaviour.
#[inline] pub fn recommended_max_walkers() -> Option<NonZeroUsize>
{
    NonZeroUsize::new(MAX_WALKERS)
}
/// Walk `path` and return the paths that the walker reports.
///
/// * `recurse` - depth limit forwarded to the walker, which starts at depth 1;
///   `None` places no limit on the depth.
/// * `max_walkers` - when `Some`, sizes both the result channel and a semaphore
///   that is handed to the walker; when `None` the channel buffers 16 entries
///   and no semaphore is created.
///
/// At most `MAX_PATHS` entries are collected. If the walker itself returns an
/// error, that error is returned instead of the collected paths.
pub async fn walk<P: AsRef<Path>+Send+Sync>(path: P, recurse: Option<NonZeroUsize>, max_walkers: Option<NonZeroUsize>) -> Result<Vec<PathBuf>, Error>
{
    let channel_size = match max_walkers {
        Some(limit) => limit.get(),
        None => 16,
    };
    let (sender, receiver) = mpsc::channel(channel_size);
    let limiter = match max_walkers {
        Some(limit) => Some(Arc::new(Semaphore::new(limit.get()))),
        None => None,
    };

    // Drain the channel while the walker is still producing into it.
    let collected = receiver
        .take(MAX_PATHS)
        .collect::<Vec<PathBuf>>();
    let walker = _walk(path, 1, recurse, limiter, sender);

    let (paths, walked) = join(collected, walker).await;
    walked?;
    Ok(paths)
}
/// Heap-allocated, type-erased form of [`_walk`].
///
/// `_walk` calls this for every sub-directory it spawns a task for, so the
/// walker can invoke itself through a nameable `BoxFuture` type.
#[inline] fn __walk<'a, P: AsRef<Path>+Send+Sync+'a>(path: P, depth: usize, recurse: Option<NonZeroUsize>, semaphore: Option<Arc<Semaphore>>, output: mpsc::Sender<PathBuf>) -> BoxFuture<'a,Result<usize, Error>>
{
    Box::pin(async move {
        _walk(path, depth, recurse, semaphore, output).await
    })
}
/// Walk `path`, sending every regular file found to `output`.
///
/// * `depth` - the level of `path` itself; `walk` starts at 1.
/// * `recurse` - sub-directories are only descended into while
///   `depth < recurse`; `None` means no depth limit.
/// * `semaphore` - when present, one permit is held while this directory's
///   entries are being read.
///
/// Returns the number of files sent by this call plus the totals of every
/// child walker that succeeded. A child that fails or panics is reported on
/// stderr and contributes nothing to the total.
///
/// # Errors
/// * `Error::IO` if the directory cannot be read.
/// * `Error::Send` if the receiving side of `output` has gone away.
/// * `Error::FileNotFound` if `path` is neither a directory nor a file.
async fn _walk<P: AsRef<Path>+Send+Sync>(path: P, depth: usize, recurse: Option<NonZeroUsize>, semaphore: Option<Arc<Semaphore>>, mut output: mpsc::Sender<PathBuf>) -> Result<usize, Error>
{
    let path = path.as_ref();
    let can_recurse = || match &recurse {
        None => true,
        &Some(nzu) => depth < usize::from(nzu),
    };
    if path.is_dir() {
        // `Semaphore::acquire` is async: without the `.await` no permit is
        // ever taken and the walker limit has no effect.
        let permit = match semaphore.as_ref() {
            Some(sem) => Some(sem.acquire().await),
            None => None,
        };
        let mut children = Vec::new();
        let mut dir = fs::read_dir(path).await?;
        let mut files=0usize;
        while let Some(edir) = dir.next_entry().await? {
            let dir = edir.path();
            if dir.is_file() {
                output.send(dir).await?;
                files+=1;
            } else if dir.is_dir() && can_recurse() {
                let sem = semaphore.clone();
                let output = output.clone();
                children.push({
                    let child = tokio::spawn(async move {
                        (__walk(&dir, depth+1, recurse, sem, output).await, dir)
                    }.boxed());
                    // Give the freshly spawned child a chance to start.
                    task::yield_now().await;
                    child
                });
            }
        }
        // Release the permit before waiting on the children: holding it while
        // they run would deadlock once the nesting depth reaches the number
        // of permits, because every ancestor would be parked on a child that
        // can never acquire one.
        drop(permit);
        Ok(join_all(children).await.into_iter()
           .filter_map(|x| match x {
               Ok(v) => Some(v),
               Err(err)=> {
                   eprintln!("Child panic: {}", err);
                   None
               }
           })
           .filter_map(|(x, name)| {
               match x {
                   Ok(v) => Some(v),
                   Err(e) => {
                       eprintln!("Failed to parse path {:?}: {}", name, e);
                       None
                   },
               }
           }).sum::<usize>() + files)
    } else if path.is_file() {
        output.send(path.to_owned()).await?;
        Ok(1)
    } else {
        Err(Error::FileNotFound(path.to_owned()))
    }
}
/// Errors that can occur while walking a directory tree.
#[derive(Debug)]
pub enum Error {
/// An underlying I/O operation failed (converted from `io::Error`).
IO(io::Error),
/// Sending a path over the mpsc channel failed (converted from `SendError`).
Send,
/// The given path was neither a directory nor a file.
FileNotFound(PathBuf),
}
impl std::error::Error for Error
{
    /// Only the `IO` variant wraps another error; every other variant has no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)>
    {
        match self {
            Self::IO(inner) => Some(inner),
            Self::Send | Self::FileNotFound(_) => None,
        }
    }
}
impl std::fmt::Display for Error
{
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result
    {
        match self {
            Self::FileNotFound(missing) => write!(f, "path {:?} does not exist", missing),
            Self::Send => f.write_str("mpsc error: this usually means we tried to take too many files"),
            Self::IO(inner) => write!(f, "i/o error: {}", inner),
        }
    }
}
impl From<io::Error> for Error
{
fn from(from: io::Error) -> Self
{
Self::IO(from)
}
}
impl<T> From<mpsc::error::SendError<T>> for Error
{
#[inline] fn from(_: mpsc::error::SendError<T>) -> Self
{
Self::Send
}
}

@ -0,0 +1,74 @@
use std::{
collections::{
LinkedList,
linked_list::{
IntoIter,
Iter,
IterMut,
},
},
iter::Rev,
};
/// A bounded buffer that keeps only the most recently pushed items.
///
/// Field `0` stores the items with the newest at the front of the list;
/// field `1` is the maximum number of items retained.
#[derive(Debug, Clone)]
pub struct FixedStack<T>(LinkedList<T>, usize);

impl<T> FixedStack<T>
{
    /// Create an empty stack that retains at most `sz` items.
    #[inline] pub fn new(sz: usize) -> Self
    {
        Self (
            LinkedList::new(),
            sz
        )
    }

    /// Push an item, discarding the oldest one if the stack is already full.
    ///
    /// A stack created with a capacity of `0` never stores anything.
    pub fn push(&mut self, item: T)
    {
        // Without this guard a zero-capacity stack would still accept one
        // item, because there is nothing to evict before the insert.
        if self.1 == 0 {
            return;
        }
        if self.0.len() >= self.1 {
            self.0.pop_back();
        }
        self.0.push_front(item);
    }

    /// Remove every item.
    pub fn clear(&mut self)
    {
        self.0.clear()
    }

    /// Remove and return the oldest item still held, if any.
    pub fn pop(&mut self) -> Option<T>
    {
        self.0.pop_back()
    }

    /// Number of items currently held.
    pub fn len(&self) -> usize
    {
        self.0.len()
    }

    /// `true` when no items are held.
    pub fn is_empty(&self) -> bool
    {
        self.0.is_empty()
    }

    /// Maximum number of items retained.
    pub fn cap(&self) -> usize
    {
        self.1
    }

    /// Iterate over the items by reference, oldest first.
    pub fn iter(&self) -> Rev<Iter<T>>
    {
        self.0.iter().rev()
    }

    /// Iterate over the items by mutable reference, oldest first.
    pub fn iter_mut(&mut self)-> Rev<IterMut<T>>
    {
        self.0.iter_mut().rev()
    }
}

/// Consumes the stack, yielding the items oldest first.
impl<T> IntoIterator for FixedStack<T>
{
    type Item=T;
    type IntoIter = Rev<IntoIter<T>>;

    fn into_iter(self)-> Self::IntoIter
    {
        self.0.into_iter().rev()
    }
}

@ -1,4 +1,3 @@
use super::*;
use std::{ use std::{
fmt, fmt,
borrow::Cow, borrow::Cow,

@ -12,17 +12,17 @@ pub use error::{ErrorExt as _, ResultExt as _};
mod stage; mod stage;
mod leanify; mod leanify;
mod dir;
mod fixed_stack;
mod process; mod process;
mod work; mod work;
async fn work() -> Result<(), Box<dyn std::error::Error>> async fn work() -> Result<(), Box<dyn std::error::Error>>
{ {
let args = arg::parse_args().with_prefix("failed to parse args")?; let args = arg::parse_args().await.with_prefix("failed to parse args")?;
let leanify = leanify::find_binary().with_prefix("Couldn't find leanify binary")?; let leanify = leanify::find_binary().with_prefix("Couldn't find leanify binary")?;
work::work(leanify, args.files, args.max_children).await
Ok(())
} }

@ -1,23 +1,75 @@
//! Handles spawning the process //! Handles spawning the process
use tokio::{
process::{
Command,
},
sync::{
mpsc,
},
io::{
BufReader,
},
stream::StreamExt,
prelude::*,
task,
};
use std::{
io,
path::Path,
};
/// Spawn the process, and contain its standard output. /// Spawn the process, and contain its standard output.
/// ///
/// # Notes /// # Notes
/// Standard error is printed immediately instead. /// Standard error is printed immediately instead.
pub async fn contained_spawn<T,U,V>(process: T, args: U) -> Result<StdoutCache, Error> pub async fn contained_spawn<T,U,V>(process: T, args: U, mut output_to: mpsc::Sender<String>) -> Result<(), Error>
where T: AsRef<str>, where T: AsRef<Path>,
U: IntoIterator<Item=V>, U: IntoIterator<Item=V>,
T: AsRef<str> V: AsRef<std::ffi::OsStr>
{ {
let mut child = match Command::new(process.as_ref())
.args(args)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit())
.stdin(std::process::Stdio::null())
.spawn() {
Ok(chi) => chi,
Err(sp) => {
return Err(Error::Spawning(sp));
}
};
let stdout = child.stdout.take().unwrap();
let sender = tokio::spawn(async move {
let mut lines = BufReader::new(stdout).lines();
while let Some(Ok(line)) = lines.next().await {
if let Err(_) = output_to.send(line).await {
break;
}
}
});
task::yield_now().await;
match child.await {
Ok(exit) => {
if exit.success() {
sender.await.expect("Child panic");
Ok(())
} else {
Err(Error::Process(exit.code()))
}
},
Err(err) => Err(Error::Transmission(err)),
}
} }
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
/// There was an error spawning the process /// There was an error spawning the process
Spawning, Spawning(io::Error),
/// Process exited with non-zero error code. /// Process exited with non-zero error code.
Process, Process(Option<i32>),
/// Error communicating with process.
Transmission(io::Error),
} }
impl std::error::Error for Error{} impl std::error::Error for Error{}
impl std::fmt::Display for Error impl std::fmt::Display for Error
@ -25,10 +77,10 @@ impl std::fmt::Display for Error
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result
{ {
match self { match self {
Self::Spawning => write!(f, "there was an error spawning the process"), Self::Spawning(io) => write!(f, "there was an error spawning the process: {}", io),
Self::Process => write!(f, "process exited with non-zero code"), Self::Transmission(io) => write!(f, "there was an error waiting on child process: {}", io),
Self::Process(Some(sig)) => write!(f, "process exited with non-zero code: {}", sig),
Self::Process(None) => write!(f, "process teminated by signal"),
} }
} }
} }

@ -2,24 +2,27 @@ use super::*;
use std::{ use std::{
num::NonZeroUsize, num::NonZeroUsize,
sync::Arc, sync::Arc,
pin::Pin,
marker::{ marker::{
Send, Send,
Sync,
}, },
path::{Path,PathBuf,},
ffi::OsStr,
}; };
use tokio::{ use tokio::{
prelude::*, prelude::*,
stream::StreamExt,
sync::{ sync::{
Semaphore, Semaphore,
mpsc,
}, },
io,
}; };
use futures::future::{ use futures::future::{
join_all, join_all,
Future, Future,
}; };
pub async fn maybe_await<T>(from: Option<T>) -> Option<<T as Future>::Output> async fn maybe_await<T>(from: Option<T>) -> Option<<T as Future>::Output>
where T: Future where T: Future
{ {
if let Some(v) = from { if let Some(v) = from {
@ -29,42 +32,67 @@ where T: Future
} }
} }
pub async fn do_work(process: impl AsRef<str>, file: impl AsRef<str>) async fn do_work(process: impl AsRef<Path>, file: impl AsRef<OsStr>) -> Result<Vec<String>, process::Error>
{ {
let process = process.as_ref(); let process = process.as_ref();
let file = file.as_ref(); let file = file.as_ref();
match process::contained_spawn(process, std::iter::once(file)).await { let (tx, mut rx) = mpsc::channel(16);
Ok(output) => { println!("[p]: Processing {:?}", file);
let collector = tokio::spawn(async move {
}, let mut stack = fixed_stack::FixedStack::new(100);
Err(process::Error::Spawning) => { while let Some(value) = rx.recv().await {
stack.push(value);
}, }
Err(process::Error::Process) => { stack
});
}, tokio::task::yield_now().await;
match process::contained_spawn(process, std::iter::once(file), tx).await {
Ok(_) => Ok(collector.await.expect("Child panic").into_iter().collect()),
Err(error) => Err(error),
} }
} }
pub async fn work<I,T>(process: String, files: I, children: Option<NonZeroUsize>) -> Result<(), Box<dyn std::error::Error>> pub async fn work<I,T,U>(process: U, files: I, children: Option<NonZeroUsize>) -> Result<(), Box<dyn std::error::Error>>
where I: IntoIterator<Item=T>, where I: IntoIterator<Item=T>,
T: AsRef<str> + Send + 'static T: AsRef<OsStr> + Send + Sync + 'static,
U: Into<PathBuf>
{ {
//let mut stage: stage::Stage<String> = files.into_iter().map(|x| x.into()).collect(); let (tx,mut rx) = mpsc::channel::<(T, Vec<String>)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16));
let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into()))); let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into())));
let process = Arc::new(process); let process = Arc::new(process.into());
let display = tokio::spawn(async move {
let mut stdout = io::stdout();
while let Some((file, values)) = rx.recv().await {
for line in values.into_iter()
{
let line = format!("[i] {:?}: {}\n", file.as_ref(), line);
let _ = stdout.write_all(line.as_bytes()).await;
}
}
});
join_all(files.into_iter() for failed in join_all(files.into_iter()
.map(|filename| { .map(|filename| {
let semaphore = semaphore.clone(); let semaphore = semaphore.clone();
let process = Arc::clone(&process); let process = Arc::clone(&process);
let mut tx = tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await; let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await;
do_work(&process[..], filename).await; match do_work(process.as_ref(), &filename).await {
Ok(strings) => tx.send((filename, strings)).await.map_err(|_| "Child panic").unwrap(),
Err(error) => eprintln!("[!] {:?}: {}", filename.as_ref(), error),
}
}) })
})).await; })).await
.into_iter()
.filter_map(|x| x.err())
{
eprintln!("Child panic: {}", failed);
}
drop(tx);
display.await?;
Ok(()) Ok(())
} }

Loading…
Cancel
Save