You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
152 lines
3.6 KiB
152 lines
3.6 KiB
4 years ago
|
use std::{
|
||
|
path::{
|
||
|
Path,
|
||
|
PathBuf,
|
||
|
},
|
||
|
sync::Arc,
|
||
|
num::NonZeroUsize,
|
||
|
marker::{
|
||
|
Send,
|
||
|
Sync,
|
||
|
},
|
||
|
};
|
||
|
use tokio::{
|
||
|
sync::{
|
||
|
mpsc,
|
||
|
Semaphore,
|
||
|
},
|
||
|
io,
|
||
|
fs,
|
||
|
task,
|
||
|
stream::StreamExt,
|
||
|
};
|
||
|
use futures::future::{BoxFuture, join_all, join, FutureExt as _};
|
||
|
|
||
|
/// Upper bound on how many paths `walk` takes from its channel before it stops collecting.
const MAX_PATHS: usize = 10000; //prevent OOM
|
||
|
/// Walker count handed out by [`recommended_max_walkers`].
const MAX_WALKERS: usize = 24;

/// Returns the recommended walker limit (`MAX_WALKERS`) as a non-zero value.
///
/// Presumably meant to be passed as the `max_walkers` argument of `walk`.
/// The result is `None` only if `MAX_WALKERS` were ever set to 0.
#[inline] pub fn recommended_max_walkers() -> Option<NonZeroUsize>
{
    // `NonZeroUsize::new` does the zero check itself, so no `unsafe` is needed and a
    // future change of the constant to 0 yields `None` instead of undefined behaviour.
    NonZeroUsize::new(MAX_WALKERS)
}
|
||
|
|
||
|
pub async fn walk<P: AsRef<Path>+Send+Sync>(path: P, recurse: Option<NonZeroUsize>, max_walkers: Option<NonZeroUsize>) -> Result<Vec<PathBuf>, Error>
|
||
|
{
|
||
|
let (tx, rx) = mpsc::channel(max_walkers.as_ref().map(|&num| usize::from(num)).unwrap_or(16));
|
||
|
let semaphore = max_walkers.map(|x| Arc::new(Semaphore::new(usize::from(x))));
|
||
|
|
||
|
let (out, sz) = join(rx
|
||
|
.take(MAX_PATHS)
|
||
|
.collect::<Vec<PathBuf>>(),
|
||
|
_walk(path, 1, recurse, semaphore, tx))
|
||
|
.await;
|
||
|
sz?;
|
||
|
Ok(out)
|
||
|
}
|
||
|
|
||
|
#[inline] fn __walk<'a, P: AsRef<Path>+Send+Sync+'a>(path: P, depth: usize, recurse: Option<NonZeroUsize>, semaphore: Option<Arc<Semaphore>>, output: mpsc::Sender<PathBuf>) -> BoxFuture<'a,Result<usize, Error>>
|
||
|
{
|
||
|
async move {_walk(path,depth,recurse,semaphore,output).await}.boxed()
|
||
|
}
|
||
|
|
||
|
async fn _walk<P: AsRef<Path>+Send+Sync>(path: P, depth: usize, recurse: Option<NonZeroUsize>, semaphore: Option<Arc<Semaphore>>, mut output: mpsc::Sender<PathBuf>) -> Result<usize, Error>
|
||
|
{
|
||
|
let path = path.as_ref();
|
||
|
let can_recurse = || match &recurse {
|
||
|
None => true,
|
||
|
&Some(nzu) => depth < usize::from(nzu),
|
||
|
};
|
||
|
|
||
|
if path.is_dir() {
|
||
|
let _lock = semaphore.as_ref().map(|x| x.acquire());
|
||
|
let mut children = Vec::new();
|
||
|
let mut dir = fs::read_dir(path).await?;
|
||
|
let mut files=0usize;
|
||
|
while let Some(edir) = dir.next_entry().await? {
|
||
|
let dir = edir.path();
|
||
|
if dir.is_file() {
|
||
|
output.send(dir).await?;
|
||
|
files+=1;
|
||
|
} else if dir.is_dir() && can_recurse() {
|
||
|
let sem = semaphore.clone();
|
||
|
let output = output.clone();
|
||
|
children.push({
|
||
|
let child = tokio::spawn(async move {
|
||
|
(__walk(&dir, depth+1, recurse, sem, output).await, dir)
|
||
|
}.boxed());
|
||
|
task::yield_now().await;
|
||
|
child
|
||
|
});
|
||
|
}
|
||
|
}
|
||
|
Ok(join_all(children).await.into_iter()
|
||
|
.filter_map(|x| match x {
|
||
|
Ok(v) => Some(v),
|
||
|
Err(err)=> {
|
||
|
eprintln!("Child panic: {}", err);
|
||
|
None }
|
||
|
})
|
||
|
.filter_map(|(x, name)| {
|
||
|
match x {
|
||
|
Ok(v) => Some(v),
|
||
|
Err(e) => {
|
||
|
eprintln!("Failed to parse path {:?}: {}", name, e);
|
||
|
None
|
||
|
},
|
||
|
}
|
||
|
}).sum::<usize>() + files)
|
||
|
} else if path.is_file() {
|
||
|
output.send(path.to_owned()).await?;
|
||
|
Ok(1)
|
||
|
} else {
|
||
|
Err(Error::FileNotFound(path.to_owned()))
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
/// Errors produced while walking a directory tree.
#[derive(Debug)]
pub enum Error {
    /// An underlying I/O operation failed.
    IO(io::Error),
    /// Sending a path over the result channel failed.
    Send,
    /// The given path is neither a directory nor a file.
    FileNotFound(PathBuf),
}

impl std::error::Error for Error
{
    /// Only the `IO` variant wraps another error; the others have no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)>
    {
        match self {
            Self::IO(inner) => Some(inner),
            Self::Send | Self::FileNotFound(_) => None,
        }
    }
}

impl std::fmt::Display for Error
{
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result
    {
        match self {
            Self::FileNotFound(missing) => write!(f, "path {:?} does not exist", missing),
            Self::Send => f.write_str("mpsc error: this usually means we tried to take too many files"),
            Self::IO(inner) => write!(f, "i/o error: {}", inner),
        }
    }
}

/// Lets `?` turn an `io::Error` into [`Error::IO`].
impl From<io::Error> for Error
{
    fn from(err: io::Error) -> Self
    {
        Error::IO(err)
    }
}
|
||
|
|
||
|
impl<T> From<mpsc::error::SendError<T>> for Error
|
||
|
{
|
||
|
#[inline] fn from(_: mpsc::error::SendError<T>) -> Self
|
||
|
{
|
||
|
Self::Send
|
||
|
}
|
||
|
}
|