Added async path walker (input can come from stdin at any time, or from args immediately.)

Started mod `args` for parsing arguments.

Fortune for enumerate-ordered's current commit: Future small blessing − 末小吉
master
Avril 2 years ago
parent ba2680db8e
commit e156e2cc1e
Signed by: flanchan
GPG Key ID: 284488987C31F630

28
Cargo.lock generated

@ -101,6 +101,7 @@ version = "0.1.0"
dependencies = [
"color-eyre",
"futures",
"jemallocator",
"log",
"pretty_env_logger",
"tokio",
@ -130,6 +131,12 @@ dependencies = [
"once_cell",
]
[[package]]
name = "fs_extra"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
[[package]]
name = "futures"
version = "0.3.25"
@ -249,6 +256,27 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683"
[[package]]
name = "jemalloc-sys"
version = "0.5.2+5.3.0-patched"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "134163979b6eed9564c98637b710b40979939ba351f59952708234ea11b5f3f8"
dependencies = [
"cc",
"fs_extra",
"libc",
]
[[package]]
name = "jemallocator"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16c2514137880c52b0b4822b563fadd38257c1f380858addb74a400889696ea6"
dependencies = [
"jemalloc-sys",
"libc",
]
[[package]]
name = "libc"
version = "0.2.137"

@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
color-eyre = { version = "0.6.2", default-features = false }
futures = "0.3.25"
jemallocator = "0.5.0"
log = "0.4.17"
pretty_env_logger = "0.4.0"
tokio = { version = "1.22.0", features = ["full"] }

@ -0,0 +1,84 @@
//! Arg parsing
use super::*;
use std::{
ffi::OsStr,
path::{
Path, PathBuf,
},
borrow::Cow,
};
use tokio::{
sync::{
mpsc,
},
};
use futures::{
stream::{self, Stream, BoxStream, StreamExt,},
};
/// Parsed command-line args
#[derive(Debug)]
pub struct Args
{
paths: Option<Vec<PathBuf>>,
}
impl Args
{
/// Paths as an async stream
///
/// # Non-immediate
/// When input paths come from `stdin`, the output stream will be non-immediate.
pub fn paths(&self) -> BoxStream<'_, Cow<'_, Path>>
{
if let Some(paths) = self.paths.as_ref() {
stream::iter(paths.iter().map(|x| Cow::Borrowed(Path::new(x)))).boxed()
} else {
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
use tokio::io::{
self,
AsyncReadExt, AsyncBufReadExt
};
let mut stdin = {
tokio::io::BufReader::new(io::stdin())
};
let mut buf = Vec::with_capacity(1024);
loop
{
buf.clear();
use std::os::unix::prelude::*;
let n = match stdin.read_until(b'\n', &mut buf).await {
Ok(n) => n,
Err(e) => {
error!("paths: Failed to read input line: {}", e);
break;
},
};
if n == 0 {
trace!("paths: Stdin exhausted. Stopping read.");
break;
}
let path_bytes = &buf[..n];
let path = Path::new(OsStr::from_bytes(path_bytes));
if path.exists() {
if tx.send(path.to_owned()).await.is_err() {
trace!("paths: Stream dropped, cancelling stdin read.");
break;
}
}
}
});
tokio_stream::wrappers::ReceiverStream::new(rx).map(|x| Cow::Owned(x)).boxed()
}
}
}
#[inline]
pub fn parse_args() -> eyre::Result<Args>
{
todo!("parse(std::env::args().skip(1))")
}
//TODO: fn parse(args: impl IntoIterator<Item=String>) -> eyre::Result<Args>

