diff --git a/Cargo.toml b/Cargo.toml index bb8c590..c091c78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,9 @@ license = "MIT" [features] default = ["smallvec", "async"] +# Enable async version with tokio v2.0 AsyncRead/AsyncWrite. async = ["tokio", "pin-project"] + # Explicitly clear in-memory buffers with `explicit_bzero()` instead of normal `bzero()`. explicit_clear = [] @@ -23,3 +25,7 @@ tokio = {version = "0.2", optional = true} [build-dependencies] rustc_version = "0.2" + +[dev-dependencies] +tempfile = "3.2.0" +tokio = {version = "0.2", features=["full"]} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index d3acfee..da3fbc1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ //extern crate test; +#[cfg(feature="async")] #[macro_use] extern crate pin_project; #[macro_use] mod ext; #[allow(unused_imports)] use ext::*; @@ -14,6 +15,7 @@ mod stream; mod bytes; #[cfg(feature="async")] mod stream_async; +#[cfg(feature="async")] pub use stream_async::Sink as AsyncSink; pub use stream::Sink; pub use key::{ diff --git a/src/stream_async.rs b/src/stream_async.rs index 6564412..4b5bfbc 100644 --- a/src/stream_async.rs +++ b/src/stream_async.rs @@ -67,6 +67,62 @@ fn transform(crypter: &mut Crypter, buffer: &mut BufferVec, buf: &[u8]) -> Resul impl Sink { + + /// Create a new async Chacha Sink stream wrapper + #[inline] fn new(stream: W, crypter: Crypter) -> Self + { + Self{stream, crypter, buffer: BufferVec::new()} + } + + /// Create an encrypting Chacha Sink stream wrapper + pub fn encrypt(stream: W, key: Key, iv: IV) -> Result + { + Ok(Self::new(stream, cha::encrypter(key, iv)?)) + } + + /// Create a decrypting Chacha Sink stream wrapper + pub fn decrypt(stream: W, key: Key, iv: IV) -> Result + { + Ok(Self::new(stream, cha::decrypter(key, iv)?)) + } + + + /// Consume into the inner stream + #[inline] pub fn into_inner(self) -> W + { + self.stream + } + + /// Consume into the inner stream and crypter + #[inline] pub fn into_parts(self) -> (W, Crypter) + { + (self.stream, self.crypter) + } + + /// The crypter of this instance + #[inline] pub fn crypter(&self) -> &Crypter + { + &self.crypter + } + + /// The crypter of this instance + #[inline] pub fn crypter_mut(&mut self) -> &mut Crypter + { + &mut self.crypter + } + + /// The inner stream + #[inline] pub fn inner(&self) -> &W + { + &self.stream + } + + /// The inner stream + #[inline] pub fn inner_mut(&mut self) -> &mut W + { + &mut self.stream + } + /// Clear the internal buffer while keeping it allocated for further use. /// /// This does not affect operations at all, all it does is 0 out the left-over temporary buffer from the last operation(s). @@ -85,18 +141,20 @@ impl Sink } } -//When implementing `poll`, we can check if buffer is empty on poll, and if it isn't, poll backing stream to write it. Then, clear buffer after `Poll::Ready` on backing stream's write. +//When implementing `poll`, we check if buffer is empty on poll, and if it isn't, poll backing stream to write it. Then, clear buffer after `Poll::Ready` on backing stream's write. impl AsyncWrite for Sink { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { let this = self.project(); if this.buffer.is_empty() { transform(this.crypter, this.buffer, buf)?; - } + } let poll = this.stream.poll_write(cx, &this.buffer[..]); if poll.is_ready() { + #[cfg(feature="explicit_clear")] + bytes::explicit_prune(&mut this.buffer[..]); this.buffer.clear(); - } + } poll } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -122,3 +180,120 @@ impl AsyncWrite for Sink poll } } + +#[cfg(test)] +mod test +{ + use tokio::prelude::*; + #[tokio::test] + async fn sink_sync() + { + let mut output = Vec::new(); + let input = "Hello world!"; + let (key, iv) = crate::cha::keygen(); + + let encrypted = { + let mut sink = super::Sink::encrypt(&mut output, key, iv).expect("Sink::encrypt"); + sink.write_all(input.as_bytes()).await.expect("Sink::write_all"); + sink.flush().await.expect("Sink::flush"); + sink.shutdown().await.expect("Sink::shutdown"); + + sink.into_inner().clone() + }; + + output.clear(); + let decrypted = { + let mut sink = super::Sink::decrypt(&mut output, key, iv).expect("Sink::decrypt"); + sink.write_all(&encrypted[..]).await.expect("Sink::write_all"); + + sink.flush().await.expect("Sink::flush"); + sink.shutdown().await.expect("Sink::shutdown"); + + sink.into_inner().clone() + }; + assert_eq!(&decrypted[..], input.as_bytes()); + } + #[tokio::test] + async fn sink_mem() + { + const BACKLOG: usize = 4; + let (mut client, mut server) = tokio::io::duplex(BACKLOG); + let (key, iv) = crate::cha::keygen(); + + let input = "Hello!"; + + let enctask = tokio::spawn(async move { + let mut sink = super::Sink::encrypt(&mut client, key, iv).expect("Sink::encrypt"); + sink.write_all(input.as_bytes()).await.expect("Sink::write_all"); + sink.flush().await.expect("Sink::flush"); + sink.shutdown().await.expect("Sink::shutdown"); + + drop(client); + }); + + let (mut declient, mut deserver) = tokio::io::duplex(BACKLOG * 2); + let dectask = tokio::spawn(async move { + + let mut sink = super::Sink::decrypt(&mut declient, key, iv).expect("Sink::encrypt"); + tokio::io::copy(&mut server, &mut sink).await.expect("Copy to sink failed"); + sink.flush().await.expect("Sink::flush"); + sink.shutdown().await.expect("Sink::shutdown"); + }); + + let (de, en) = tokio::join![dectask, enctask]; + + de.expect("Dec task panic"); + en.expect("Enc task panic"); + + let mut output = Vec::new(); + tokio::io::copy(&mut deserver, &mut output).await.expect("Copy into vec"); + + println!("In: {}, Out: {}", String::from_utf8_lossy(&output[..]), input); + assert_eq!(&output[..], input.as_bytes()); + } + #[tokio::test] + async fn sink_files() + { + let mut output = tokio::fs::File::from_std(tempfile::tempfile().unwrap()); + + let input = "Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!"; + let (key, iv) = crate::cha::keygen(); + + { + let mut sink = super::Sink::encrypt(&mut output, key, iv).expect("Sink::encrypt"); + sink.write_all(input.as_bytes()).await.expect("Sink::write_all"); + sink.flush().await.expect("Sink::flush"); + sink.shutdown().await.expect("Sink::shutdown"); + } + + let mut encrypted = output; + encrypted.seek(tokio::io::SeekFrom::Start(0)).await.unwrap(); + + let mut output = tokio::fs::File::from_std(tempfile::tempfile().unwrap()); + { + let mut sink = super::Sink::decrypt(&mut output, key, iv).expect("Sink::decrypt"); + tokio::io::copy(&mut encrypted, &mut sink).await.expect("Copy to sinl"); + + sink.flush().await.expect("Sink::flush"); + sink.shutdown().await.expect("Sink::shutdown"); + } + let mut decrypted = output; + + let (r1, r2) = tokio::join![encrypted.sync_data(), + decrypted.sync_data()]; + r1.expect("enc sync"); + r2.expect("dec sync"); + + let decrypted = { + decrypted.seek(tokio::io::SeekFrom::Start(0)).await.unwrap(); + + let mut output = vec![0u8; input.len()]; + decrypted.read_exact(&mut output[..]).await.expect("Read decrypted"); + + output + }; + + assert_eq!(&decrypted[..], input.as_bytes()); + } +} +