async working

ffi
Avril 3 years ago
parent 969653b1aa
commit 317a9b47a8
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -9,7 +9,9 @@ license = "MIT"
[features]
default = ["smallvec", "async"]
# Enable async version with tokio v2.0 AsyncRead/AsyncWrite.
async = ["tokio", "pin-project"]
# Explicitly clear in-memory buffers with `explicit_bzero()` instead of normal `bzero()`.
explicit_clear = []
@ -23,3 +25,7 @@ tokio = {version = "0.2", optional = true}
[build-dependencies]
rustc_version = "0.2"
[dev-dependencies]
tempfile = "3.2.0"
tokio = {version = "0.2", features=["full"]}

@ -4,6 +4,7 @@
//extern crate test;
#[cfg(feature="async")]
#[macro_use] extern crate pin_project;
#[macro_use] mod ext; #[allow(unused_imports)] use ext::*;
@ -14,6 +15,7 @@ mod stream;
mod bytes;
#[cfg(feature="async")] mod stream_async;
#[cfg(feature="async")] pub use stream_async::Sink as AsyncSink;
pub use stream::Sink;
pub use key::{

@ -67,6 +67,62 @@ fn transform(crypter: &mut Crypter, buffer: &mut BufferVec, buf: &[u8]) -> Resul
impl<W: AsyncWrite> Sink<W>
{
/// Create a new async Chacha Sink stream wrapper
#[inline] fn new(stream: W, crypter: Crypter) -> Self
{
Self{stream, crypter, buffer: BufferVec::new()}
}
/// Create an encrypting Chacha Sink stream wrapper
pub fn encrypt(stream: W, key: Key, iv: IV) -> Result<Self, Error>
{
Ok(Self::new(stream, cha::encrypter(key, iv)?))
}
/// Create a decrypting Chacha Sink stream wrapper
pub fn decrypt(stream: W, key: Key, iv: IV) -> Result<Self, Error>
{
Ok(Self::new(stream, cha::decrypter(key, iv)?))
}
/// Consume into the inner stream
#[inline] pub fn into_inner(self) -> W
{
self.stream
}
/// Consume into the inner stream and crypter
#[inline] pub fn into_parts(self) -> (W, Crypter)
{
(self.stream, self.crypter)
}
/// The crypter of this instance
#[inline] pub fn crypter(&self) -> &Crypter
{
&self.crypter
}
/// The crypter of this instance
#[inline] pub fn crypter_mut(&mut self) -> &mut Crypter
{
&mut self.crypter
}
/// The inner stream
#[inline] pub fn inner(&self) -> &W
{
&self.stream
}
/// The inner stream
#[inline] pub fn inner_mut(&mut self) -> &mut W
{
&mut self.stream
}
/// Clear the internal buffer while keeping it allocated for further use.
///
/// This does not affect operations at all, all it does is 0 out the left-over temporary buffer from the last operation(s).
@ -85,18 +141,20 @@ impl<W: AsyncWrite> Sink<W>
}
}
//When implementing `poll`, we can check if buffer is empty on poll, and if it isn't, poll backing stream to write it. Then, clear buffer after `Poll::Ready` on backing stream's write.
//When implementing `poll`, we check if buffer is empty on poll, and if it isn't, poll backing stream to write it. Then, clear buffer after `Poll::Ready` on backing stream's write.
impl<W: AsyncWrite> AsyncWrite for Sink<W>
{
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
let this = self.project();
if this.buffer.is_empty() {
transform(this.crypter, this.buffer, buf)?;
}
}
let poll = this.stream.poll_write(cx, &this.buffer[..]);
if poll.is_ready() {
#[cfg(feature="explicit_clear")]
bytes::explicit_prune(&mut this.buffer[..]);
this.buffer.clear();
}
}
poll
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
@ -122,3 +180,120 @@ impl<W: AsyncWrite> AsyncWrite for Sink<W>
poll
}
}
#[cfg(test)]
mod test
{
use tokio::prelude::*;
#[tokio::test]
async fn sink_sync()
{
let mut output = Vec::new();
let input = "Hello world!";
let (key, iv) = crate::cha::keygen();
let encrypted = {
let mut sink = super::Sink::encrypt(&mut output, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
sink.into_inner().clone()
};
output.clear();
let decrypted = {
let mut sink = super::Sink::decrypt(&mut output, key, iv).expect("Sink::decrypt");
sink.write_all(&encrypted[..]).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
sink.into_inner().clone()
};
assert_eq!(&decrypted[..], input.as_bytes());
}
#[tokio::test]
async fn sink_mem()
{
const BACKLOG: usize = 4;
let (mut client, mut server) = tokio::io::duplex(BACKLOG);
let (key, iv) = crate::cha::keygen();
let input = "Hello!";
let enctask = tokio::spawn(async move {
let mut sink = super::Sink::encrypt(&mut client, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
drop(client);
});
let (mut declient, mut deserver) = tokio::io::duplex(BACKLOG * 2);
let dectask = tokio::spawn(async move {
let mut sink = super::Sink::decrypt(&mut declient, key, iv).expect("Sink::encrypt");
tokio::io::copy(&mut server, &mut sink).await.expect("Copy to sink failed");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
});
let (de, en) = tokio::join![dectask, enctask];
de.expect("Dec task panic");
en.expect("Enc task panic");
let mut output = Vec::new();
tokio::io::copy(&mut deserver, &mut output).await.expect("Copy into vec");
println!("In: {}, Out: {}", String::from_utf8_lossy(&output[..]), input);
assert_eq!(&output[..], input.as_bytes());
}
#[tokio::test]
async fn sink_files()
{
let mut output = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
let input = "Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!";
let (key, iv) = crate::cha::keygen();
{
let mut sink = super::Sink::encrypt(&mut output, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
}
let mut encrypted = output;
encrypted.seek(tokio::io::SeekFrom::Start(0)).await.unwrap();
let mut output = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
{
let mut sink = super::Sink::decrypt(&mut output, key, iv).expect("Sink::decrypt");
tokio::io::copy(&mut encrypted, &mut sink).await.expect("Copy to sinl");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
}
let mut decrypted = output;
let (r1, r2) = tokio::join![encrypted.sync_data(),
decrypted.sync_data()];
r1.expect("enc sync");
r2.expect("dec sync");
let decrypted = {
decrypted.seek(tokio::io::SeekFrom::Start(0)).await.unwrap();
let mut output = vec![0u8; input.len()];
decrypted.read_exact(&mut output[..]).await.expect("Read decrypted");
output
};
assert_eq!(&decrypted[..], input.as_bytes());
}
}

Loading…
Cancel
Save