@ -16,6 +16,12 @@ use tokio::{
},
};
use jemallocator::Jemalloc;
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
mod args;
mod order;
mod work;
mod walk;
@ -40,6 +46,16 @@ async fn work_on(cfg: work::Config, mut into: mpsc::Receiver<work::FileInfo>) ->
Ok(map)
}
async fn walk_paths<I, P>(paths: I, cfg: walk::Config, worker_cfg: &std::sync::Arc<work::Config>, to: mpsc::Sender<work::FileInfo>) -> eyre::Result<usize>
where I: futures::stream::Stream<Item = P>,
P: AsRef<std::path::Path>
{
use futures::prelude::*;
let children: Vec<usize> =
paths.map(|path| futures::stream::once(walk::start_walk(cfg.clone(), std::sync::Arc::clone(worker_cfg), path, to.clone())).boxed_local()).flatten_unordered(None).try_collect().await?;
Ok(children.into_iter().sum())
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
@ -47,18 +63,56 @@ async fn main() -> eyre::Result<()> {
let (tx, rx) = mpsc::channel(4096);
//TODO: Read main config from args
let args = args::parse_args()
.wrap_err("Failed to parse command line args")
.with_suggestion(|| "Try `--help`")?;
let worker_cfg = {
//TODO: Read worker config from main config
std::sync::Arc::new(work::Config::default())
};
let walker_cfg = {
//TODO: Read walker config from main config
walk::Config::default()
};
// Spin up ordering task.
let ordering = tokio::spawn(async move {
trace!("spun up ordering backing thread");
work_on(Default::default(), rx).await //TODO: Parse config from args
});
let ordering = {
let cfg = (*worker_cfg).clone();
tokio::spawn(async move {
trace!("spun up ordering backing thread");
work_on(cfg, rx).await //TODO: Parse config from args
})
};
trace!("Starting recursive walk of input locations");
//TODO: Trace directory trees from paths in `args` and/or `stdin` and pass results to `tx`
let set = ordering.await.wrap_err("Ordering task panic")?
.wrap_err(eyre!("Failed to collect ordered files"))?;
let walk = walk_paths(args.paths(), walker_cfg, &worker_cfg, tx);
tokio::pin!(walk);
let set = async move {
ordering.await.wrap_err("Ordering task panic")?
.wrap_err(eyre!("Failed to collect ordered files"))
};
tokio::pin!(set);
/*let set = tokio::select! {
n = walk => {
let n =n.wrap_err("Walker failed")?;
info!("Walked {} files", n);
},
res = set => {
let set = res.wrap_err("Ordering task failed before walk completed")?;
return Err(eyre!("Ordering task exited before walker task"));
}
};*/
let (walk, set) = {
let (w, s) = tokio::join!(walk, set);
(w?, s?)
};
info!("Walked {} files", walk);
tokio::task::spawn_blocking(move || -> eyre::Result<()> {
use std::io::Write;
@ -67,7 +121,7 @@ async fn main() -> eyre::Result<()> {
let lock = std::io::stdout().lock();
std::io::BufWriter::new(lock)
};
trace!("Writing ordered results to stdout... (buffered)");
for info in set.into_iter()
{
stdout.write_all(info.path().as_os_str().as_bytes())

@ -94,7 +94,7 @@ impl State
/// Walk a single directory in the current task. Dispatch async results to `walk
async fn walk_directory<F>(state: &State, mut push_child: F, whence: impl AsRef<Path>) -> eyre::Result<usize>
where F: FnMut(State, PathBuf)
where F: FnMut(State, PathBuf)
{
//use futures::future::FutureExt;
let mut read = tokio::fs::read_dir(whence).await.wrap_err("Failed to read-dir")?;
@ -124,44 +124,44 @@ async fn walk_directory<F>(state: &State, mut push_child: F, whence: impl AsRef<
/// This function is called recursively for each subdirectory in `whence` pertaining to the recursion rules.
/// The function should await all its spawned children *after* finishing its own work on files, **and** dropping its semaphore `_permit`. Otherwise deadlock could occour easily.
async fn walk_inner(state: State, whence: PathBuf) -> impl Future<Output = eyre::Result<usize>> + Send + Sync + 'static
async fn walk_inner(state: State, whence: PathBuf) -> eyre::Result<usize>
{
// Acquire permit *before* spawning.
let _permit = {
OptionFuture::from(state.shared.new_worker_semaphore.as_ref().map(|sem| sem.acquire())).map(|opt| match opt {
Some(Err(e)) => {
#[cold]
#[inline(never)]
fn _panic_permit(e: impl std::fmt::Display) -> ! {
panic!("Failed to attempt to acquire walker permit: {}", e)
}
_panic_permit(e)
},
Some(Ok(p)) => Some(p),
None => None,
})
}.await;
use futures::prelude::*;
async move {
//async move
{
let backing_res = tokio::spawn(async move {
// Move permit into task once acquires.
let _permit = _permit;
// Number of *files* sent to tx from this iteration.
let mut counted = 0usize;
let _permit = match &state.shared.new_worker_semaphore {
Some(sem) => Some(sem.acquire().await.expect("Failed to acquire permit")),
None => None,
};
// `JoinHandle<eyre::Result<size>>`s to recursion children
// XXX: Maybe use mpsc for this instead: We send the JoinHandle's to a rx being looped on in a `join!` in the outer `async move {}` scope at the same time as this one. When the sender is dropped, the channel will close. We can join each child of the stream concurrently with `futures` (probably?) and bubble up panics when they are found.
let mut children = Vec::new();
let todo_res = walk_directory(&state, |state, whence| {
children.push(async move { walk_inner(state, whence).await }.boxed());
}, &whence).await;
// Number of *files* sent to tx from this iteration.
let counted = walk_directory(&state, |state, whence| {
fn walk_inner2(state: State, whence: PathBuf) -> BoxFuture<'static, eyre::Result<usize>>
{
walk_inner(state, whence).boxed()
}
children.push(walk_inner2(state, whence));
}, &whence).await?;
todo!("counted += walk_directory(&state, &whence).await, etc...");
Ok((counted, children))
}).await.expect("Panic in backing walker thread");
trace!("Spawning and collecting child workers");
match backing_res {
Ok((counted, children)) => {
Ok(counted + futures::future::join_all(children.into_iter()).await.into_iter().sum::<usize>())
use futures::prelude::*;
Ok(counted + futures::future::join_all(children.into_iter()).await.into_iter().filter_map(|res| match res {
Ok(n) => Some(n),
Err(e) => {
error!("Child failed to walk: {}", e);
None
},
}).sum::<usize>())
},
Err(e) => Err(e),
}
@ -183,5 +183,5 @@ pub fn start_walk(cfg: Config, worker: Arc<work::Config>, whence: impl AsRef<Pat
worker_config: worker,
}),
}, whence.as_ref().to_owned())
.flatten()
//.flatten()
}

@ -19,7 +19,7 @@ pub enum OrderBy
ModifiedTime,
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct Config
{
pub by: OrderBy,

Loading…
Cancel
Save