Compare commits

...

13 Commits

Author SHA1 Message Date
Avril d76b77fbf1
Fixed build fail with async feature.
3 years ago
Avril 903bfcd7c4
Added `from_parts()`, as counterpart to `into_parts()`.
3 years ago
Avril 0645e61590
Exposed `cha` module for keygen and `Crypter` creation.
3 years ago
Avril 73b83e4d5c
Whoops...
3 years ago
Avril d2db1f30df
Merge branch 'read-stream-wrapper-async'
3 years ago
Avril 28f36401ec
Merge branch 'read-stream-wrapper'
3 years ago
Avril e656f78a9a
Added `Source` and `AsyncSource`. All tests currently passing.
3 years ago
Avril e7f56fb6ee
Added AsyncSource: Feature `async` version of `Source`. (Does not include the buffer config that the syncronous `Source` uses.)
3 years ago
Avril 868c6d6c61
Added crate-level re-exports.
3 years ago
Avril 9784208fc8
Added crate-level re-exports.
3 years ago
Avril 5d1ac4fa7a
Started AsyncSource.
3 years ago
Avril e536d6b232
Moved async_stream::Sink into seperate files.
3 years ago
Avril 20199e3610 Added basic docs to `Source`.
3 years ago

@ -1,6 +1,6 @@
[package] [package]
name = "chacha20stream" name = "chacha20stream"
version = "1.2.0" version = "2.2.1"
keywords = ["chacha20_poly1305", "stream", "wrapper", "encryption", "decryption"] keywords = ["chacha20_poly1305", "stream", "wrapper", "encryption", "decryption"]
description = "A writable wrapper stream for encryption and decryption with the stream cipher chacha20_poly1305" description = "A writable wrapper stream for encryption and decryption with the stream cipher chacha20_poly1305"
homepage = "https://git.flanchan.moe/flanchan/chacha20stream" homepage = "https://git.flanchan.moe/flanchan/chacha20stream"

@ -57,14 +57,16 @@ mod private
} }
pub mod key; pub mod key;
mod cha; pub mod cha;
mod stream; mod stream;
mod bytes; mod bytes;
#[cfg(feature="async")] mod stream_async; #[cfg(feature="async")] mod stream_async;
#[cfg(feature="async")] pub use stream_async::Sink as AsyncSink; #[cfg(feature="async")] pub use stream_async::Sink as AsyncSink;
#[cfg(feature="async")] pub use stream_async::Source as AsyncSource;
pub use stream::Sink; pub use stream::Sink;
pub use stream::Source;
pub use key::{ pub use key::{
Key, IV, Key, IV,
}; };

@ -27,6 +27,8 @@ pub mod source;
pub use sink::Sink; pub use sink::Sink;
pub use source::Source; pub use source::Source;
//TODO: Stream: Wrapper over both Sink and Source?
#[cfg(test)] #[cfg(test)]
mod tests mod tests
{ {

@ -144,6 +144,15 @@ where W: Write
{ {
(self.stream, self.crypter) (self.stream, self.crypter)
} }
/// Create a sink from a stream and a crypter
///
/// The counterpart to `into_parts()`.
#[inline] pub fn from_parts(stream: W, crypter: Crypter) -> Self
{
Self::new(stream, crypter)
}
} }

@ -242,6 +242,14 @@ where R: Read
{ {
(self.stream, self.crypter) (self.stream, self.crypter)
} }
/// Create a source from a stream and a crypter
///
/// The counterpart to `into_parts()`.
#[inline] pub fn from_parts(stream: R, crypter: Crypter) -> Self
{
Self::new(stream, crypter)
}
} }

