use super::*; use std::{ num::NonZeroUsize, sync::Arc, pin::Pin, marker::{ Send, }, }; use tokio::{ prelude::*, stream::StreamExt, sync::{ Semaphore, }, }; use futures::future::{ join_all, Future, }; pub async fn maybe_await(from: Option) -> Option<::Output> where T: Future { if let Some(v) = from { Some(v.await) } else { None } } pub async fn do_work(process: impl AsRef, file: impl AsRef) { let process = process.as_ref(); let file = file.as_ref(); match process::contained_spawn(process, std::iter::once(file)).await { Ok(output) => { }, Err(process::Error::Spawning) => { }, Err(process::Error::Process) => { }, } } pub async fn work(process: String, files: I, children: Option) -> Result<(), Box> where I: IntoIterator, T: AsRef + Send + 'static { //let mut stage: stage::Stage = files.into_iter().map(|x| x.into()).collect(); let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into()))); let process = Arc::new(process); join_all(files.into_iter() .map(|filename| { let semaphore = semaphore.clone(); let process = Arc::clone(&process); tokio::spawn(async move { let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await; do_work(&process[..], filename).await; }) })).await; Ok(()) }