You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
227 lines
6.6 KiB
227 lines
6.6 KiB
//! Directory walking
|
|
use super::*;
|
|
use std::num::NonZeroUsize;
|
|
use std::{
|
|
sync::Arc,
|
|
path::{
|
|
Path, PathBuf,
|
|
},
|
|
};
|
|
use tokio::{
|
|
sync::{
|
|
RwLock,
|
|
Semaphore,
|
|
},
|
|
};
|
|
use futures::future::{
|
|
Future,
|
|
OptionFuture,
|
|
BoxFuture,
|
|
};
|
|
|
|
|
|
lazy_static! {
    /// CPU count as reported by `num_cpus::get()`.
    ///
    /// Evaluated lazily on first dereference and cached afterwards.
    pub(super) static ref NUM_CPUS: usize = num_cpus::get();
}
|
|
|
|
/// Default number of max walkers allowed to work at once on the thread pool.
|
|
/// See `Config`.
|
|
#[inline]
|
|
pub fn default_max_walkers() -> Option<NonZeroUsize>
|
|
{
|
|
NonZeroUsize::new(*NUM_CPUS)
|
|
}
|
|
|
|
/// Configuration for a directory walk.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct Config
{
    /// How far below the starting directory the walk may recurse.
    ///
    /// `None` means unlimited; `Some(0)` means subdirectories are not descended into.
    pub recursion_depth: Option<usize>,
    /// How many walkers may do directory work at the same time.
    ///
    /// `None` means unlimited.
    pub max_walkers: Option<NonZeroUsize>,
}
|
|
|
|
impl Default for Config
|
|
{
|
|
#[inline]
|
|
fn default() -> Self
|
|
{
|
|
Self {
|
|
recursion_depth: Some(0),
|
|
max_walkers: default_max_walkers(),
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
#[derive(Debug, Clone)]
|
|
struct UniqueState
|
|
{
|
|
// Decrease with each new depth until 0 is hit then do not proceed.
|
|
recursion_depth: Option<usize>,
|
|
output_sender: mpsc::Sender<work::FileInfo>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
struct SharedState
|
|
{
|
|
worker_config: Arc<work::Config>,
|
|
config: Config,
|
|
new_worker_semaphore: Option<Semaphore>,
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
struct State {
|
|
/// Shared state for all iterations
|
|
shared: Arc<SharedState>, //TODO: XXX: *another* Arc slowdown...
|
|
/// State for current iteration
|
|
current: UniqueState,
|
|
}
|
|
|
|
impl State
|
|
{
|
|
#[inline]
|
|
pub fn create_file_info_generator(&self) -> impl Fn(PathBuf, std::fs::Metadata) -> work::FileInfo + '_
|
|
{
|
|
work::FileInfo::factory(Arc::clone(&self.shared.worker_config)) //XXX: ANOTHER unneeded Arc clone..
|
|
}
|
|
#[inline(always)]
|
|
pub fn shared(&self) -> &SharedState
|
|
{
|
|
&self.shared
|
|
}
|
|
#[inline(always)]
|
|
pub fn unique(&self) -> &UniqueState
|
|
{
|
|
&self.current
|
|
}
|
|
|
|
#[inline(always)]
|
|
pub fn unique_mut(&mut self) -> &mut UniqueState
|
|
{
|
|
&mut self.current
|
|
}
|
|
|
|
/// Create a `State` for a recursed child walk.
|
|
#[inline]
|
|
pub fn create_child(&self) -> Option<Self>
|
|
{
|
|
let recursion_depth = match self.current.recursion_depth.map(|depth| depth.saturating_sub(1)) {
|
|
// Prevent child creation if recursion bound hits 0.
|
|
Some(0) => return None,
|
|
x => x,
|
|
};
|
|
Some(Self {
|
|
shared: Arc::clone(&self.shared),
|
|
current: UniqueState { recursion_depth, output_sender: self.current.output_sender.clone() },
|
|
})
|
|
}
|
|
}
|
|
|
|
/// Walk a single directory in the current task. Dispatch async results to `walk
|
|
async fn walk_directory<F>(state: &State, mut push_child: F, whence: impl AsRef<Path>) -> eyre::Result<usize>
|
|
where F: FnMut(State, PathBuf)
|
|
{
|
|
//use futures::future::FutureExt;
|
|
let mut read = tokio::fs::read_dir(whence).await.wrap_err("Failed to read-dir")?;
|
|
let sender = &state.current.output_sender;
|
|
let file_info = state.create_file_info_generator();
|
|
let mut n = 0;
|
|
while let Some(fso) = read.next_entry().await.wrap_err("Failed to enumerate directory entry")?
|
|
{
|
|
let metadata = match fso.metadata().await {
|
|
Ok(metadata) => metadata,
|
|
Err(e) => {
|
|
error!("Failed to stat {:?}: {}", fso.file_name(), e);
|
|
continue;
|
|
}
|
|
};
|
|
if metadata.is_file() {
|
|
if let Err(_) = sender.send(file_info(fso.path(), metadata)).await {
|
|
warn!("Worker shut down, stopping iteration");
|
|
break;
|
|
} else {
|
|
n += 1;
|
|
}
|
|
} else if metadata.is_dir() {
|
|
let Some(state) = state.create_child() else { continue };
|
|
push_child(state, fso.path());
|
|
} // Ignore symlinks.
|
|
}
|
|
Ok(n)
|
|
//todo!("Walk to directory and output its files into `state`'s tx XXX: Does this function need to exist? We could just do this in walk_inner() directly: Explicit boxing doesn't need to be done as we're working with joinhandles and backing tasks")
|
|
}
|
|
|
|
/// This function is called recursively for each subdirectory in `whence` pertaining to the recursion rules.
|
|
/// The function should await all its spawned children *after* finishing its own work on files, **and** dropping its semaphore `_permit`. Otherwise deadlock could occour easily.
|
|
async fn walk_inner(state: State, whence: PathBuf) -> eyre::Result<usize>
|
|
{
|
|
|
|
use futures::prelude::*;
|
|
//async move
|
|
{
|
|
let backing_res = tokio::spawn(async move {
|
|
|
|
let _permit = match &state.shared.new_worker_semaphore {
|
|
Some(sem) => Some(sem.acquire().await.expect("Failed to acquire permit")),
|
|
None => None,
|
|
};
|
|
// `JoinHandle<eyre::Result<size>>`s to recursion children
|
|
// XXX: Maybe use mpsc for this instead: We send the JoinHandle's to a rx being looped on in a `join!` in the outer `async move {}` scope at the same time as this one. When the sender is dropped, the channel will close. We can join each child of the stream concurrently with `futures` (probably?) and bubble up panics when they are found.
|
|
let mut children = Vec::new();
|
|
// Number of *files* sent to tx from this iteration.
|
|
|
|
let counted = if whence.is_dir() {
|
|
walk_directory(&state, |state, whence| {
|
|
fn walk_inner2(state: State, whence: PathBuf) -> BoxFuture<'static, eyre::Result<usize>>
|
|
{
|
|
walk_inner(state, whence).boxed()
|
|
}
|
|
children.push(walk_inner2(state, whence));
|
|
}, &whence).await?
|
|
} else {
|
|
let metadata = tokio::fs::metadata(&whence).await.wrap_err("Failed to stat top-level file")?;
|
|
if let Err(_) = state.current.output_sender.send(work::FileInfo::new(Arc::clone(&state.shared.worker_config), whence, metadata)).await {
|
|
return Err(eyre!("Failed to send top-level file to backing sorter: Sorter closed."));
|
|
}
|
|
1
|
|
};
|
|
|
|
Ok((counted, children))
|
|
}).await.expect("Panic in backing walker thread");
|
|
|
|
trace!("Spawning and collecting child workers");
|
|
match backing_res {
|
|
Ok((counted, children)) => {
|
|
use futures::prelude::*;
|
|
Ok(counted + futures::future::join_all(children.into_iter()).await.into_iter().filter_map(|res| match res {
|
|
Ok(n) => Some(n),
|
|
Err(e) => {
|
|
error!("Child failed to walk: {}", e);
|
|
None
|
|
},
|
|
}).sum::<usize>())
|
|
},
|
|
Err(e) => Err(e),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[inline]
|
|
pub fn start_walk(cfg: Config, worker: Arc<work::Config>, whence: impl AsRef<Path>, whereto: mpsc::Sender<work::FileInfo>) -> impl Future<Output = eyre::Result<usize>> + 'static
|
|
{
|
|
use futures::prelude::*;
|
|
walk_inner(State {
|
|
current: UniqueState {
|
|
recursion_depth: cfg.recursion_depth,
|
|
output_sender: whereto,
|
|
},
|
|
shared: Arc::new(SharedState {
|
|
new_worker_semaphore: cfg.max_walkers.as_ref().map(|max| Semaphore::new(max.get())),
|
|
config: cfg,
|
|
worker_config: worker,
|
|
}),
|
|
}, whence.as_ref().to_owned())
|
|
//.flatten()
|
|
}
|