From 969653b1aa6fe750adecacb1d54bd0084ac8ebe4 Mon Sep 17 00:00:00 2001 From: Avril Date: Wed, 31 Mar 2021 19:00:53 +0100 Subject: [PATCH] async stream start --- Cargo.toml | 5 +- src/bytes.rs | 8 +++ src/lib.rs | 4 ++ src/stream_async.rs | 124 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 140 insertions(+), 1 deletion(-) create mode 100644 src/stream_async.rs diff --git a/Cargo.toml b/Cargo.toml index 9647fc0..bb8c590 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,8 +7,9 @@ license = "MIT" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["smallvec"] +default = ["smallvec", "async"] +async = ["tokio", "pin-project"] # Explicitly clear in-memory buffers with `explicit_bzero()` instead of normal `bzero()`. explicit_clear = [] @@ -16,7 +17,9 @@ explicit_clear = [] base64 = "0.13" getrandom = "0.2" openssl = "0.10" +pin-project = {version = "1.0.6", optional = true} smallvec = {version = "1.6", features=["union"], optional = true} +tokio = {version = "0.2", optional = true} [build-dependencies] rustc_version = "0.2" diff --git a/src/bytes.rs b/src/bytes.rs index 300175f..b116122 100644 --- a/src/bytes.rs +++ b/src/bytes.rs @@ -27,3 +27,11 @@ pub fn explicit_prune(buffer: &mut[u8]) { explicit_bzero(buffer.as_mut_ptr() as *mut c_void, buffer.len()); } } + +pub fn prune(buffer: &mut [u8]) +{ + #[cfg(feature="explicit_clear")] explicit_prune(buffer); + #[cfg(not(feature="explicit_clear"))] unsafe { + std::ptr::write_bytes(buffer.as_mut_ptr(), 0, buffer.len()); + } +} diff --git a/src/lib.rs b/src/lib.rs index 4f38050..d3acfee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,8 @@ //extern crate test; +#[macro_use] extern crate pin_project; + #[macro_use] mod ext; #[allow(unused_imports)] use ext::*; pub mod key; @@ -11,6 +13,8 @@ mod cha; mod stream; mod bytes; +#[cfg(feature="async")] mod stream_async; + pub use stream::Sink; pub use key::{ Key, IV, diff --git a/src/stream_async.rs b/src/stream_async.rs new file mode 100644 index 0000000..6564412 --- /dev/null +++ b/src/stream_async.rs @@ -0,0 +1,124 @@ +use super::*; +use key::*; + +use std::io; +use tokio::io::AsyncWrite; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use std::fmt; +use openssl::{ + symm::Crypter, + error::ErrorStack, +}; + +#[cfg(feature="smallvec")] +pub const BUFFER_SIZE: usize = 32; + +#[cfg(feature="smallvec")] +type BufferVec = smallvec::SmallVec<[u8; BUFFER_SIZE]>; +#[cfg(not(feature="smallvec"))] +type BufferVec = Vec; + +pub type Error = ErrorStack; + +/// Async ChaCha Sink +/// +/// # Note +/// When writing, a temporary buffer stored in the structure is used. This buffer is **not** cleared after a write, for efficiency reasons. This may leave sensitive information in the buffer after the write operation. +/// The `flush()` implementation *does* clear this buffer. +/// You can use the `prune()` function to zero out this buffer manually too. +//#[derive(Debug)] +#[pin_project] +pub struct Sink +{ + #[pin] stream: W, + + crypter: Crypter, // for chacha, finalize does nothing it seems. we can also call it multiple times. + + buffer: BufferVec, // used to buffer the operation +} + +impl fmt::Debug for Sink +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + write!(f, "Sink({:?}, ({} buffer cap))", self.stream, self.buffer.capacity()) + } +} + +/// Perform the cipher transform on this input to the inner buffer, returning the number of bytes updated. +fn transform(crypter: &mut Crypter, buffer: &mut BufferVec, buf: &[u8]) -> Result<(), ErrorStack> +{ + //if buf.len() > self.buffer.len() { + buffer.resize(buf.len(), 0); + //} + + let n = crypter.update(&buf[..], &mut buffer[..])?; + let _f = crypter.finalize(&mut buffer[..n])?; // I don't know if this is needed. + debug_assert_eq!(_f, 0); + + buffer.resize(n, 0); + Ok(()) +} + + +impl Sink +{ + /// Clear the internal buffer while keeping it allocated for further use. + /// + /// This does not affect operations at all, all it does is 0 out the left-over temporary buffer from the last operation(s). + #[inline] + pub fn prune(&mut self) + { + #[cfg(feature="explicit_clear")] + { + bytes::explicit_prune(&mut self.buffer[..]); + return; + } + #[cfg(not(feature="explicit_clear"))] + unsafe { + std::ptr::write_bytes(self.buffer.as_mut_ptr(), 0, self.buffer.len()); + } + } +} + +//When implementing `poll`, we can check if buffer is empty on poll, and if it isn't, poll backing stream to write it. Then, clear buffer after `Poll::Ready` on backing stream's write. +impl AsyncWrite for Sink +{ + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + let this = self.project(); + if this.buffer.is_empty() { + transform(this.crypter, this.buffer, buf)?; + } + let poll = this.stream.poll_write(cx, &this.buffer[..]); + if poll.is_ready() { + this.buffer.clear(); + } + poll + } + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + let poll = this.stream.poll_flush(cx); + if poll.is_ready() { + #[cfg(feature="explicit_clear")] + bytes::explicit_prune(&mut this.buffer[..]); + this.buffer.clear(); + } + poll + } + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + let poll = this.stream.poll_shutdown(cx); + if poll.is_ready() { + #[cfg(feature="explicit_clear")] + bytes::explicit_prune(&mut this.buffer[..]); + this.buffer.clear(); + } + poll + } +}