You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

152 lines
3.6 KiB

use std::{
path::{
Path,
PathBuf,
},
sync::Arc,
num::NonZeroUsize,
marker::{
Send,
Sync,
},
};
use tokio::{
sync::{
mpsc,
Semaphore,
},
io,
fs,
task,
stream::StreamExt,
};
use futures::future::{BoxFuture, join_all, join, FutureExt as _};
/// Upper bound on how many paths `walk` takes from its result channel; caps the size of the returned `Vec`.
const MAX_PATHS: usize = 10000; //prevent OOM
/// Walker count handed out by `recommended_max_walkers`.
const MAX_WALKERS: usize = 24;
/// Returns the recommended value for the `max_walkers` argument of `walk`.
///
/// The value is `MAX_WALKERS` wrapped in a `NonZeroUsize`. The safe checked
/// constructor is used, so no `unsafe` is required: for the current non-zero
/// constant this always yields `Some(..)`, and if the constant were ever
/// changed to zero the result would be `None` instead of undefined behaviour.
#[inline] pub fn recommended_max_walkers() -> Option<NonZeroUsize>
{
    NonZeroUsize::new(MAX_WALKERS)
}
/// Walks `path` and returns the paths of the files found.
///
/// * `path` - file or directory to start from; the start is treated as depth 1.
/// * `recurse` - optional depth limit; a sub-directory is only descended into
///   while the current depth is below this limit. `None` means no limit.
/// * `max_walkers` - when set, it is used as the capacity of the result channel
///   and as the permit count of a semaphore that is handed to the walkers.
///   When `None`, the channel capacity is 16 and no semaphore is created.
///
/// At most `MAX_PATHS` paths are taken from the channel.
///
/// # Errors
/// Returns whatever error the root walker (`_walk`) finishes with, e.g. an I/O
/// error reading the root directory, a failed channel send, or
/// `Error::FileNotFound` when `path` is neither a file nor a directory.
pub async fn walk<P: AsRef<Path>+Send+Sync>(path: P, recurse: Option<NonZeroUsize>, max_walkers: Option<NonZeroUsize>) -> Result<Vec<PathBuf>, Error>
{
    let capacity = match max_walkers {
        Some(limit) => limit.get(),
        None => 16,
    };
    let (tx, rx) = mpsc::channel(capacity);

    let semaphore = match max_walkers {
        Some(limit) => Some(Arc::new(Semaphore::new(limit.get()))),
        None => None,
    };

    // Drive the collector and the walker concurrently: the walker produces
    // paths into the channel while the collector drains it.
    let collector = rx.take(MAX_PATHS).collect::<Vec<PathBuf>>();
    let walker = _walk(path, 1, recurse, semaphore, tx);
    let (out, sz) = join(collector, walker).await;

    // The file count itself is discarded; only success/failure matters here.
    sz.map(|_| out)
}
/// Boxed wrapper around `_walk`.
///
/// Returning a `BoxFuture` gives the recursive call made from inside `_walk`
/// a concrete, nameable future type. Arguments are forwarded unchanged.
#[inline] fn __walk<'a, P: AsRef<Path>+Send+Sync+'a>(path: P, depth: usize, recurse: Option<NonZeroUsize>, semaphore: Option<Arc<Semaphore>>, output: mpsc::Sender<PathBuf>) -> BoxFuture<'a,Result<usize, Error>>
{
    let walker = _walk(path, depth, recurse, semaphore, output);
    walker.boxed()
}
/// Walks a single path, sending every file found to `output`.
///
/// * `path` - file or directory to inspect.
/// * `depth` - depth of `path`; sub-directories are walked at `depth + 1`.
/// * `recurse` - optional depth limit; sub-directories are only descended into
///   while `depth` is below it. `None` means no limit.
/// * `semaphore` - optional walker limit. When present, a permit is held while
///   this directory is being read, so at most that many directories are read
///   concurrently.
/// * `output` - channel the discovered file paths are sent to.
///
/// Returns the number of files sent by this walker plus the counts of all
/// child walkers that succeeded. Children that panic or return an error are
/// reported on stderr and contribute nothing to the count.
///
/// # Errors
/// * `Error::IO` if the directory cannot be read.
/// * `Error::Send` if the receiving side of `output` is gone.
/// * `Error::FileNotFound` if `path` is neither a directory nor a file.
async fn _walk<P: AsRef<Path>+Send+Sync>(path: P, depth: usize, recurse: Option<NonZeroUsize>, semaphore: Option<Arc<Semaphore>>, mut output: mpsc::Sender<PathBuf>) -> Result<usize, Error>
{
    let path = path.as_ref();
    // `None` means unlimited depth; otherwise only descend while below the limit.
    let can_recurse = recurse.map_or(true, |limit| depth < usize::from(limit));

    if path.is_dir() {
        let mut children = Vec::new();
        let mut files = 0usize;
        {
            // The acquire future must be awaited, otherwise no permit is ever
            // taken and the walker limit has no effect.
            // The permit is scoped to this block so it is released before the
            // children are joined below: holding it while waiting on children
            // (which need permits themselves) could deadlock on deep trees.
            let _permit = match semaphore.as_ref() {
                Some(sem) => Some(sem.acquire().await),
                None => None,
            };

            let mut dir = fs::read_dir(path).await?;
            while let Some(entry) = dir.next_entry().await? {
                let entry_path = entry.path();
                if entry_path.is_file() {
                    output.send(entry_path).await?;
                    files += 1;
                } else if can_recurse && entry_path.is_dir() {
                    let sem = semaphore.clone();
                    let output = output.clone();
                    children.push({
                        // Each sub-directory gets its own task; the path is
                        // returned alongside the result for error reporting.
                        let child = tokio::spawn(async move {
                            (__walk(&entry_path, depth+1, recurse, sem, output).await, entry_path)
                        }.boxed());
                        // Give the freshly spawned child a chance to start.
                        task::yield_now().await;
                        child
                    });
                }
            }
        }

        let child_files = join_all(children).await.into_iter()
            .filter_map(|joined| match joined {
                Ok(v) => Some(v),
                Err(err) => {
                    eprintln!("Child panic: {}", err);
                    None
                },
            })
            .filter_map(|(result, name)| match result {
                Ok(v) => Some(v),
                Err(e) => {
                    eprintln!("Failed to parse path {:?}: {}", name, e);
                    None
                },
            })
            .sum::<usize>();
        Ok(child_files + files)
    } else if path.is_file() {
        output.send(path.to_owned()).await?;
        Ok(1)
    } else {
        Err(Error::FileNotFound(path.to_owned()))
    }
}
/// Errors produced while walking a path.
#[derive(Debug)]
pub enum Error {
/// An underlying I/O operation failed.
IO(io::Error),
/// Sending a path over the mpsc channel failed.
Send,
/// The contained path is neither a directory nor a file.
FileNotFound(PathBuf),
}
impl std::error::Error for Error
{
    /// Returns the wrapped I/O error for `Error::IO`; the other variants have
    /// no underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)>
    {
        match self {
            Self::IO(inner) => Some(inner),
            Self::Send | Self::FileNotFound(_) => None,
        }
    }
}
impl std::fmt::Display for Error
{
    /// Writes a human-readable, one-line description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result
    {
        match self {
            Self::FileNotFound(missing) => write!(f, "path {:?} does not exist", missing),
            Self::Send => write!(f, "mpsc error: this usually means we tried to take too many files"),
            Self::IO(inner) => write!(f, "i/o error: {}", inner),
        }
    }
}
impl From<io::Error> for Error
{
fn from(from: io::Error) -> Self
{
Self::IO(from)
}
}
impl<T> From<mpsc::error::SendError<T>> for Error
{
#[inline] fn from(_: mpsc::error::SendError<T>) -> Self
{
Self::Send
}
}