diff --git a/.gitignore b/.gitignore index e2a3069..615f82e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target *~ +test*.txt diff --git a/src/arg/mod.rs b/src/arg/mod.rs index 6e3970c..f2837d1 100644 --- a/src/arg/mod.rs +++ b/src/arg/mod.rs @@ -70,6 +70,11 @@ impl Operation } } + pub fn tty(&self) -> bool + { + self.output_to_tty + } + pub fn parent(&self) -> &ProcessArgs { &self.input diff --git a/src/arg/proc_args.rs b/src/arg/proc_args.rs index 15e0dd9..efd12b8 100644 --- a/src/arg/proc_args.rs +++ b/src/arg/proc_args.rs @@ -64,12 +64,12 @@ impl ProcessArgs fn find_somewhere(name: T, places: U) -> Option where T: AsRef, - U: Iterator, + U: IntoIterator, V: AsRef { let name = name.as_ref(); - for place in places + for place in places.into_iter() { let place = place.as_ref(); diff --git a/src/main.rs b/src/main.rs index b654154..08050e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,15 +3,56 @@ use std::{ error::Error, + process::{Command, Stdio, Child, ChildStdin, ChildStdout,}, + io::{self, Read, Write,}, }; mod arg; +mod multicast; +mod pipe; + +fn link<'a>(oneesan: &'a mut Child, imouto: &'a mut Vec) -> Result, &'static str> +{ + let pipe_read = oneesan.stdout.as_mut().ok_or("Failed to open oneesan stdout")?; + let mut pipe_write = multicast::MulticastStream::new(); + + for write in imouto.iter_mut() { + pipe_write.cast(write.stdin.as_mut().ok_or("Failed to open imouto stdin")?); + } + + Ok(pipe::LeakyPipe::new(pipe_read, pipe_write)) +} fn main() -> Result<(), Box> { let op = arg::parse()?; - //TODO: Open input - println!("{:?}", op); + let parent = op.parent(); + let children = op.children(); + + let mut oneesan = Command::new(parent.name_path()?) + .args(&parent.args) + .stdout(Stdio::piped()) + .stdin(Stdio::null()) + .spawn()?; + + let mut imouto = Vec::with_capacity(children.len()); + for child in children.iter() + { + imouto.push(Command::new(child.name_path()?) + .args(&child.args) + .stdin(Stdio::piped()) + .stdout(Stdio::inherit()) + .spawn()?); + } + + let leak = link(&mut oneesan, &mut imouto)?; + + leak.pipe(1024, op.tty())?; + + oneesan.wait()?; + for mut i in imouto.into_iter() { + i.wait()?; + } Ok(()) } diff --git a/src/multicast.rs b/src/multicast.rs new file mode 100644 index 0000000..285baee --- /dev/null +++ b/src/multicast.rs @@ -0,0 +1,74 @@ +use std::{ + io::{self, Read, Write,}, +}; + +pub struct MulticastStream +where T: Write +{ + outputs: Vec, + continue_on_fail: bool, +} + +impl MulticastStream +where T: Write +{ + pub fn new() -> Self { + Self { + outputs: Vec::new(), + continue_on_fail: false, + } + } + pub fn continue_on_fail(self, set: bool) -> Self { + Self { + continue_on_fail: set, + ..self + } + } + pub fn cast(&mut self, output: T) { + self.outputs.push(output); + } +} + +impl Write for MulticastStream +where T: Write +{ + fn write(&mut self, buf: &[u8]) -> io::Result + { + let mut sz =0; + let mut one_ok = self.outputs.len() < 1; + for res in self.outputs.iter_mut().map(|output| output.write(&buf)) { + match res { + Ok(res) if res > sz => sz = res, + Err(err) if !self.continue_on_fail => return Err(err), + Err(_) => continue, + _ => (), + }; + one_ok = true; + } + + if !one_ok { + Err(io::Error::new(io::ErrorKind::UnexpectedEof, "All write streams failed")) + } else { + Ok(sz) + } + } + + fn flush(&mut self) -> io::Result<()> + { + let mut errors = 0; + for res in self.outputs.iter_mut().map(|output| output.flush()) { + if let Err(err) = res { + if !self.continue_on_fail { + return Err(err); + } + errors += 1; + } + } + + if errors > 0 && errors == self.outputs.len() { + Err(io::Error::new(io::ErrorKind::UnexpectedEof, "All write streams failed")) + } else { + Ok(()) + } + } +} diff --git a/src/pipe/error.rs b/src/pipe/error.rs new file mode 100644 index 0000000..2fbf012 --- /dev/null +++ b/src/pipe/error.rs @@ -0,0 +1,157 @@ +use super::*; +use std::{ + io::{self,}, + fmt, + error, + num::NonZeroUsize, +}; + +#[derive(Debug)] +pub enum ErrorLocation { + Oneesan, + Imouto, + Internal, +} + +#[derive(Debug)] +pub struct Error { + explination: String, + bytes_done: Option, + location: ErrorLocation, + inner: Option, +} + +fn clamp(value: i64, min: i64, max: i64) -> i64 +{ + if value < min { + min + } else if value > max { + max + } else { + value + } +} + +fn human_bytes(bytes: usize) -> String +{ + const POSTFIXES: [&'static str; 7] = ["b", "kb", "mb", "gb", "tb", "pb", "eb"]; + + if bytes == 0 { + return format!("0 {}", POSTFIXES[0]); + } + + let idx = (bytes as f64).log(1024f64).floor() as i64; + let suf = &POSTFIXES[clamp(idx, 0, 6) as usize]; + let num = (1024_i64).pow(idx as u32); + + format!("{:.2} {}", num, suf) +} + +impl Error { + pub fn new(expl: String, from: ErrorLocation, bytes_done: usize) -> Self { + Self { + explination: expl, + location: from, + inner: None, + bytes_done: NonZeroUsize::new(bytes_done), + } + } +} +impl From for Error +{ + fn from(f: io::Error) -> Self { + Self { + explination: format!("i/o error: "), + location: ErrorLocation::Internal, + bytes_done: None, + inner: Some(f), + } + } +} + +impl fmt::Display for ErrorLocation +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ErrorLocation::Oneesan => write!(f, ""), + ErrorLocation::Imouto => write!(f, ""), + _ => write!(f, ""), + } + } +} +impl fmt::Display for Error +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}+{}: {}{}", self.location, match &self.bytes_done { + Some(value) => human_bytes(value.get()), + None => "0 b".to_owned(), + }, self.explination, match &self.inner { + Some(inner) => format!(" {}", inner), + None => "".to_owned(), + }) + } +} +impl error::Error for Error +{ + fn source(&self) -> Option<&(dyn error::Error + 'static)> + { + match &self.inner { + Some(x) => Some(x), + None => None, + } + } +} + +pub(super) trait WeebExt +{ + fn into_imouto(self) -> Error; + fn into_oneesan(self) -> Error; +} +pub(super) trait TryWeebExt +{ + fn try_imouto(self, bytes_done: usize) -> Result; + fn try_oneesan(self, bytes_done: usize) -> Result; +} + +impl WeebExt for io::Error +{ + fn into_imouto(self) -> Error + { + Error { + location: ErrorLocation::Imouto, + ..Error::from(self) + } + } + + fn into_oneesan(self) -> Error + { + Error { + location: ErrorLocation::Oneesan, + ..Error::from(self) + } + } +} + +impl TryWeebExt for io::Result +{ + fn try_imouto(self, bytes_done: usize) -> Result + { + match self { + Ok(v) => Ok(v), + Err(e) => Err(Error { + bytes_done: NonZeroUsize::new(bytes_done), + ..e.into_imouto() + }), + } + } + fn try_oneesan(self, bytes_done: usize) -> Result + { + match self { + Ok(v) => Ok(v), + Err(e) => Err(Error { + bytes_done: NonZeroUsize::new(bytes_done), + ..e.into_oneesan() + }), + } + } +} diff --git a/src/pipe/mod.rs b/src/pipe/mod.rs new file mode 100644 index 0000000..163e177 --- /dev/null +++ b/src/pipe/mod.rs @@ -0,0 +1,48 @@ +use std::{ + io::{self, Read, Write,}, +}; +use crate::{ + multicast, +}; + +mod error; +pub use error::*; + +pub struct LeakyPipe<'a, T, U> +where T: Read, + U: Write +{ + oneesan: &'a mut T, + imouto: multicast::MulticastStream<&'a mut U> +} + +impl<'a, T, U> LeakyPipe<'a, T, U> +where T: Read, + U: Write +{ + pub fn new(oneesan: &'a mut T, imouto: multicast::MulticastStream<&'a mut U>) -> Self + { + Self { + oneesan, + imouto, + } + } + + pub fn pipe(mut self, buf_size: usize, output_to_tty: bool) -> Result<(), Error> + { + let mut buffer = vec![0u8; buf_size]; + let mut bytes_done=0; + let stdout = io::stdout(); + let mut stdout = stdout.lock(); + loop { + let sz = self.oneesan.read(&mut buffer[..]).try_oneesan(bytes_done)?; + if sz == 0 {return Ok(());} + + self.imouto.write(&buffer[0..sz]).try_imouto(bytes_done)?; + if output_to_tty { + stdout.write(&buffer[0..sz])?; + } + bytes_done+=sz; + } + } +}