Merge branch 'read-stream-wrapper-async'

master
Avril 3 years ago
commit d2db1f30df
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -63,6 +63,7 @@ mod bytes;
#[cfg(feature="async")] mod stream_async;
#[cfg(feature="async")] pub use stream_async::Sink as AsyncSink;
#[cfg(feature="async")] pub use stream_async::Source as AsyncSource;
pub use stream::Sink;
pub use stream::Source;

@ -0,0 +1,173 @@
use super::*;
use key::*;
use std::io;
use tokio::io::AsyncWrite;
use std::{
pin::Pin,
task::{Context, Poll},
};
use std::fmt;
use openssl::{
symm::Crypter,
error::ErrorStack,
};
/// Size of the in-structure buffer
#[cfg(feature="smallvec")]
pub const BUFFER_SIZE: usize = 32;
#[cfg(feature="smallvec")]
type BufferVec = smallvec::SmallVec<[u8; BUFFER_SIZE]>;
#[cfg(not(feature="smallvec"))]
type BufferVec = Vec<u8>;
pub type Error = ErrorStack;
pub mod sink;
pub use sink::Sink;
pub mod source;
pub use source::Source;
#[cfg(test)]
mod test
{
use tokio::prelude::*;
#[tokio::test]
async fn async_source_enc_dec()
{
use crate::ext::*;
const INPUT: &'static [u8] = b"Hello world!";
let (key, iv) = crate::cha::keygen();
println!("Input: {}", INPUT.hex());
let mut enc = super::Source::encrypt(&INPUT[..], key, iv).expect("Failed to create encryptor");
let mut enc_out = Vec::with_capacity(INPUT.len());
tokio::io::copy(&mut enc, &mut enc_out).await.expect("Failed to copy encrypted output");
println!("(enc) output: {}", enc_out.hex());
let mut dec = super::Source::decrypt(&enc_out[..], key, iv).expect("Failed to create decryptor");
let mut dec_out = Vec::with_capacity(INPUT.len());
tokio::io::copy(&mut dec, &mut dec_out).await.expect("Failed to copy decrypted output");
println!("(dec) output: {}", dec_out.hex());
assert_eq!(&dec_out[..], INPUT);
}
#[tokio::test]
async fn sink_sync()
{
let mut output = Vec::new();
let input = "Hello world!";
let (key, iv) = crate::cha::keygen();
let encrypted = {
let mut sink = super::Sink::encrypt(&mut output, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
sink.into_inner().clone()
};
output.clear();
let decrypted = {
let mut sink = super::Sink::decrypt(&mut output, key, iv).expect("Sink::decrypt");
sink.write_all(&encrypted[..]).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
sink.into_inner().clone()
};
assert_eq!(&decrypted[..], input.as_bytes());
}
#[tokio::test]
async fn sink_mem()
{
const BACKLOG: usize = 4;
let (mut client, mut server) = tokio::io::duplex(BACKLOG);
let (key, iv) = crate::cha::keygen();
let input = "Hello!";
let enctask = tokio::spawn(async move {
let mut sink = super::Sink::encrypt(&mut client, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
drop(client);
});
let (mut declient, mut deserver) = tokio::io::duplex(BACKLOG * 2);
let dectask = tokio::spawn(async move {
let mut sink = super::Sink::decrypt(&mut declient, key, iv).expect("Sink::encrypt");
tokio::io::copy(&mut server, &mut sink).await.expect("Copy to sink failed");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
});
let (de, en) = tokio::join![dectask, enctask];
de.expect("Dec task panic");
en.expect("Enc task panic");
let mut output = Vec::new();
tokio::io::copy(&mut deserver, &mut output).await.expect("Copy into vec");
println!("In: {}, Out: {}", String::from_utf8_lossy(&output[..]), input);
assert_eq!(&output[..], input.as_bytes());
}
#[tokio::test]
async fn sink_files()
{
let mut output = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
let input = "Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!";
let (key, iv) = crate::cha::keygen();
{
let mut sink = super::Sink::encrypt(&mut output, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
}
let mut encrypted = output;
encrypted.seek(tokio::io::SeekFrom::Start(0)).await.unwrap();
let mut output = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
{
let mut sink = super::Sink::decrypt(&mut output, key, iv).expect("Sink::decrypt");
tokio::io::copy(&mut encrypted, &mut sink).await.expect("Copy to sinl");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
}
let mut decrypted = output;
let (r1, r2) = tokio::join![encrypted.sync_data(),
decrypted.sync_data()];
r1.expect("enc sync");
r2.expect("dec sync");
let decrypted = {
decrypted.seek(tokio::io::SeekFrom::Start(0)).await.unwrap();
let mut output = vec![0u8; input.len()];
decrypted.read_exact(&mut output[..]).await.expect("Read decrypted");
output
};
assert_eq!(&decrypted[..], input.as_bytes());
}
}

@ -1,29 +1,5 @@
//! Asyncronous `AsyncWrite` wrapper.
use super::*;
use key::*;
use std::io;
use tokio::io::AsyncWrite;
use std::{
pin::Pin,
task::{Context, Poll},
};
use std::fmt;
use openssl::{
symm::Crypter,
error::ErrorStack,
};
/// Size of the in-structure buffer
#[cfg(feature="smallvec")]
pub const BUFFER_SIZE: usize = 32;
#[cfg(feature="smallvec")]
type BufferVec = smallvec::SmallVec<[u8; BUFFER_SIZE]>;
#[cfg(not(feature="smallvec"))]
type BufferVec = Vec<u8>;
pub type Error = ErrorStack;
/// Async ChaCha Sink
///
@ -214,120 +190,3 @@ impl<W: AsyncWrite> AsyncWrite for Sink<W>
poll
}
}
#[cfg(test)]
mod test
{
use tokio::prelude::*;
#[tokio::test]
async fn sink_sync()
{
let mut output = Vec::new();
let input = "Hello world!";
let (key, iv) = crate::cha::keygen();
let encrypted = {
let mut sink = super::Sink::encrypt(&mut output, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
sink.into_inner().clone()
};
output.clear();
let decrypted = {
let mut sink = super::Sink::decrypt(&mut output, key, iv).expect("Sink::decrypt");
sink.write_all(&encrypted[..]).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
sink.into_inner().clone()
};
assert_eq!(&decrypted[..], input.as_bytes());
}
#[tokio::test]
async fn sink_mem()
{
const BACKLOG: usize = 4;
let (mut client, mut server) = tokio::io::duplex(BACKLOG);
let (key, iv) = crate::cha::keygen();
let input = "Hello!";
let enctask = tokio::spawn(async move {
let mut sink = super::Sink::encrypt(&mut client, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
drop(client);
});
let (mut declient, mut deserver) = tokio::io::duplex(BACKLOG * 2);
let dectask = tokio::spawn(async move {
let mut sink = super::Sink::decrypt(&mut declient, key, iv).expect("Sink::encrypt");
tokio::io::copy(&mut server, &mut sink).await.expect("Copy to sink failed");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
});
let (de, en) = tokio::join![dectask, enctask];
de.expect("Dec task panic");
en.expect("Enc task panic");
let mut output = Vec::new();
tokio::io::copy(&mut deserver, &mut output).await.expect("Copy into vec");
println!("In: {}, Out: {}", String::from_utf8_lossy(&output[..]), input);
assert_eq!(&output[..], input.as_bytes());
}
#[tokio::test]
async fn sink_files()
{
let mut output = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
let input = "Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!";
let (key, iv) = crate::cha::keygen();
{
let mut sink = super::Sink::encrypt(&mut output, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
}
let mut encrypted = output;
encrypted.seek(tokio::io::SeekFrom::Start(0)).await.unwrap();
let mut output = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
{
let mut sink = super::Sink::decrypt(&mut output, key, iv).expect("Sink::decrypt");
tokio::io::copy(&mut encrypted, &mut sink).await.expect("Copy to sinl");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
}
let mut decrypted = output;
let (r1, r2) = tokio::join![encrypted.sync_data(),
decrypted.sync_data()];
r1.expect("enc sync");
r2.expect("dec sync");
let decrypted = {
decrypted.seek(tokio::io::SeekFrom::Start(0)).await.unwrap();
let mut output = vec![0u8; input.len()];
decrypted.read_exact(&mut output[..]).await.expect("Read decrypted");
output
};
assert_eq!(&decrypted[..], input.as_bytes());
}
}

@ -0,0 +1,149 @@
//! Asyncronous `AsyncRead` wrapper.
use super::*;
use tokio::io::AsyncRead;
/// Asyncronous ChaCha source.
/// En/decrypts information from the source async reader.
///
/// This is the `Read` implementing counterpart to `AsyncSink`.
//#[derive(Debug)]
#[pin_project]
pub struct Source<R>
{
#[pin] stream: R,
crypter: Crypter, // for chacha, finalize does nothing it seems. we can also call it multiple times.
buffer: BufferVec, // used to buffer the operation (ad-hoc-buffer wouldn't work for async operations as the buffer may need to be saved over yields.)
}
impl<R: fmt::Debug> fmt::Debug for Source<R>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "Source({:?}, ({} buffer cap))", self.stream, self.buffer.capacity())
}
}
/// Perform the cipher transform on the inner buffer, writing to the output buffer, returning the number of bytes updated.
fn transform(crypter: &mut Crypter, buf: &[u8], buffer: &mut [u8]) -> Result<usize, ErrorStack>
{
//if buf.len() > self.buffer.len() {
//buf.resize(buffer.len(), 0);
//}
let n = crypter.update(&buf[..], &mut buffer[..])?;
let _f = crypter.finalize(&mut buffer[..n])?; // I don't know if this is needed.
debug_assert_eq!(_f, 0);
Ok(n)
}
impl<R: AsyncRead> Source<R>
{
/// Create a new async Chacha Source stream wrapper
#[inline] fn new(stream: R, crypter: Crypter) -> Self
{
Self{stream, crypter, buffer: BufferVec::new()}
}
/// Create an encrypting Chacha Source stream wrapper
pub fn encrypt(stream: R, key: Key, iv: IV) -> Result<Self, Error>
{
Ok(Self::new(stream, cha::encrypter(key, iv)?))
}
/// Create a decrypting Chacha Source stream wrapper
pub fn decrypt(stream: R, key: Key, iv: IV) -> Result<Self, Error>
{
Ok(Self::new(stream, cha::decrypter(key, iv)?))
}
/// Consume into the inner stream
#[inline] pub fn into_inner(self) -> R
{
self.stream
}
/// Consume into the inner stream and crypter
#[inline] pub fn into_parts(self) -> (R, Crypter)
{
(self.stream, self.crypter)
}
/// The crypter of this instance
#[inline] pub fn crypter(&self) -> &Crypter
{
&self.crypter
}
/// The crypter of this instance
#[inline] pub fn crypter_mut(&mut self) -> &mut Crypter
{
&mut self.crypter
}
/// The inner stream
#[inline] pub fn inner(&self) -> &R
{
&self.stream
}
/// The inner stream
#[inline] pub fn inner_mut(&mut self) -> &mut R
{
&mut self.stream
}
/// Clear the internal buffer while keeping it allocated for further use.
///
/// This does not affect operations at all, all it does is 0 out the left-over temporary buffer from the last operation(s).
#[inline]
pub fn prune(&mut self)
{
#[cfg(feature="explicit_clear")]
{
bytes::explicit_prune(&mut self.buffer[..]);
return;
}
#[cfg(not(feature="explicit_clear"))]
unsafe {
std::ptr::write_bytes(self.buffer.as_mut_ptr(), 0, self.buffer.len());
}
}
}
//When implementing `poll`, we check if buffer is empty on poll, and if it isn't, poll backing stream to write it. Then, clear buffer after `Poll::Ready` on backing stream's write.
impl<R: AsyncRead> AsyncRead for Source<R>
{
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
let this = self.project();
if this.buffer.is_empty() {
this.buffer.resize(buf.len(), 0);
}
debug_assert_eq!(buf.len(), this.buffer.len());
let poll = this.stream.poll_read(cx, &mut this.buffer[..]);
match poll {
Poll::Ready(Ok(read)) => {
// Data read, perform transform.
let n = transform(this.crypter, &this.buffer[..read], &mut buf[..read])?;
debug_assert_eq!(n, read);
// Reset buffer size to 0, so we know the next call will be on a new buffer, and we can resize it to the correct size again
if cfg!(feature="explicit_clear") {
bytes::explicit_prune(&mut this.buffer[..]);
} // XXX: Should we blank the buffer here? Or is just a `.clear()` alright?
this.buffer.clear();
Poll::Ready(Ok(n))
},
other => other
}
}
}
Loading…
Cancel
Save