@ -0,0 +1,173 @@
use super::*;
use key::*;
use std::io;
use tokio::io::AsyncWrite;
use std::{
pin::Pin,
task::{Context, Poll},
};
use std::fmt;
use openssl::{
symm::Crypter,
error::ErrorStack,
};
/// Size of the in-structure buffer
#[cfg(feature="smallvec")]
pub const BUFFER_SIZE: usize = 32;
#[cfg(feature="smallvec")]
type BufferVec = smallvec::SmallVec<[u8; BUFFER_SIZE]>;
#[cfg(not(feature="smallvec"))]
type BufferVec = Vec<u8>;
pub type Error = ErrorStack;
pub mod sink;
pub use sink::Sink;
pub mod source;
pub use source::Source;
#[cfg(test)]
mod test
{
use tokio::prelude::*;
#[tokio::test]
async fn async_source_enc_dec()
{
use crate::ext::*;
const INPUT: &'static [u8] = b"Hello world!";
let (key, iv) = crate::cha::keygen();
println!("Input: {}", INPUT.hex());
let mut enc = super::Source::encrypt(&INPUT[..], key, iv).expect("Failed to create encryptor");
let mut enc_out = Vec::with_capacity(INPUT.len());
tokio::io::copy(&mut enc, &mut enc_out).await.expect("Failed to copy encrypted output");
println!("(enc) output: {}", enc_out.hex());
let mut dec = super::Source::decrypt(&enc_out[..], key, iv).expect("Failed to create decryptor");
let mut dec_out = Vec::with_capacity(INPUT.len());
tokio::io::copy(&mut dec, &mut dec_out).await.expect("Failed to copy decrypted output");
println!("(dec) output: {}", dec_out.hex());
assert_eq!(&dec_out[..], INPUT);
}
#[tokio::test]
async fn sink_sync()
{
let mut output = Vec::new();
let input = "Hello world!";
let (key, iv) = crate::cha::keygen();
let encrypted = {
let mut sink = super::Sink::encrypt(&mut output, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
sink.into_inner().clone()
};
output.clear();
let decrypted = {
let mut sink = super::Sink::decrypt(&mut output, key, iv).expect("Sink::decrypt");
sink.write_all(&encrypted[..]).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
sink.into_inner().clone()
};
assert_eq!(&decrypted[..], input.as_bytes());
}
#[tokio::test]
async fn sink_mem()
{
const BACKLOG: usize = 4;
let (mut client, mut server) = tokio::io::duplex(BACKLOG);
let (key, iv) = crate::cha::keygen();
let input = "Hello!";
let enctask = tokio::spawn(async move {
let mut sink = super::Sink::encrypt(&mut client, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
drop(client);
});
let (mut declient, mut deserver) = tokio::io::duplex(BACKLOG * 2);
let dectask = tokio::spawn(async move {
let mut sink = super::Sink::decrypt(&mut declient, key, iv).expect("Sink::encrypt");
tokio::io::copy(&mut server, &mut sink).await.expect("Copy to sink failed");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
});
let (de, en) = tokio::join![dectask, enctask];
de.expect("Dec task panic");
en.expect("Enc task panic");
let mut output = Vec::new();
tokio::io::copy(&mut deserver, &mut output).await.expect("Copy into vec");
println!("In: {}, Out: {}", String::from_utf8_lossy(&output[..]), input);
assert_eq!(&output[..], input.as_bytes());
}
#[tokio::test]
async fn sink_files()
{
let mut output = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
let input = "Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!";
let (key, iv) = crate::cha::keygen();
{
let mut sink = super::Sink::encrypt(&mut output, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
}
let mut encrypted = output;
encrypted.seek(tokio::io::SeekFrom::Start(0)).await.unwrap();
let mut output = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
{
let mut sink = super::Sink::decrypt(&mut output, key, iv).expect("Sink::decrypt");
tokio::io::copy(&mut encrypted, &mut sink).await.expect("Copy to sinl");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
}
let mut decrypted = output;
let (r1, r2) = tokio::join![encrypted.sync_data(),
decrypted.sync_data()];
r1.expect("enc sync");
r2.expect("dec sync");
let decrypted = {
decrypted.seek(tokio::io::SeekFrom::Start(0)).await.unwrap();
let mut output = vec![0u8; input.len()];
decrypted.read_exact(&mut output[..]).await.expect("Read decrypted");
output
};
assert_eq!(&decrypted[..], input.as_bytes());
}
}

@ -1,29 +1,5 @@
//! Asyncronous `AsyncWrite` wrapper.
use super::*; use super::*;
use key::*;
use std::io;
use tokio::io::AsyncWrite;
use std::{
pin::Pin,
task::{Context, Poll},
};
use std::fmt;
use openssl::{
symm::Crypter,
error::ErrorStack,
};
/// Size of the in-structure buffer
#[cfg(feature="smallvec")]
pub const BUFFER_SIZE: usize = 32;
#[cfg(feature="smallvec")]
type BufferVec = smallvec::SmallVec<[u8; BUFFER_SIZE]>;
#[cfg(not(feature="smallvec"))]
type BufferVec = Vec<u8>;
pub type Error = ErrorStack;
/// Async ChaCha Sink /// Async ChaCha Sink
/// ///
@ -133,6 +109,14 @@ impl<W: AsyncWrite> Sink<W>
(self.stream, self.crypter) (self.stream, self.crypter)
} }
/// Create a sink from a stream and a crypter
///
/// The counterpart to `into_parts()`.
#[inline] pub fn from_parts(stream: W, crypter: Crypter) -> Self
{
Self::new(stream, crypter)
}
/// The crypter of this instance /// The crypter of this instance
#[inline] pub fn crypter(&self) -> &Crypter #[inline] pub fn crypter(&self) -> &Crypter
{ {
@ -214,120 +198,3 @@ impl<W: AsyncWrite> AsyncWrite for Sink<W>
poll poll
} }
} }
#[cfg(test)]
mod test
{
use tokio::prelude::*;
#[tokio::test]
async fn sink_sync()
{
let mut output = Vec::new();
let input = "Hello world!";
let (key, iv) = crate::cha::keygen();
let encrypted = {
let mut sink = super::Sink::encrypt(&mut output, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
sink.into_inner().clone()
};
output.clear();
let decrypted = {
let mut sink = super::Sink::decrypt(&mut output, key, iv).expect("Sink::decrypt");
sink.write_all(&encrypted[..]).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
sink.into_inner().clone()
};
assert_eq!(&decrypted[..], input.as_bytes());
}
#[tokio::test]
async fn sink_mem()
{
const BACKLOG: usize = 4;
let (mut client, mut server) = tokio::io::duplex(BACKLOG);
let (key, iv) = crate::cha::keygen();
let input = "Hello!";
let enctask = tokio::spawn(async move {
let mut sink = super::Sink::encrypt(&mut client, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
drop(client);
});
let (mut declient, mut deserver) = tokio::io::duplex(BACKLOG * 2);
let dectask = tokio::spawn(async move {
let mut sink = super::Sink::decrypt(&mut declient, key, iv).expect("Sink::encrypt");
tokio::io::copy(&mut server, &mut sink).await.expect("Copy to sink failed");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
});
let (de, en) = tokio::join![dectask, enctask];
de.expect("Dec task panic");
en.expect("Enc task panic");
let mut output = Vec::new();
tokio::io::copy(&mut deserver, &mut output).await.expect("Copy into vec");
println!("In: {}, Out: {}", String::from_utf8_lossy(&output[..]), input);
assert_eq!(&output[..], input.as_bytes());
}
#[tokio::test]
async fn sink_files()
{
let mut output = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
let input = "Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!Hello world!";
let (key, iv) = crate::cha::keygen();
{
let mut sink = super::Sink::encrypt(&mut output, key, iv).expect("Sink::encrypt");
sink.write_all(input.as_bytes()).await.expect("Sink::write_all");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
}
let mut encrypted = output;
encrypted.seek(tokio::io::SeekFrom::Start(0)).await.unwrap();
let mut output = tokio::fs::File::from_std(tempfile::tempfile().unwrap());
{
let mut sink = super::Sink::decrypt(&mut output, key, iv).expect("Sink::decrypt");
tokio::io::copy(&mut encrypted, &mut sink).await.expect("Copy to sinl");
sink.flush().await.expect("Sink::flush");
sink.shutdown().await.expect("Sink::shutdown");
}
let mut decrypted = output;
let (r1, r2) = tokio::join![encrypted.sync_data(),
decrypted.sync_data()];
r1.expect("enc sync");
r2.expect("dec sync");
let decrypted = {
decrypted.seek(tokio::io::SeekFrom::Start(0)).await.unwrap();
let mut output = vec![0u8; input.len()];
decrypted.read_exact(&mut output[..]).await.expect("Read decrypted");
output
};
assert_eq!(&decrypted[..], input.as_bytes());
}
}

@ -0,0 +1,158 @@
//! Asyncronous `AsyncRead` wrapper.
use super::*;
use tokio::io::AsyncRead;
/// Asyncronous ChaCha source.
/// En/decrypts information from the source async reader.
///
/// This is the `Read` implementing counterpart to `AsyncSink`.
//#[derive(Debug)]
#[pin_project]
pub struct Source<R>
{
#[pin] stream: R,
crypter: Crypter, // for chacha, finalize does nothing it seems. we can also call it multiple times.
buffer: BufferVec, // used to buffer the operation (ad-hoc-buffer wouldn't work for async operations as the buffer may need to be saved over yields.)
}
impl<R: fmt::Debug> fmt::Debug for Source<R>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "Source({:?}, ({} buffer cap))", self.stream, self.buffer.capacity())
}
}
/// Perform the cipher transform on the inner buffer, writing to the output buffer, returning the number of bytes updated.
fn transform(crypter: &mut Crypter, buf: &[u8], buffer: &mut [u8]) -> Result<usize, ErrorStack>
{
//if buf.len() > self.buffer.len() {
//buf.resize(buffer.len(), 0);
//}
let n = crypter.update(&buf[..], &mut buffer[..])?;
let _f = crypter.finalize(&mut buffer[..n])?; // I don't know if this is needed.
debug_assert_eq!(_f, 0);
Ok(n)
}
impl<R: AsyncRead> Source<R>
{
/// Create a new async Chacha Source stream wrapper
#[inline] fn new(stream: R, crypter: Crypter) -> Self
{
Self{stream, crypter, buffer: BufferVec::new()}
}
/// Create an encrypting Chacha Source stream wrapper
pub fn encrypt(stream: R, key: Key, iv: IV) -> Result<Self, Error>
{
Ok(Self::new(stream, cha::encrypter(key, iv)?))
}
/// Create a decrypting Chacha Source stream wrapper
pub fn decrypt(stream: R, key: Key, iv: IV) -> Result<Self, Error>
{
Ok(Self::new(stream, cha::decrypter(key, iv)?))
}
/// Consume into the inner stream
#[inline] pub fn into_inner(self) -> R
{
self.stream
}
/// Consume into the inner stream and crypter
#[inline] pub fn into_parts(self) -> (R, Crypter)
{
(self.stream, self.crypter)
}
/// Create a source from a stream and a crypter
///
/// The counterpart to `into_parts()`.
#[inline] pub fn from_parts(stream: R, crypter: Crypter) -> Self
{
Self::new(stream, crypter)
}
/// The crypter of this instance
#[inline] pub fn crypter(&self) -> &Crypter
{
&self.crypter
}
/// The crypter of this instance
#[inline] pub fn crypter_mut(&mut self) -> &mut Crypter
{
&mut self.crypter
}
/// The inner stream
#[inline] pub fn inner(&self) -> &R
{
&self.stream
}
/// The inner stream
#[inline] pub fn inner_mut(&mut self) -> &mut R
{
&mut self.stream
}
/// Clear the internal buffer while keeping it allocated for further use.
///
/// This does not affect operations at all, all it does is 0 out the left-over temporary buffer from the last operation(s).
#[inline]
pub fn prune(&mut self)
{
#[cfg(feature="explicit_clear")]
{
bytes::explicit_prune(&mut self.buffer[..]);
return;
}
#[cfg(not(feature="explicit_clear"))]
unsafe {
std::ptr::write_bytes(self.buffer.as_mut_ptr(), 0, self.buffer.len());
}
}
}
//When implementing `poll`, we check if buffer is empty on poll, and if it isn't, poll backing stream to write it. Then, clear buffer after `Poll::Ready` on backing stream's write.
impl<R: AsyncRead> AsyncRead for Source<R>
{
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
let this = self.project();
if this.buffer.is_empty() {
this.buffer.resize(buf.len(), 0);
}
debug_assert_eq!(buf.len(), this.buffer.len());
let poll = this.stream.poll_read(cx, &mut this.buffer[..]);
match poll {
Poll::Ready(Ok(read)) => {
// Data read, perform transform.
let n = transform(this.crypter, &this.buffer[..read], &mut buf[..read])?;
debug_assert_eq!(n, read);
// Reset buffer size to 0, so we know the next call will be on a new buffer, and we can resize it to the correct size again
if cfg!(feature="explicit_clear") {
bytes::explicit_prune(&mut this.buffer[..]);
} // XXX: Should we blank the buffer here? Or is just a `.clear()` alright?
this.buffer.clear();
Poll::Ready(Ok(n))
},
other => other
}
}
}
Loading…
Cancel
Save