use std::{ path::{ Path, PathBuf, }, sync::Arc, num::NonZeroUsize, marker::{ Send, Sync, }, }; use tokio::{ sync::{ mpsc, Semaphore, }, io, fs, task, stream::StreamExt, }; use futures::future::{BoxFuture, join_all, join, FutureExt as _}; const MAX_PATHS: usize = 10000; //prevent OOM const MAX_WALKERS: usize = 24; #[inline] pub fn recommended_max_walkers() -> Option { unsafe { Some(NonZeroUsize::new_unchecked(MAX_WALKERS)) } } pub async fn walk+Send+Sync>(path: P, recurse: Option, max_walkers: Option) -> Result, Error> { let (tx, rx) = mpsc::channel(max_walkers.as_ref().map(|&num| usize::from(num)).unwrap_or(16)); let semaphore = max_walkers.map(|x| Arc::new(Semaphore::new(usize::from(x)))); let (out, sz) = join(rx .take(MAX_PATHS) .collect::>(), _walk(path, 1, recurse, semaphore, tx)) .await; sz?; Ok(out) } #[inline] fn __walk<'a, P: AsRef+Send+Sync+'a>(path: P, depth: usize, recurse: Option, semaphore: Option>, output: mpsc::Sender) -> BoxFuture<'a,Result> { async move {_walk(path,depth,recurse,semaphore,output).await}.boxed() } async fn _walk+Send+Sync>(path: P, depth: usize, recurse: Option, semaphore: Option>, mut output: mpsc::Sender) -> Result { let path = path.as_ref(); let can_recurse = || match &recurse { None => true, &Some(nzu) => depth < usize::from(nzu), }; if path.is_dir() { let _lock = semaphore.as_ref().map(|x| x.acquire()); let mut children = Vec::new(); let mut dir = fs::read_dir(path).await?; let mut files=0usize; while let Some(edir) = dir.next_entry().await? { let dir = edir.path(); if dir.is_file() { output.send(dir).await?; files+=1; } else if dir.is_dir() && can_recurse() { let sem = semaphore.clone(); let output = output.clone(); children.push({ let child = tokio::spawn(async move { (__walk(&dir, depth+1, recurse, sem, output).await, dir) }.boxed()); task::yield_now().await; child }); } } Ok(join_all(children).await.into_iter() .filter_map(|x| match x { Ok(v) => Some(v), Err(err)=> { eprintln!("Child panic: {}", err); None } }) .filter_map(|(x, name)| { match x { Ok(v) => Some(v), Err(e) => { eprintln!("Failed to parse path {:?}: {}", name, e); None }, } }).sum::() + files) } else if path.is_file() { output.send(path.to_owned()).await?; Ok(1) } else { Err(Error::FileNotFound(path.to_owned())) } } #[derive(Debug)] pub enum Error { IO(io::Error), Send, FileNotFound(PathBuf), } impl std::error::Error for Error { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { Some(match &self { Self::IO(io) => io, _ => return None, }) } } impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::IO(io) => write!(f, "i/o error: {}", io), Self::Send => write!(f, "mpsc error: this usually means we tried to take too many files"), Self::FileNotFound(path) => write!(f, "path {:?} does not exist", path), } } } impl From for Error { fn from(from: io::Error) -> Self { Self::IO(from) } } impl From> for Error { #[inline] fn from(_: mpsc::error::SendError) -> Self { Self::Send } }