You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
enumerate-ordered/src/walk.rs

227 lines
6.6 KiB

//! Directory walking
use super::*;
use std::num::NonZeroUsize;
use std::{
sync::Arc,
path::{
Path, PathBuf,
},
};
use tokio::{
sync::{
RwLock,
Semaphore,
},
};
use futures::future::{
Future,
OptionFuture,
BoxFuture,
};
lazy_static! {
/// Number of CPUs as reported by `num_cpus::get()`, computed lazily on first access
/// and cached for the rest of the program. Used by `default_max_walkers`.
pub(super) static ref NUM_CPUS: usize = num_cpus::get();
}
/// Default upper bound on walkers that may work concurrently: the CPU count
/// (`NUM_CPUS`, i.e. `num_cpus::get()`).
///
/// Yields `None` (meaning "unlimited", see `Config::max_walkers`) only if the
/// reported CPU count is zero.
#[inline]
pub fn default_max_walkers() -> Option<NonZeroUsize>
{
    let cpu_count: usize = *NUM_CPUS;
    NonZeroUsize::new(cpu_count)
}
/// Configuration for a directory walk started with `start_walk`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Config
{
/// Recursion bound for descending into subdirectories.
///
/// `None`: unlimited. `Some(0)`: no recursion (only the entries of the top-level directory are walked).
pub recursion_depth: Option<usize>,
/// Upper bound on walker tasks working at once; each walker holds a semaphore permit
/// while processing its own directory.
///
/// `None`: unlimited walker tasks. `Config::default()` uses `default_max_walkers()`.
pub max_walkers: Option<NonZeroUsize>,
}
impl Default for Config
{
    /// A non-recursive walk (`recursion_depth: Some(0)`) whose concurrency is capped by
    /// [`default_max_walkers`].
    #[inline]
    fn default() -> Self
    {
        let max_walkers = default_max_walkers();
        let recursion_depth = Some(0);
        Self {
            max_walkers,
            recursion_depth,
        }
    }
}
/// Per-walker state. Each child directory walk gets its own copy, derived from its
/// parent's by `State::create_child`.
#[derive(Debug, Clone)]
struct UniqueState
{
// Remaining recursion budget (`None` = unlimited). `State::create_child` decrements it
// for each child level and stops creating children when it bottoms out at 0.
recursion_depth: Option<usize>,
// Channel that every discovered file's `work::FileInfo` is sent to.
output_sender: mpsc::Sender<work::FileInfo>,
}
/// State shared (behind an `Arc`) by every walker spawned from one `start_walk` call.
#[derive(Debug)]
struct SharedState
{
/// Config handed to every `work::FileInfo` built during this walk.
worker_config: Arc<work::Config>,
/// The walk configuration this walk was started with.
config: Config,
/// Limits how many walker tasks work concurrently; `None` when `Config::max_walkers` is `None`.
new_worker_semaphore: Option<Semaphore>,
}
/// Complete state of one walker: the walk-wide shared state plus this walker's own state.
/// Cloning only bumps the `Arc` refcount and clones the `UniqueState`.
#[derive(Debug, Clone)]
struct State {
/// Shared state for all iterations
shared: Arc<SharedState>, //TODO: XXX: *another* Arc slowdown...
/// State for current iteration
current: UniqueState,
}
impl State
{
    /// Returns a closure that builds a `work::FileInfo` from a path and its metadata,
    /// using this walk's shared `work::Config`.
    #[inline]
    pub fn create_file_info_generator(&self) -> impl Fn(PathBuf, std::fs::Metadata) -> work::FileInfo + '_
    {
        work::FileInfo::factory(Arc::clone(&self.shared.worker_config)) //XXX: ANOTHER unneeded Arc clone..
    }

    /// State shared by every walker of this walk.
    #[inline(always)]
    pub fn shared(&self) -> &SharedState
    {
        &self.shared
    }

    /// State owned by this walker only.
    #[inline(always)]
    pub fn unique(&self) -> &UniqueState
    {
        &self.current
    }

    /// Mutable access to the state owned by this walker only.
    #[inline(always)]
    pub fn unique_mut(&mut self) -> &mut UniqueState
    {
        &mut self.current
    }

