diff --git a/src/main.rs b/src/main.rs index bc242ff..7650469 100644 --- a/src/main.rs +++ b/src/main.rs @@ -59,7 +59,10 @@ fn args_or_out(i: T, low: usize) -> T async fn main() -> eyre::Result<()> { reyre!(init(), "Failed to initialise")?; - reyre!(parallel::main(args_or_out(std::env::args(), 2).skip(1).dedup()).await, "Jobs failed") + reyre!(parallel::main(futures::stream::iter(args_or_out(std::env::args(), 2) + .skip(1) + .dedup())).await, + "Jobs failed") } #[cfg(not(feature="parallel"))] diff --git a/src/parallel.rs b/src/parallel.rs index 79925f8..04405b5 100644 --- a/src/parallel.rs +++ b/src/parallel.rs @@ -5,13 +5,19 @@ use std::{ convert::{TryFrom, TryInto,}, path::Path, sync::Arc, + iter, }; use futures::{ future::{ + Future, OptionFuture, FutureExt, join_all, }, + stream::{ + Stream, + StreamExt, + }, }; use tokio::{ sync::{ @@ -83,16 +89,23 @@ async fn work>(apath: P, sem: Option>) -> Result<( } } -pub async fn main>(list: I) -> eyre::Result<()> +async fn join_stream(stream: I) -> impl Iterator::Output> + ExactSizeIterator + where I::Item: Future +{ + //gotta be a better way than heap allocating here, right? + stream.then(|x| async move { x.await }).collect::>().await.into_iter() +} + +pub async fn main>(list: I) -> eyre::Result<()> { let sem = gensem(); - let list = list.into_iter(); + //let list = list.into_iter(); let mut failures = match list.size_hint() { (0, Some(0)) | (0, None) => Vec::new(), (x, None) | (_, Some(x)) => Vec::with_capacity(x), }; let mut done = 0usize; - for (i, res) in (0usize..).zip(join_all(list.map(|file| tokio::spawn(work(file, sem.clone())))) + for (i, res) in (0usize..).zip(join_stream(list.map(|file| tokio::spawn(work(file, sem.clone())))) .map(|x| {trace!("--- {} Finished ---", x.len()); x}).await) { //trace!("Done on {:?}", res);