//! Socket encryption wrapper use super::*; use cryptohelpers::{ rsa::{ self, RsaPublicKey, RsaPrivateKey, openssl::{ symm::Crypter, error::ErrorStack, }, }, sha256, }; use chacha20stream::{ AsyncSink, AsyncSource, Key, IV, cha, }; use std::sync::Arc; use tokio::{ sync::{ RwLock, RwLockReadGuard, RwLockWriteGuard, }, }; use std::{ io, fmt, task::{ Context, Poll, }, pin::Pin, marker::Unpin, }; use smallvec::SmallVec; /// Size of a single RSA ciphertext. pub const RSA_CIPHERTEXT_SIZE: usize = 512; /// A single, full block of RSA ciphertext. type RsaCiphertextBlock = [u8; RSA_CIPHERTEXT_SIZE]; /// Max size to read when exchanging keys const TRANS_KEY_MAX_SIZE: usize = 4096; /// Encrypted socket information. #[derive(Debug)] struct ESockInfo { us: RsaPrivateKey, them: Option, } impl ESockInfo { /// Generate a new private key pub fn new(us: impl Into) -> Self { Self { us: us.into(), them: None, } } /// Generate a new private key for the local endpoint pub fn generate() -> Result { Ok(Self::new(RsaPrivateKey::generate()?)) } } /// The encryption state of the Tx and Rx instances. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)] struct ESockState { encr: bool, encw: bool, } impl Default for ESockState { #[inline] fn default() -> Self { Self { encr: false, encw: false, } } } /// Contains a cc20 Key and IV that can be serialized and then encrypted #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] struct ESockSessionKey { key: Key, iv: IV, } impl fmt::Display for ESockSessionKey { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Key: {}, IV: {}", self.key.hex(), self.iv.hex()) } } impl ESockSessionKey { /// Generate a new cc20 key + iv, pub fn generate() -> Self { let (key,iv) = cha::keygen(); Self{key,iv} } /// Generate an encryption device pub fn to_decrypter(&self) -> Result { cha::decrypter(&self.key, &self.iv) } /// Generate an encryption device pub fn to_encrypter(&self) -> Result { cha::encrypter(&self.key, &self.iv) } /// Encrypt with RSA pub fn to_ciphertext(&self, rsa_key: &K) -> eyre::Result { let mut output = [0u8; RSA_CIPHERTEXT_SIZE]; let mut temp = SmallVec::<[u8; RSA_CIPHERTEXT_SIZE]>::new(); // We know size will fit into here. serde_cbor::to_writer(&mut temp, self) .wrap_err(eyre!("Failed to CBOR encode session key to buffer")) .with_section(|| self.clone().header("Session key was"))?; debug_assert!(temp.len() < RSA_CIPHERTEXT_SIZE); let _wr = rsa::encrypt_slice_sync(&temp, rsa_key, &mut &mut output[..]) .wrap_err(eyre!("Failed to encrypt session key with RSA public key")) .with_section(|| self.clone().header("Session key was")) .with_section({let temp = temp.len(); move || temp.header("Encoded data size was")}) .with_section(move || base64::encode(temp).header("Encoded data (base64) was"))?; debug_assert_eq!(_wr, output.len()); Ok(output) } /// Decrypt from RSA pub fn from_ciphertext(data: &[u8; RSA_CIPHERTEXT_SIZE], rsa_key: &K) -> eyre::Result where ::KeyType: rsa::openssl::pkey::HasPrivate //ugh, why do we have to have this bound??? it should be implied ffs... :/ { let mut temp = SmallVec::<[u8; RSA_CIPHERTEXT_SIZE]>::new(); rsa::decrypt_slice_sync(data, rsa_key, &mut temp) .wrap_err(eyre!("Failed to decrypt ciphertext to session key")) .with_section({let data = data.len(); move || data.header("Ciphertext length was")}) .with_section(|| base64::encode(data).header("Ciphertext was"))?; Ok(serde_cbor::from_slice(&temp[..]) .wrap_err(eyre!("Failed to decode CBOR data to session key object")) .with_section({let temp = temp.len(); move || temp.header("Encoded data size was")}) .with_section(move || base64::encode(temp).header("Encoded data (base64) was"))?) } } /// A tx+rx socket. #[pin_project] #[derive(Debug)] pub struct ESock { info: ESockInfo, state: ESockState, #[pin] rx: AsyncSource, #[pin] tx: AsyncSink, } impl ESock { fn inner(&self) -> (&W, &R) { (self.tx.inner(), self.rx.inner()) } fn inner_mut(&mut self) -> (&mut W, &mut R) { (self.tx.inner_mut(), self.rx.inner_mut()) } ///Get a mutable ref to unencrypted read+write fn unencrypted(&mut self) -> (&mut W, &mut R) { (self.tx.inner_mut(), self.rx.inner_mut()) } /// Get a mutable ref to encrypted write+read fn encrypted(&mut self) -> (&mut AsyncSink, &mut AsyncSource) { (&mut self.tx, &mut self.rx) } /// Have the RSA keys been exchanged? pub fn has_exchanged(&self) -> bool { self.info.them.is_some() } /// Is the Write + Read operation encrypted? Tuple is `(Tx, Rx)`. #[inline] pub fn is_encrypted(&self) -> (bool, bool) { (self.state.encw, self.state.encr) } /// Create a new `ESock` wrapper over this writer and reader with this specific RSA key. pub fn with_key(key: impl Into, tx: W, rx: R) -> Self { let (tk, tiv) = cha::keygen(); Self { info: ESockInfo::new(key), state: Default::default(), // Note: These key+IV pairs are never used, as `state` defaults to unencrypted, and a new key/iv pair is generated when we `set_encrypted_write/read(true)`. // TODO: Have a method to exchange these default session keys after `exchange()`? tx: AsyncSink::encrypt(tx, tk, tiv).expect("Failed to create temp AsyncSink"), rx: AsyncSource::encrypt(rx, tk, tiv).expect("Failed to create temp AsyncSource"), } } /// Create a new `ESock` wrapper over this writer and reader with a newly generated private key #[inline] pub fn new(tx: W, rx: R) -> Result { Ok(Self::with_key(RsaPrivateKey::generate()?, tx, rx)) } /// The local RSA private key #[inline] pub fn local_key(&self) -> &RsaPrivateKey { &self.info.us } /// THe remote RSA public key (if exchange has happened.) #[inline] pub fn foreign_key(&self) -> Option<&RsaPublicKey> { self.info.them.as_ref() } /// Split this `ESock` into a read+write pair. /// /// # Note /// You must preform an `exchange()` before splitting, as exchanging RSA keys is not possible on a single half. /// /// It is also more efficient to `set_encrypted_write/read(true)` on `ESock` than it is on the halves, but changinc encryption modes on halves is still possible. pub fn split(self) -> (ESockWriteHalf, ESockReadHalf) { let arced = Arc::new(self.info); (ESockWriteHalf(Arc::clone(&arced), self.tx, self.state.encw), ESockReadHalf(arced, self.rx, self.state.encr)) } /// Merge a previously split `ESock` into a single one again. /// /// # Panics /// If the two halves were not split from the same `ESock`. pub fn unsplit(txh: ESockWriteHalf, rxh: ESockReadHalf) -> Self { #[cold] #[inline(never)] fn _panic_ptr_ineq() -> ! { panic!("Cannot merge halves split from different sources") } if !Arc::ptr_eq(&txh.0, &rxh.0) { _panic_ptr_ineq(); } let tx = txh.1; drop(txh.0); let info = Arc::try_unwrap(rxh.0).unwrap(); let rx = rxh.1; Self { state: ESockState { encw: txh.2, encr: rxh.2, }, info, tx, rx } } } async fn set_encrypted_write_for(info: &ESockInfo, tx: &mut AsyncSink) -> eyre::Result<()> { use tokio::prelude::*; let session_key = ESockSessionKey::generate(); let data = { let them = info.them.as_ref().expect("Cannot set encrypted write when keys have not been exchanged"); session_key.to_ciphertext(them) .wrap_err(eyre!("Failed to encrypt session key with foreign endpoint's key")) .with_section(|| session_key.to_string().header("Session key was")) .with_section(|| them.to_string().header("Foreign pubkey was"))? }; let crypter = session_key.to_encrypter() .wrap_err(eyre!("Failed to create encryption device from session key for Tx")) .with_section(|| session_key.to_string().header("Session key was"))?; // Send rsa `data` over unencrypted endpoint tx.inner_mut().write_all(&data[..]).await .wrap_err(eyre!("Failed to write ciphertext to endpoint")) .with_section(|| data.to_base64_string().header("Ciphertext of session key was"))?; // Set crypter of `tx` to `session_key`. *tx.crypter_mut() = crypter; Ok(()) } async fn set_encrypted_read_for(info: &ESockInfo, rx: &mut AsyncSource) -> eyre::Result<()> { use tokio::prelude::*; let mut data = [0u8; RSA_CIPHERTEXT_SIZE]; // Read `data` from unencrypted endpoint rx.inner_mut().read_exact(&mut data[..]).await .wrap_err(eyre!("Failed to read ciphertext from endpoint"))?; // Decrypt `data` let session_key = ESockSessionKey::from_ciphertext(&data, &info.us) .wrap_err(eyre!("Failed to decrypt session key from ciphertext")) .with_section(|| data.to_base64_string().header("Ciphertext was")) .with_section(|| info.us.to_string().header("Our RSA key is"))?; // Set crypter of `rx` to `session_key`. *rx.crypter_mut() = session_key.to_decrypter() .wrap_err(eyre!("Failed to create decryption device from session key for Rx")) .with_section(|| session_key.to_string().header("Decrypted session key was"))?; Ok(()) } impl ESock { /// Get the Tx and Rx of the stream. /// /// # Returns /// Returns encrypted stream halfs if the stream is encrypted, unencrypted if not. pub fn stream(&mut self) -> (&mut (dyn AsyncWrite + Unpin + '_), &mut (dyn AsyncRead + Unpin + '_)) { (if self.state.encw { &mut self.tx } else { self.tx.inner_mut() }, if self.state.encr { &mut self.rx } else { self.rx.inner_mut() }) } /// Enable write encryption pub async fn set_encrypted_write(&mut self, set: bool) -> eyre::Result<()> { if set { set_encrypted_write_for(&self.info, &mut self.tx).await?; // Set `encw` to true self.state.encw = true; Ok(()) } else { self.state.encw = false; Ok(()) } } /// Enable read encryption /// /// The other endpoint must have sent a `set_encrypted_write()` pub async fn set_encrypted_read(&mut self, set: bool) -> eyre::Result<()> { if set { set_encrypted_read_for(&self.info, &mut self.rx).await?; // Set `encr` to true self.state.encr = true; Ok(()) } else { self.state.encr = false; Ok(()) } } /// Get dynamic ref to unencrypted write+read fn unencrypted_dyn(&mut self) -> (&mut (dyn AsyncWrite + Unpin + '_), &mut (dyn AsyncRead + Unpin + '_)) { (self.tx.inner_mut(), self.rx.inner_mut()) } /// Get dynamic ref to encrypted write+read fn encrypted_dyn(&mut self) -> (&mut (dyn AsyncWrite + Unpin + '_), &mut (dyn AsyncRead + Unpin + '_)) { (&mut self.tx, &mut self.rx) } /// Exchange keys. pub async fn exchange(&mut self) -> eyre::Result<()> { use tokio::prelude::*; let our_key = self.info.us.get_public_parts(); let (tx, rx) = self.inner_mut(); let read_fut = { async move { // Read the public key from `rx`. //TODO: Find pubkey max size. let mut sz_buf = [0u8; std::mem::size_of::()]; rx.read_exact(&mut sz_buf[..]).await .wrap_err(eyre!("Failed to read size of pubkey form endpoint"))?; let sz64 = u64::from_be_bytes(sz_buf); let sz= match usize::try_from(sz64) .wrap_err(eyre!("Read size could not fit into u64")) .with_section(|| format!("{:?}", sz_buf).header("Read buffer was")) .with_section(|| u64::from_be_bytes(sz_buf).header("64=bit size value was")) .with_warning(|| "This should not happen, it is only possible when you are running a machine with a pointer size lower than 64 bits.") .with_suggestion(|| "The message is likely malformed. If it is not, then you are communicating with an endpoint of 64 bits whereas your pointer size is far less.")? { x if x > TRANS_KEY_MAX_SIZE => return Err(eyre!("Recv'd key size exceeded max acceptable key buffer size")), x => x }; let mut key_bytes = Vec::with_capacity(sz); tokio::io::copy(&mut rx.take(sz64), &mut key_bytes).await .wrap_err("Failed to read key bytes into buffer") .with_section(move || sz64.header("Pubkey size to read was"))?; if key_bytes.len() != sz { return Err(eyre!("Could not read required bytes")); } let k = RsaPublicKey::from_bytes(&key_bytes) .wrap_err("Failed to construct RSA public key from read bytes") .with_section(|| sz.header("Pubkey size was")) .with_section(move || key_bytes.to_base64_string().header("Pubkey bytes were"))?; Result::::Ok(k) } }; let write_fut = { let key_bytes = our_key.to_bytes(); assert!(key_bytes.len() <= TRANS_KEY_MAX_SIZE); let sz64 = u64::try_from(key_bytes.len()) .wrap_err(eyre!("Size of our pubkey could not fit into u64")) .with_section(|| key_bytes.len().header("Size was")) .with_warning(|| "This should not happen, it is only possible when you are running a machine with a pointer size larger than 64 bits.") .with_warning(|| "There was likely internal memory corruption.")?; let sz_buf = sz64.to_be_bytes(); async move { tx.write_all(&sz_buf[..]).await .wrap_err(eyre!("Failed to write key size")) .with_section(|| sz64.header("Key size bytes were")) .with_section(|| format!("{:?}", sz_buf).header("Key size bytes (BE) were"))?; tx.write_all(&key_bytes[..]).await .wrap_err(eyre!("Failed to write key bytes")) .with_section(|| sz64.header("Size of key was")) .with_section(|| key_bytes.to_base64_string().header("Key bytes are"))?; Result::<(), eyre::Report>::Ok(()) } }; let (send, recv) = tokio::join! [write_fut, read_fut]; send.wrap_err("Failed to send our pubkey")?; let recv = recv.wrap_err("Failed to receive foreign pubkey")?; self.info.them = Some(recv); Ok(()) } } //XXX: For some reason, non-exact reads + writes cause garbage to be produced on the receiving end? // Is this fixable? Why does it disjoint? I have no idea... This is supposed to be a stream cipher, right? Why does positioning matter? Have I misunderstood how it workd? Eh... // With this bug, it seems the `while read(buffer) > 0` construct is impossible. This might make this entirely useless. Hopefully with the rigid size-based format for `Message` we won't run into this problem, but subsequent data streaming will likely be affected unless we use rigid, fixed, and (inefficiently) communicated buffer sizes. impl AsyncWrite for ESock where W: AsyncWrite { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { //XXX: If the encryption state of the socket is changed between polls, this breaks. Idk if we can do anything about that tho. if self.state.encw { self.project().tx.poll_write(cx, buf) } else { // SAFETY: Uhh... well I think this is fine? Because we can project the container. // TODO: Can we project the `tx`? Or maybe add a method in `AsyncSink` to map a pinned sink to a `Pin<&mut W>`? unsafe { self.map_unchecked_mut(|this| this.tx.inner_mut()).poll_write(cx, buf)} } } fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Should we do anything else here? // Should we clear foreign key/current session key? self.project().tx.poll_shutdown(cx) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().tx.poll_flush(cx) } } impl AsyncRead for ESock where R: AsyncRead { fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { //XXX: If the encryption state of the socket is changed between polls, this breaks. Idk if we can do anything about that tho. if self.state.encr { self.project().rx.poll_read(cx, buf) } else { // SAFETY: Uhh... well I think this is fine? Because we can project the container. // TODO: Can we project the `tx`? Or maybe add a method in `AsyncSink` to map a pinned sink to a `Pin<&mut W>`? unsafe { self.map_unchecked_mut(|this| this.rx.inner_mut()).poll_read(cx, buf)} } } } /// Write half for `ESock`. #[pin_project] #[derive(Debug)] pub struct ESockWriteHalf(Arc, #[pin] AsyncSink, bool); /// Read half for `ESock`. #[pin_project] #[derive(Debug)] pub struct ESockReadHalf(Arc, #[pin] AsyncSource, bool); //Impl AsyncRead/Write + set_encrypted_read/write for ESockRead/WriteHalf. impl ESockWriteHalf { /// Does this write half have a live corresponding read half? /// /// It's not required to have one, however, exchange is not possible without since it requires sticking the halves back together. pub fn is_bidirectional(&self) -> bool { Arc::strong_count(&self.0) > 1 } /// Is write encrypted on this half? #[inline(always)] pub fn is_encrypted(&self) -> bool { self.2 } /// The local RSA private key #[inline] pub fn local_key(&self) -> &RsaPrivateKey { &self.0.us } /// THe remote RSA public key (if exchange has happened.) #[inline] pub fn foreign_key(&self) -> Option<&RsaPublicKey> { self.0.them.as_ref() } /// End an encrypted session syncronously. /// /// Same as calling `set_encryption(false).now_or_never()`, but more efficient. pub fn clear_encryption(&mut self) { self.2 = false; } } impl ESockReadHalf { /// Does this read half have a live corresponding write half? /// /// It's not required to have one, however, exchange is not possible without since it requires sticking the halves back together. pub fn is_bidirectional(&self) -> bool { Arc::strong_count(&self.0) > 1 } /// Is write encrypted on this half? #[inline(always)] pub fn is_encrypted(&self) -> bool { self.2 } /// The local RSA private key #[inline] pub fn local_key(&self) -> &RsaPrivateKey { &self.0.us } /// THe remote RSA public key (if exchange has happened.) #[inline] pub fn foreign_key(&self) -> Option<&RsaPublicKey> { self.0.them.as_ref() } /// End an encrypted session syncronously. /// /// Same as calling `set_encryption(false).now_or_never()`, but more efficient. pub fn clear_encryption(&mut self) { self.2 = false; } } impl ESockWriteHalf { /// Begin or end an encrypted writing session pub async fn set_encryption(&mut self, set: bool) -> eyre::Result<()> { if set { set_encrypted_write_for(&self.0, &mut self.1).await?; self.2 = true; } else { self.2 = false; } Ok(()) } } impl ESockReadHalf { /// Begin or end an encrypted reading session pub async fn set_encryption(&mut self, set: bool) -> eyre::Result<()> { if set { set_encrypted_read_for(&self.0, &mut self.1).await?; self.2 = true; } else { self.2 = false; } Ok(()) } } impl AsyncWrite for ESockWriteHalf { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { if self.2 { // Encrypted self.project().1.poll_write(cx, buf) } else { // Unencrypted // SAFETY: Uhh... well I think this is fine? Because we can project the container. // TODO: Can we project the `tx`? Or maybe add a method in `AsyncSink` to map a pinned sink to a `Pin<&mut W>`? unsafe { self.map_unchecked_mut(|this| this.1.inner_mut()).poll_write(cx, buf)} } } #[inline(always)] fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().1.poll_flush(cx) } #[inline(always)] fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().1.poll_flush(cx) } } impl AsyncRead for ESockReadHalf { fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { if self.2 { // Encrypted self.project().1.poll_read(cx, buf) } else { // Unencrypted // SAFETY: Uhh... well I think this is fine? Because we can project the container. // TODO: Can we project the `tx`? Or maybe add a method in `AsyncSink` to map a pinned sink to a `Pin<&mut W>`? unsafe { self.map_unchecked_mut(|this| this.1.inner_mut()).poll_read(cx, buf)} } } } #[cfg(test)] mod tests { use super::ESock; #[test] fn rsa_ciphertext_len() -> crate::eyre::Result<()> { let data = { use chacha20stream::cha::{KEY_SIZE, IV_SIZE}; let (key, iv) = chacha20stream::cha::keygen(); let (sz, d) = crate::bin::collect_slices_exact::<&[u8], _, {KEY_SIZE + IV_SIZE}>([key.as_ref(), iv.as_ref()]); assert_eq!(sz, d.len()); d }; println!("KEY+IV: {} bytes", data.len()); let key = cryptohelpers::rsa::RsaPublicKey::generate()?; let rsa = cryptohelpers::rsa::encrypt_slice_to_vec(data, &key)?; println!("Rsa ciphertext size: {}", rsa.len()); assert_eq!(rsa.len(), super::RSA_CIPHERTEXT_SIZE, "Incorrect RSA ciphertext length constant for cc20 KEY+IV encoding."); Ok(()) } #[test] fn rsa_serial_ciphertext_len() -> crate::eyre::Result<()> { let data = serde_cbor::to_vec(&{ let (key, iv) = chacha20stream::cha::keygen(); super::ESockSessionKey { key, iv, } }).expect("Failed to CBOR encode Key+IV"); println!("(cbor) KEY+IV: {} bytes", data.len()); let key = cryptohelpers::rsa::RsaPublicKey::generate()?; let rsa = cryptohelpers::rsa::encrypt_slice_to_vec(data, &key)?; println!("Rsa ciphertext size: {}", rsa.len()); assert_eq!(rsa.len(), super::RSA_CIPHERTEXT_SIZE, "Incorrect RSA ciphertext length constant for cc20 KEY+IV CBOR encoding."); Ok(()) } fn gen_duplex_esock(bufsz: usize) -> crate::eyre::Result<(ESock, ESock)> { use crate::*; let (atx, brx) = tokio::io::duplex(bufsz); let (btx, arx) = tokio::io::duplex(bufsz); let tx = ESock::new(atx, arx).wrap_err(eyre!("Failed to create TX"))?; let rx = ESock::new(btx, brx).wrap_err(eyre!("Failed to create RX"))?; Ok((tx, rx)) } #[tokio::test] async fn esock_exchange() -> crate::eyre::Result<()> { use crate::*; const VALUE: &'static [u8] = b"Hello world!"; // The duplex buffer size here is smaller than an RSA ciphertext block. So, writing the session key must be buffered with a buffer size this small (should return Pending at least once.) // Using a low buffer size to make sure the test passes even when the entire buffer cannot be written at once. let (mut tx, mut rx) = gen_duplex_esock(16).wrap_err(eyre!("Failed to weave socks"))?; let writer = tokio::spawn(async move { use tokio::prelude::*; tx.exchange().await?; assert!(tx.has_exchanged()); tx.set_encrypted_write(true).await?; assert_eq!((true, false), tx.is_encrypted()); tx.write_all(VALUE).await?; tx.write_all(VALUE).await?; // Check resp tx.set_encrypted_read(true).await?; assert_eq!({ let mut chk = [0u8; 3]; tx.read_exact(&mut chk[..]).await?; chk }, [0xaau8,0, 0], "Failed response check"); // Write unencrypted tx.set_encrypted_write(false).await?; tx.write_all(&[2,1,0xfa]).await?; Result::<_, eyre::Report>::Ok(VALUE) }); let reader = tokio::spawn(async move { use tokio::prelude::*; rx.exchange().await?; assert!(rx.has_exchanged()); rx.set_encrypted_read(true).await?; assert_eq!((false, true), rx.is_encrypted()); let mut val = vec![0u8; VALUE.len()]; rx.read_exact(&mut val[..]).await?; let mut val2 = vec![0u8; VALUE.len()]; rx.read_exact(&mut val2[..]).await?; assert_eq!(val, val2); // Send resp rx.set_encrypted_write(true).await?; rx.write_all(&[0xaa, 0, 0]).await?; // Read unencrypted rx.set_encrypted_read(false).await?; assert_eq!({ let mut buf = [0u8; 3]; rx.read_exact(&mut buf[..]).await?; buf }, [2u8,1,0xfa], "2nd response incorrect"); Result::<_, eyre::Report>::Ok(val) }); let (writer, reader) = tokio::join![writer, reader]; let writer = writer.expect("Tx task panic"); let reader = reader.expect("Rx task panic"); eprintln!("Txr: {:?}", writer); eprintln!("Rxr: {:?}", reader); writer?; let val = reader?; println!("Read: {:?}", val); assert_eq!(&val, VALUE); Ok(()) } #[tokio::test] async fn esock_split() -> crate::eyre::Result<()> { use super::*; const SLICES: &'static [&'static [u8]] = &[ &[1,5,3,7,6,9,100,0], &[7,6,2,90], &[3,6,1,0], &[5,1,3,3], ]; let result = SLICES.iter().map(|&slice| slice.iter().map(|&b| u64::from(b)).sum::()).sum::(); println!("Result: {}", result); let (mut tx, mut rx) = gen_duplex_esock(super::TRANS_KEY_MAX_SIZE * 4).wrap_err(eyre!("Failed to weave socks"))?; let (writer, reader) = { use tokio::prelude::*; let writer = tokio::spawn(async move { tx.exchange().await?; let (mut tx, mut rx) = tx.split(); //tx.set_encryption(true).await?; let slices = &SLICES[1..]; for &slice in slices.iter() { println!("Writing slice: {:?}", slice); tx.write_all(slice).await?; } //let mut tx = ESock::unsplit(tx, rx); tx.write_all(SLICES[0]).await?; Result::<_, eyre::Report>::Ok(()) }); let reader = tokio::spawn(async move { rx.exchange().await?; let (mut tx, mut rx) = rx.split(); //rx.set_encryption(true).await?; let (mut mtx, mut mrx) = tokio::sync::mpsc::channel::>(16); let sorter = tokio::spawn(async move { let mut done = 0u64; while let Some(buf) = mrx.recv().await { //buf.sort(); done += buf.iter().map(|&b| u64::from(b)).sum::(); println!("Got buffer: {:?}", buf); tx.write_all(&buf).await?; } Result::<_, eyre::Report>::Ok(done) }); let mut buffer = [0u8; 16]; while let Ok(read) = rx.read(&mut buffer[..]).await { if read == 0 { break; } mtx.send(Vec::from(&buffer[..read])).await?; } drop(mtx); let sum = sorter.await.expect("(reader) Sorter task panic")?; Result::<_, eyre::Report>::Ok(sum) }); let (writer, reader) = tokio::join![writer, reader]; (writer.expect("Writer task panic"), reader.expect("Reader task panic")) }; writer?; assert_eq!(result, reader?); Ok(()) } }