    /// Create a `State` for a recursed child walk.
    ///
    /// Returns `None` when this walker's recursion budget is exhausted
    /// (`recursion_depth == Some(0)`), meaning the child directory must not be walked.
    /// Otherwise the child's budget is one lower than ours; `None` (unlimited) stays `None`.
    /// With a top-level budget of `Some(n)`, exactly `n` levels of subdirectories are walked.
    #[inline]
    pub fn create_child(&self) -> Option<Self>
    {
        // Test the budget *before* decrementing. Decrementing first would make `Some(1)`
        // behave like `Some(0)` and give every `Some(n)` only `n - 1` levels.
        let recursion_depth = match self.current.recursion_depth {
            Some(0) => return None,
            // `depth > 0` here, so this cannot underflow.
            Some(depth) => Some(depth - 1),
            None => None,
        };
        Some(Self {
            shared: Arc::clone(&self.shared),
            current: UniqueState { recursion_depth, output_sender: self.current.output_sender.clone() },
        })
    }
}
/// Walk a single directory in the current task.
///
/// Every regular file directly inside `whence` is turned into a `work::FileInfo` and sent to
/// `state`'s output channel. Every subdirectory for which `state.create_child()` yields a
/// child state is handed to `push_child` together with its path; the caller decides how
/// (and when) to walk it. Symlinks and other entry kinds are skipped. Entries whose metadata
/// cannot be read are logged and skipped.
///
/// Returns the number of files successfully sent. If sending fails (the receiving worker has
/// shut down), iteration stops early and the count so far is returned.
///
/// # Errors
/// Fails if `whence` cannot be opened with `read_dir`, or if reading the next entry fails.
async fn walk_directory<F>(state: &State, mut push_child: F, whence: impl AsRef<Path>) -> eyre::Result<usize>
where F: FnMut(State, PathBuf)
{
    let mut read = tokio::fs::read_dir(whence).await.wrap_err("Failed to read-dir")?;
    let sender = &state.current.output_sender;
    let file_info = state.create_file_info_generator();
    let mut n = 0;
    while let Some(fso) = read.next_entry().await.wrap_err("Failed to enumerate directory entry")?
    {
        let metadata = match fso.metadata().await {
            Ok(metadata) => metadata,
            Err(e) => {
                error!("Failed to stat {:?}: {}", fso.file_name(), e);
                continue;
            }
        };
        if metadata.is_file() {
            if sender.send(file_info(fso.path(), metadata)).await.is_err() {
                warn!("Worker shut down, stopping iteration");
                break;
            }
            n += 1;
        } else if metadata.is_dir() {
            let Some(state) = state.create_child() else { continue };
            push_child(state, fso.path());
        } // Ignore symlinks (the entry's metadata is not followed through them) and other kinds.
    }
    Ok(n)
}
/// Walk `whence` and return how many files were sent to the output channel.
///
/// This function is called recursively for each subdirectory in `whence`, as far as the
/// recursion rules in `state` allow. The work for `whence` itself runs on a freshly spawned
/// task. That task first acquires a permit from the shared walker semaphore (when one is
/// configured). If `whence` is a directory, the task walks its entries. Otherwise it sends
/// `whence` itself as a single file.
///
/// Child walks are collected as lazy, boxed futures. They are awaited only *after* the spawned
/// task has finished its own work on files **and** dropped its semaphore `_permit`. Otherwise
/// deadlock could occur easily. Errors from child walks are logged and do not fail this walk;
/// such children simply contribute no files to the count.
///
/// # Errors
/// Fails if `whence` cannot be stat'd or read as a directory, or if a top-level file cannot
/// be sent because the receiver has been closed.
///
/// # Panics
/// Panics if the spawned walker task panics or if the walker semaphore has been closed.
async fn walk_inner(state: State, whence: PathBuf) -> eyre::Result<usize>
{
    use futures::prelude::*;

    /// Box the recursive call. An `async fn` cannot contain a future of its own type inline,
    /// so each child walk is type-erased behind a `BoxFuture`.
    fn walk_child(state: State, whence: PathBuf) -> BoxFuture<'static, eyre::Result<usize>>
    {
        walk_inner(state, whence).boxed()
    }

    let backing_res = tokio::spawn(async move {
        // Held only while this task does its own directory; released before children are awaited.
        let _permit = match &state.shared.new_worker_semaphore {
            Some(sem) => Some(sem.acquire().await.expect("Failed to acquire permit")),
            None => None,
        };
        // Lazy futures for recursion children. They do not start running until they are
        // awaited below, after this task (and therefore `_permit`) is gone.
        let mut children = Vec::new();
        // Stat asynchronously: `Path::is_dir()` would issue a blocking syscall on a runtime
        // thread. Like `is_dir()`, `tokio::fs::metadata` follows symlinks.
        let metadata = tokio::fs::metadata(&whence).await.wrap_err("Failed to stat top-level file")?;
        // Number of *files* sent to the output channel by this task.
        let counted = if metadata.is_dir() {
            walk_directory(&state, |state, whence| children.push(walk_child(state, whence)), &whence).await?
        } else {
            let info = work::FileInfo::new(Arc::clone(&state.shared.worker_config), whence, metadata);
            if state.current.output_sender.send(info).await.is_err() {
                return Err(eyre!("Failed to send top-level file to backing sorter: Sorter closed."));
            }
            1
        };
        Ok::<_, eyre::Report>((counted, children))
    }).await.expect("Panic in backing walker thread");
    trace!("Spawning and collecting child workers");
    let (counted, children) = backing_res?;

    // Children run concurrently. A failing child is logged and counts as zero files.
    let from_children = futures::future::join_all(children).await
        .into_iter()
        .filter_map(|res| match res {
            Ok(n) => Some(n),
            Err(e) => {
                error!("Child failed to walk: {}", e);
                None
            },
        })
        .sum::<usize>();
    Ok(counted + from_children)
}
/// Start walking `whence`, sending every discovered file to `whereto`.
///
/// `cfg.recursion_depth` bounds how deep into subdirectories the walk descends, and
/// `cfg.max_walkers` bounds how many walker tasks work at once (`None` = unbounded).
/// `worker` is the config shared by every `work::FileInfo` produced during the walk.
///
/// The returned future resolves to the total number of files sent. No walking happens
/// until it is polled.
#[inline]
pub fn start_walk(cfg: Config, worker: Arc<work::Config>, whence: impl AsRef<Path>, whereto: mpsc::Sender<work::FileInfo>) -> impl Future<Output = eyre::Result<usize>> + 'static
{
    // `Option<NonZeroUsize>` is `Copy`, so `cfg` stays usable below.
    let new_worker_semaphore = cfg.max_walkers.map(|max| Semaphore::new(max.get()));
    let current = UniqueState {
        recursion_depth: cfg.recursion_depth,
        output_sender: whereto,
    };
    let shared = Arc::new(SharedState {
        new_worker_semaphore,
        config: cfg,
        worker_config: worker,
    });
    walk_inner(State { current, shared }, whence.as_ref().to_owned())
}