diff --git a/Cargo.lock b/Cargo.lock index 809c7b2..e316571 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 4 + [[package]] name = "arc-swap" version = "0.4.7" @@ -59,9 +61,9 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" [[package]] name = "futures" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -74,9 +76,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -84,15 +86,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -101,42 +103,38 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ - "proc-macro-hack", "proc-macro2", "quote", - "syn", + "syn 2.0.100", ] [[package]] name = "futures-sink" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" -dependencies = [ - "once_cell", -] +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.5" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -145,10 +143,8 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project", + "pin-project-lite 0.2.16", "pin-utils", - "proc-macro-hack", - "proc-macro-nested", "slab", ] @@ -307,12 +303,6 @@ dependencies = [ "libc", ] -[[package]] -name = "once_cell" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" - [[package]] name = "pin-project" version = "0.4.23" @@ -330,7 +320,7 @@ checksum = "2c0e815c3ee9a031fdf5af21c10aa17c573c9c6a566328d99e3936c34e36461f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.38", ] [[package]] @@ -340,37 +330,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715" [[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - -[[package]] -name = "proc-macro-hack" -version = "0.5.18" +name = "pin-project-lite" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99c605b9a0adc77b7211c6b1f722dcb613d68d66859a44f3d485a6da332b0598" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" [[package]] -name = "proc-macro-nested" -version = "0.1.6" +name = "pin-utils" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "proc-macro2" -version = "1.0.19" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04f5f085b5d71e2188cb8271e5da0161ad52c3f227a661a3c135fdf28e258b12" +checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84" dependencies = [ - "unicode-xid", + "unicode-ident", ] [[package]] name = "quote" -version = "1.0.7" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" dependencies = [ "proc-macro2", ] @@ -455,6 +439,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "syn" +version = "2.0.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "terminal_size" version = "0.1.13" @@ -491,8 +486,9 @@ dependencies = [ "mio-named-pipes", "mio-uds", "num_cpus", - "pin-project-lite", + "pin-project-lite 0.1.7", "signal-hook-registry", + "slab", "tokio-macros", "winapi 0.3.9", ] @@ -505,9 +501,15 @@ checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.38", ] +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + [[package]] name = "unicode-xid" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index c7c3daa..d594ebd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ collect_err = [] [dependencies] lazy_static = "1.4" -tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream"]} +tokio = {version = "0.2", features= ["rt-core", "rt-util", "macros", "fs", "io-driver", "io-util", "io-std", "process", "sync", "stream", "time"]} futures = "0.3" termprogress = {version="0.3", optional=true} cfg-if = "0.1" diff --git a/src/main.rs b/src/main.rs index 1f68a92..40ce465 100644 --- a/src/main.rs +++ b/src/main.rs @@ -59,7 +59,7 @@ async fn work() -> Result<(), Box<dyn std::error::Error>> } }, _ => args.max_children, - }, tokio::signal::ctrl_c().boxed()).await + }, tokio::spawn(tokio::signal::ctrl_c().fuse())).await } diff --git a/src/process.rs b/src/process.rs index 9b74a3c..09b6001 100644 --- a/src/process.rs +++ b/src/process.rs @@ -69,13 +69,14 @@ where U: IntoIterator<Item=V>, let stderr = std::process::Stdio::inherit(); } }; - let mut child = match Command::new(process) .args(process_args.iter_cloned().map(|x| x.into_owned())) //Do we need `into_owned()` here? - .args(args) + .args(args.into_iter()) .stdout(std::process::Stdio::piped()) .stderr(stderr) .stdin(std::process::Stdio::null()) + .kill_on_drop(false) +//.process_group(0) //XXX: It's this that is propagating the SIGINT!!! .spawn() { Ok(chi) => chi, Err(sp) => { @@ -107,7 +108,7 @@ where U: IntoIterator<Item=V>, } }); let _ = task::yield_now().await; - match child.await { + match tokio::spawn(child).await.expect("Child supervisor panic") { Ok(exit) => { if exit.success() { cfg_if!{ diff --git a/src/progress.rs b/src/progress.rs index 510f7e5..3a741ff 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -519,6 +519,7 @@ pub fn create_progress<P: ProgressBar + WithTitle + Send + 'static, progress.complete(); }).await.map_err(|_| Error::WorkerPanic); shutdown_tx.broadcast(Some(res.clone())).expect("Failed to communicate worker shutdown with waiters"); + res }); handle @@ -542,6 +543,22 @@ pub enum Error Unknown, } +impl Error +{ + /// If this error comes from an attempt to communicate with the worker that failed but can be ignored. + /// + /// # Usage + /// If the worker has been gracefully shutdown, and background tasks are still running that want to update the progress, an error will be returned when those already extant tasks attempt to send the commands to do so. + /// Those tasks can ignore those errors entirely if this function returns `true`. + pub fn can_ignore_on_com_send_failure(&self) -> bool + { + match self { + Self::WorkerDropped => true, + _ => false, + } + } +} + impl std::error::Error for Error{} impl std::fmt::Display for Error { diff --git a/src/work.rs b/src/work.rs index 55d31fb..57c8781 100644 --- a/src/work.rs +++ b/src/work.rs @@ -91,6 +91,7 @@ async fn do_work(process: impl AsRef<Process>, file: impl AsRef<OsStr>, mut prog }; let _ = tokio::task::yield_now().await; //let _ = opt_await.await; + match process::contained_spawn(process, std::iter::once(file), tx).await { Ok(_) => Ok(collector.await.expect("Child panic").into_iter()), Err(error) => { @@ -127,9 +128,9 @@ where I: IntoIterator<Item=T>, }; /// Make a future cancellable by the passed `cancel` token. macro_rules! pass_cancel { - ($a:expr) => (::futures::future::Abortable::new($a, cancel_register)); + ($a:expr) => (::futures::future::Abortable::new($a, cancel_register)); } - + let (tx,mut rx) = mpsc::channel::<(T, fixed_stack::IntoIter<(bool, String)>, usize)>(children.as_ref().map(|&x| usize::from(x)).unwrap_or(16)); let semaphore = children.map(|children| Arc::new(Semaphore::new(children.into()))); let process = Arc::new(Process::new(process, flags.leanify_flags.clone())); @@ -179,11 +180,12 @@ where I: IntoIterator<Item=T>, let results = pass_cancel!(join_all( files - .map(|filename| { + .map(|filename| { + let cancel = cancel.clone(); + let semaphore = semaphore.clone(); let process = Arc::clone(&process); let mut tx = tx.clone(); - let cancel = cancel.clone(); #[cfg(feature="progress")] let mut progress = progress.clone(); @@ -193,7 +195,14 @@ where I: IntoIterator<Item=T>, #[cfg(feature="progress")] type Opt<T> = OptionFuture<T>; #[cfg(not(feature="progress"))] type Opt<T> = std::marker::PhantomData<T>; let _task_id: Opt<_> = { + + if cancel.is_aborted() { + return; + } let _lock = maybe_await(semaphore.map(|x| x.acquire_owned())).await; + if cancel.is_aborted() { + return; + } // (task_id, worker_result) let worker = { @@ -223,7 +232,10 @@ where I: IntoIterator<Item=T>, #[cfg(not(feature="progress"))] eprintln!("[{}] {:?}: {}", colour::style(colour!(Color::Yellow),"!"), filename.as_ref(), colour::style(colour!(Color::Yellow), error)); #[cfg(feature="progress")] let _ = progress.eprintln(format!("[{}] ({}) {:?}: {}", colour::style(colour!(Color::Yellow),"!"),i, filename.as_ref(), colour::style(colour!(Color::Yellow), error))).await .or_else(|e| { - eprintln!("\n{}",e); Err(e) + if !e.can_ignore_on_com_send_failure() { + eprintln!("\n{}",e); + } + Err(e) }); }, } @@ -253,10 +265,10 @@ where I: IntoIterator<Item=T>, }),i+=1).0 }))).await .map(|x| x - .into_iter() - .filter_map(|x| x.err())); + .into_iter() + .filter_map(|x| x.err())); - let results = match results { + let results: Box<dyn Iterator<Item = tokio::task::JoinError>> = match results { Err(_) => { #[cfg(feature="progress")] { progress.eprintln("[!] Child aborting...").await?.await?; @@ -264,17 +276,19 @@ where I: IntoIterator<Item=T>, } #[cfg(not(feature="progress"))] eprintln!("[!] Child aborting..."); - todo!("XXX: How to actually implement this? Add global mutexed counter in `Process` itself to track them and optionally await on them? We have prevented any more from spawning, but how do we wait for the ones that already are (which is the whole point of this.)"); + //todo!("XXX: How to actually implement this? Add global mutexed counter in `Process` itself to track them and optionally await on them? We have prevented any more from spawning, but how do we wait for the ones that already are (which is the whole point of this.)"); + // TODO: How to impl: Make a sparse-map of `usize`-keyed `MappedData<[AtomicRefCell<MaybeUninit<T>>]> :: get_async(Process::next_uniq_id()) -> Pin<&mut SparseEntry<'_, T>>`, which keys are stored in a `std::bitset<usize>` analogue + Box::new(std::iter::empty()) }, - Ok(v) => v, + Ok(v) => Box::new(v), }; - #[cfg(feature="progress")] progress.shutdown().await?; for failed in results { #[cfg(feature="progress")] progress.eprintln(format!("[e] Child panic {:?}", failed)).await?.await?; #[cfg(not(feature="progress"))] eprintln!("[e] Child panic {:?}", failed); } + #[cfg(feature="progress")] progress.shutdown().await?; drop(tx); display.await?;