works with allocateion

master
Avril 4 years ago
parent a68f418ffd
commit 5521799550
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -59,7 +59,10 @@ fn args_or_out<T: ExactSizeIterator>(i: T, low: usize) -> T
async fn main() -> eyre::Result<()> { async fn main() -> eyre::Result<()> {
reyre!(init(), "Failed to initialise")?; reyre!(init(), "Failed to initialise")?;
reyre!(parallel::main(args_or_out(std::env::args(), 2).skip(1).dedup()).await, "Jobs failed") reyre!(parallel::main(futures::stream::iter(args_or_out(std::env::args(), 2)
.skip(1)
.dedup())).await,
"Jobs failed")
} }
#[cfg(not(feature="parallel"))] #[cfg(not(feature="parallel"))]

@ -5,13 +5,19 @@ use std::{
convert::{TryFrom, TryInto,}, convert::{TryFrom, TryInto,},
path::Path, path::Path,
sync::Arc, sync::Arc,
iter,
}; };
use futures::{ use futures::{
future::{ future::{
Future,
OptionFuture, OptionFuture,
FutureExt, FutureExt,
join_all, join_all,
}, },
stream::{
Stream,
StreamExt,
},
}; };
use tokio::{ use tokio::{
sync::{ sync::{
@ -83,16 +89,23 @@ async fn work<P: AsRef<Path>>(apath: P, sem: Option<Arc<Semaphore>>) -> Result<(
} }
} }
pub async fn main<I: IntoIterator<Item=String>>(list: I) -> eyre::Result<()> async fn join_stream<I: Stream>(stream: I) -> impl Iterator<Item=<I::Item as Future>::Output> + ExactSizeIterator
where I::Item: Future
{
//gotta be a better way than heap allocating here, right?
stream.then(|x| async move { x.await }).collect::<Vec<_>>().await.into_iter()
}
pub async fn main<I: Stream<Item=String>>(list: I) -> eyre::Result<()>
{ {
let sem = gensem(); let sem = gensem();
let list = list.into_iter(); //let list = list.into_iter();
let mut failures = match list.size_hint() { let mut failures = match list.size_hint() {
(0, Some(0)) | (0, None) => Vec::new(), (0, Some(0)) | (0, None) => Vec::new(),
(x, None) | (_, Some(x)) => Vec::with_capacity(x), (x, None) | (_, Some(x)) => Vec::with_capacity(x),
}; };
let mut done = 0usize; let mut done = 0usize;
for (i, res) in (0usize..).zip(join_all(list.map(|file| tokio::spawn(work(file, sem.clone())))) for (i, res) in (0usize..).zip(join_stream(list.map(|file| tokio::spawn(work(file, sem.clone()))))
.map(|x| {trace!("--- {} Finished ---", x.len()); x}).await) .map(|x| {trace!("--- {} Finished ---", x.len()); x}).await)
{ {
//trace!("Done on {:?}", res); //trace!("Done on {:?}", res);

Loading…
Cancel
Save