You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
enumerate-ordered/src/walk.rs

118 lines
3.8 KiB

//! Directory walking
use super::*;
use std::num::NonZeroUsize;
use std::{
sync::Arc,
path::{
Path, PathBuf,
},
};
use tokio::{
sync::{
RwLock,
Semaphore,
},
};
use futures::future::{
Future,
OptionFuture,
};
/// User-facing configuration for a directory walk.
///
/// The [`Default`] configuration places no limit on either the recursion depth or the number of
/// concurrently running walkers.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Default)]
pub struct Config
{
    /// How many levels of subdirectories to descend into.
    ///
    /// `None` means unlimited; `Some(0)` means no recursion at all.
    pub recursion_depth: Option<usize>,
    /// Maximum number of walker tasks allowed to run at the same time.
    ///
    /// `None` means an unlimited number of tasks.
    pub max_walkers: Option<NonZeroUsize>,
}
/// State owned by the current walker iteration only (not shared with the rest of the walk).
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
struct UniqueState {
    /// Remaining recursion budget: decreased with each new depth; once it reaches `0` the walk
    /// does not proceed any deeper.
    recursion_depth: Option<usize>,
}
/// State shared by every walker that takes part in a single walk.
#[derive(Debug)]
struct SharedState {
    /// The configuration the walk was started with.
    config: Config,
    /// Channel that receives a `work::FileInfo` for each file output by the walk.
    output_sender: mpsc::Sender<work::FileInfo>,
    /// Limits how many walkers may hold a permit at once; `None` when the number is unlimited.
    new_worker_semaphore: Option<Semaphore>,
}
/// Everything a single walker iteration needs: the walk-wide shared part plus its own unique part.
#[derive(Debug, Clone)]
struct State {
    /// Shared state for all iterations.
    // TODO: XXX: *another* Arc slowdown...
    shared: Arc<SharedState>,
    /// State for the current iteration only.
    current: UniqueState,
}
/// Walk a single directory in the current task, dispatching its files to `state`'s output sender
/// (`state.shared.output_sender`).
///
/// Intended to return the number of files sent, which `walk_inner` is meant to add to its own
/// per-iteration count.
///
/// # Panics
/// Always panics for now: the body is not implemented yet (`todo!`).
async fn walk_directory(state: &State, whence: impl AsRef<Path>) -> eyre::Result<usize>
{
    todo!("Walk to directory and output its files into `state`'s tx XXX: Does this function need to exist? We could just do this in walk_inner() directly: Explicit boxing doesn't need to be done as we're working with joinhandles and backing tasks")
}
/// This function is called recursively for each subdirectory in `whence` pertaining to the recursion rules.
/// The function should await all its spawned children *after* finishing its own work on files, **and** dropping its semaphore `_permit`. Otherwise deadlock could occour easily.
async fn walk_inner(state: State, whence: PathBuf) -> impl Future<Output = eyre::Result<usize>> + 'static
{
// Acquire permit *before* spawning.
let _permit = {
OptionFuture::from(state.shared.new_worker_semaphore.as_ref().map(|sem| sem.acquire())).map(|opt| match opt {
Some(Err(e)) => {
#[cold]
#[inline(never)]
fn _panic_permit(e: impl std::fmt::Display) -> ! {
panic!("Failed to attempt to acquire walker permit: {}", e)
}
_panic_permit(e)
},
Some(Ok(p)) => Some(p),
None => None,
})
}.await;
use futures::prelude::*;
async move {
let backing_res = tokio::spawn(async move {
// Move permit into task once acquires.
let _permit = _permit;
// Number of *files* sent to tx from this iteration.
let mut counted = 0usize;
// `JoinHandle<eyre::Result<size>>`s to recursion children
// XXX: Maybe use mpsc for this instead: We send the JoinHandle's to a rx being looped on in a `join!` in the outer `async move {}` scope at the same time as this one. When the sender is dropped, the channel will close. We can join each child of the stream concurrently with `futures` (probably?) and bubble up panics when they are found.
let mut children = Vec::new();
todo!("counted += walk_directory(&state, &whence).await, etc...");
Ok((counted, children))
}).await.expect("Panic in backing walker thread");
match backing_res {
Ok((counted, children)) => {
Ok(counted + futures::future::join_all(children.into_iter()).await.into_iter().sum::<usize>())
},
Err(e) => Err(e),
}
}
}
#[inline]
pub fn start_walk(cfg: Config, whence: impl AsRef<Path>, whereto: mpsc::Sender<work::FileInfo>) -> impl Future<Output = eyre::Result<usize>> + 'static
{
use futures::prelude::*;
walk_inner(State {
current: UniqueState {
recursion_depth: cfg.recursion_depth,
},
shared: Arc::new(SharedState {
output_sender: whereto,
new_worker_semaphore: cfg.max_walkers.as_ref().map(|max| Semaphore::new(max.get())),
config: cfg
}),
}, whence.as_ref().to_owned())
.flatten()
}