You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
leanify-many/src/process.rs

157 lines
3.5 KiB

//! Handles spawning the process
use super::*;
use tokio::{
process::{
Command,
},
sync::{
mpsc,
},
io::{
BufReader,
},
stream::StreamExt,
prelude::*,
task,
};
use std::{
io,
path::{
Path,
PathBuf,
},
};
/// Process information
///
/// Bundles the program to execute with the flags that `contained_spawn`
/// passes to it ahead of any per-call arguments.
pub struct Process
{
/// Path of the program to run; handed to `Command::new` by `contained_spawn`.
name: PathBuf,
/// Flags whose `iter_cloned()` items become the first arguments of the command.
args: flags::LeanifyFlags,
}
impl Process {
    /// Build a `Process` description from a program path and its flags.
    ///
    /// `name` is converted into an owned `PathBuf`; `args` is stored as given.
    #[inline]
    pub fn new(name: impl Into<PathBuf>, args: flags::LeanifyFlags) -> Self {
        let name = name.into();
        Self { name, args }
    }
}
/// Lets a `Process` be used wherever a path to its program is expected.
impl AsRef<Path> for Process {
    /// Borrow the program path stored in `name`.
    fn as_ref(&self) -> &Path {
        self.name.as_path()
    }
}
/// Spawn the process, and contain its standard output.
///
/// # Notes
/// Standard error is printed immediately instead.
pub async fn contained_spawn<T,U,V>(process: T, args: U, mut output_to: mpsc::Sender<(bool, String)>) -> Result<(), Error>
where U: IntoIterator<Item=V>,
V: AsRef<std::ffi::OsStr>,
T: AsRef<Process>
{
let Process{
name: process,
args: process_args
} = process.as_ref();
cfg_if!{
if #[cfg(any(feature="progress",feature="collect_err"))] {
let stderr = std::process::Stdio::piped();
} else {
let stderr = std::process::Stdio::inherit();
}
};
let mut child = match Command::new(process)
.args(process_args.iter_cloned().map(|x| x.into_owned())) //Do we need `into_owned()` here?
.args(args)
.stdout(std::process::Stdio::piped())
.stderr(stderr)
.stdin(std::process::Stdio::null())
.spawn() {
Ok(chi) => chi,
Err(sp) => {
return Err(Error::Spawning(sp));
}
};
let stdout = child.stdout.take().unwrap();
#[cfg(any(feature="collect_err", feature="progress"))] let stderr_sender = {
let stderr = child.stderr.take().unwrap();
let mut output = output_to.clone();
tokio::spawn(async move {
let mut lines = BufReader::new(stderr).lines();
while let Some(Ok(line)) = lines.next().await {
if let Err(_) = output.send((true,line)).await {
break;
}
}
})
};
let sender = tokio::spawn(async move {
let mut lines = BufReader::new(stdout).lines();
while let Some(Ok(line)) = lines.next().await {
if let Err(_) = output_to.send((false, line)).await {
break;
}
}
});
task::yield_now().await;
match child.await {
Ok(exit) => {
if exit.success() {
cfg_if!{
if #[cfg(any(feature="collect_err",feature="progress"))] {
let (o1, o2) = futures::future::join(
sender,
stderr_sender
).await;
o1.expect("Child (stdout) panic");
o2.expect("Child (stderr) panic");
}
else {
sender.await.expect("Child panic");
}
};
Ok(())
} else {
Err(Error::Process(exit.code()))
}
},
Err(err) => Err(Error::Transmission(err)),
}
}
/// Errors that can occur while spawning a child process or waiting for it to finish.
#[derive(Debug)]
pub enum Error {
    /// There was an error spawning the process
    Spawning(io::Error),
    /// Process did not exit successfully.
    ///
    /// Holds the exit code when one is available; `None` is reported to the
    /// user as termination by a signal.
    Process(Option<i32>),
    /// Error communicating with process.
    Transmission(io::Error),
}

impl std::error::Error for Error {}

impl std::fmt::Display for Error {
    /// Write a lowercase, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Spawning(io) => write!(f, "there was an error spawning the process: {}", io),
            Self::Transmission(io) => write!(f, "there was an error waiting on child process: {}", io),
            // The payload is the exit code, not a signal number.
            Self::Process(Some(code)) => write!(f, "process exited with non-zero code: {}", code),
            Self::Process(None) => write!(f, "process terminated by signal"),
        }
    }
}