Added walk::walk_directory() impl

Fortune for enumerate-ordered's current commit: Future blessing − 末吉
master
Avril 1 year ago
parent 6f4ef18b0b
commit ba2680db8e
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -16,6 +16,7 @@ use tokio::{
use futures::future::{
Future,
OptionFuture,
BoxFuture,
};
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Default)]
@ -27,18 +28,19 @@ pub struct Config
pub max_walkers: Option<NonZeroUsize>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[derive(Debug, Clone)]
struct UniqueState
{
// Decrease with each new depth until 0 is hit then do not proceed.
recursion_depth: Option<usize>,
output_sender: mpsc::Sender<work::FileInfo>,
}
#[derive(Debug)]
struct SharedState
{
worker_config: Arc<work::Config>,
config: Config,
output_sender: mpsc::Sender<work::FileInfo>,
new_worker_semaphore: Option<Semaphore>,
}
@ -50,15 +52,79 @@ struct State {
current: UniqueState,
}
impl State
{
#[inline]
pub fn create_file_info_generator(&self) -> impl Fn(PathBuf, std::fs::Metadata) -> work::FileInfo + '_
{
work::FileInfo::factory(Arc::clone(&self.shared.worker_config)) //XXX: ANOTHER unneeded Arc clone..
}
#[inline(always)]
pub fn shared(&self) -> &SharedState
{
&self.shared
}
#[inline(always)]
pub fn unique(&self) -> &UniqueState
{
&self.current
}
#[inline(always)]
pub fn unique_mut(&mut self) -> &mut UniqueState
{
&mut self.current
}
/// Create a `State` for a recursed child walk.
#[inline]
pub fn create_child(&self) -> Option<Self>
{
let recursion_depth = match self.current.recursion_depth.map(|depth| depth.saturating_sub(1)) {
// Prevent child creation if recursion bound hits 0.
Some(0) => return None,
x => x,
};
Some(Self {
shared: Arc::clone(&self.shared),
current: UniqueState { recursion_depth, output_sender: self.current.output_sender.clone() },
})
}
}
/// Walk a single directory in the current task. Dispatch async results to `walk
async fn walk_directory(state: &State, whence: impl AsRef<Path>) -> eyre::Result<usize>
async fn walk_directory<F>(state: &State, mut push_child: F, whence: impl AsRef<Path>) -> eyre::Result<usize>
where F: FnMut(State, PathBuf)
{
//use futures::future::FutureExt;
let mut read = tokio::fs::read_dir(whence).await.wrap_err("Failed to read-dir")?;
let sender = &state.current.output_sender;
let file_info = state.create_file_info_generator();
while let Some(fso) = read.next_entry().await.wrap_err("Failed to enumerate directory entry")?
{
let metadata = match fso.metadata().await {
Ok(metadata) => metadata,
Err(e) => {
error!("Failed to stat {:?}: {}", fso.file_name(), e);
continue;
}
};
if metadata.is_file() {
if let Err(_) = sender.send(file_info(fso.path(), metadata)).await {
warn!("Worker shut down, stopping iteration");
break;
}
} else if metadata.is_dir() {
let Some(state) = state.create_child() else { continue };
push_child(state, fso.path());
} // Ignore symlinks.
}
todo!("Walk to directory and output its files into `state`'s tx XXX: Does this function need to exist? We could just do this in walk_inner() directly: Explicit boxing doesn't need to be done as we're working with joinhandles and backing tasks")
}
/// This function is called recursively for each subdirectory in `whence` pertaining to the recursion rules.
/// The function should await all its spawned children *after* finishing its own work on files, **and** dropping its semaphore `_permit`. Otherwise deadlock could occour easily.
async fn walk_inner(state: State, whence: PathBuf) -> impl Future<Output = eyre::Result<usize>> + 'static
async fn walk_inner(state: State, whence: PathBuf) -> impl Future<Output = eyre::Result<usize>> + Send + Sync + 'static
{
// Acquire permit *before* spawning.
let _permit = {
@ -85,6 +151,9 @@ async fn walk_inner(state: State, whence: PathBuf) -> impl Future<Output = eyre:
// `JoinHandle<eyre::Result<size>>`s to recursion children
// XXX: Maybe use mpsc for this instead: We send the JoinHandle's to a rx being looped on in a `join!` in the outer `async move {}` scope at the same time as this one. When the sender is dropped, the channel will close. We can join each child of the stream concurrently with `futures` (probably?) and bubble up panics when they are found.
let mut children = Vec::new();
let todo_res = walk_directory(&state, |state, whence| {
children.push(async move { walk_inner(state, whence).await }.boxed());
}, &whence).await;
todo!("counted += walk_directory(&state, &whence).await, etc...");
Ok((counted, children))
@ -100,17 +169,18 @@ async fn walk_inner(state: State, whence: PathBuf) -> impl Future<Output = eyre:
}
#[inline]
pub fn start_walk(cfg: Config, whence: impl AsRef<Path>, whereto: mpsc::Sender<work::FileInfo>) -> impl Future<Output = eyre::Result<usize>> + 'static
pub fn start_walk(cfg: Config, worker: Arc<work::Config>, whence: impl AsRef<Path>, whereto: mpsc::Sender<work::FileInfo>) -> impl Future<Output = eyre::Result<usize>> + 'static
{
use futures::prelude::*;
walk_inner(State {
current: UniqueState {
recursion_depth: cfg.recursion_depth,
output_sender: whereto,
},
shared: Arc::new(SharedState {
output_sender: whereto,
new_worker_semaphore: cfg.max_walkers.as_ref().map(|max| Semaphore::new(max.get())),
config: cfg
config: cfg,
worker_config: worker,
}),
}, whence.as_ref().to_owned())
.flatten()

Loading…
Cancel
Save