Compare commits

...

5 Commits

Author SHA1 Message Date
Avril 1bfa6bbd4e bsock_reader(): Buffers and marshals bytes from network to fake socket.
3 years ago
Avril c658e8a702
Added `timesync()`: Force a future to take *at least* a certain duration of time to complete.
3 years ago
Avril fbf96276ca
Added `ESock::transfer_reader()`, `ESock::transfer_writer()`: Specific versions of `transfer_state()`.
3 years ago
Avril 444f3f38dc
Added `ESock::transfer_state()`: Change the `tx` and `rx` types in a encrypted socket while maintaining its keys and other encryption state.
3 years ago
Avril 30522ec96f
Created crude diagram for socket buffer pipelining tasks and communication with network and user.
3 years ago

62
Cargo.lock generated

@ -23,6 +23,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "atomic_refcell"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "681b971236e0f76b20fcafca0236b8718c9186ee778d67cd78bd5f28fd85427f"
[[package]]
name = "autocfg"
version = "1.0.1"
@ -112,9 +118,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chacha20stream"
version = "2.1.0"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54c8d48b47fa0a89a94b80d32b1b3fc9ffc1a232a5201ff5a2d14ac77bc7561d"
checksum = "87df7ca03a990f2bb57c659b78c8f2fbc48fd2c6ed5ecc764e239a7ad7e9cfa1"
dependencies = [
"base64 0.13.0",
"getrandom 0.2.3",
@ -128,6 +134,20 @@ dependencies = [
"tokio 0.2.25",
]
[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits",
"serde",
"time",
"winapi 0.3.9",
]
[[package]]
name = "color-eyre"
version = "0.5.11"
@ -383,7 +403,7 @@ checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
dependencies = [
"cfg-if 1.0.0",
"libc",
"wasi 0.10.2+wasi-snapshot-preview1",
"wasi 0.10.0+wasi-snapshot-preview1",
]
[[package]]
@ -597,6 +617,25 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
@ -800,9 +839,11 @@ name = "rsh"
version = "0.1.0"
dependencies = [
"ad-hoc-iter",
"atomic_refcell",
"base64 0.13.0",
"bytes 1.0.1",
"chacha20stream",
"chrono",
"color-eyre",
"cryptohelpers",
"futures",
@ -966,6 +1007,17 @@ dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi 0.3.9",
]
[[package]]
name = "tokio"
version = "0.2.25"
@ -1122,9 +1174,9 @@ checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "winapi"

@ -7,9 +7,11 @@ edition = "2018"
[dependencies]
ad-hoc-iter = "0.2.3"
atomic_refcell = "0.1.7"
base64 = "0.13.0"
bytes = { version = "1.0.1", features = ["serde"] }
chacha20stream = { version = "2.1.0", features = ["async", "serde"] }
chacha20stream = { version = "2.2.1", features = ["async", "serde"] }
chrono = { version = "0.4.19", features = ["serde"] }
color-eyre = "0.5.11"
cryptohelpers = { version = "1.8.2" , features = ["serialise", "full"] }
futures = "0.3.16"

@ -0,0 +1 @@
<mxfile host="app.diagrams.net" modified="2021-08-19T19:47:34.219Z" agent="5.0 (X11)" etag="m3W9Xk7ZdncWV_dDKnck" version="14.9.8" type="device"><diagram id="yzWLK1SvWkPlnieqc2aE" name="Page-1">7V1bc5u6Fv41nt0+xAPC3B7T1NntpG06djptz8sZxZYNJxh8MMTJ/vVbEoibRCzfkJ10ptMYcZGQ1vrWt5aWRM+4Wjz9HcOl9zWaoqAHtOlTz/jYA0C3TBf/ISXPWQnQDCcrmcf+NL+qLBj7/6C8UMtLU3+KVrULkygKEn9ZL5xEYYgmSa0MxnG0rl82i4J6rUs4R1zBeAIDvvSnP028rNQBdln+Cflzj9WsW/kbLyC7OH+TlQen0bpSZAx7xlUcRUn2a/F0hQLSe6xfsvuuW84WDYtRmMjc8Pnr//2bzzex+xz88pK///vF/PGfC/aYRxik+RvnrU2eWRegKe6R/DCKEy+aRyEMhmXphzhKwyki9Wj4qLzmSxQtcaGOC/+HkuQ5H16YJhEu8pJFkJ/N6iQVtb5cXrSK0niCXngjJiQwnqPkhevcYgiw8KJogZL4Gd8XowAm/mO9HTAXonlxXdnP+Efe1Vt0O9/rP0ef74YjXHZ3Ob7pASvAbf9wH+Nf84R2UFYy9R9Z0Y8Vwqe1dewnWEGIWhD5hg+Ijl6M4KIHrsgFnj/xSIPIRR4MpwEeKfwLxajfXhF+rUpdgupv1yGt1SP1Dce3Vzd/rVhzaEXBrF7/GiNEeUcM1/j/KE2WaUKHdfKAki3a05DRUgKJOOEqEzReQioopN66tM38ILiKgiim9xpTiJzZBJfjXoseUOWMNXHQ/ayo7xHFCXp6WUJ5iSqwMMeBHAnN/HBdoorOoMKrIspAO5IQOkpUHT35ya/K79/kUX0zP/r4lD+ZHjznBweEByAJD5ajEh+AAJU5DRwNLz9KQMZ2mpzpZYiSdRQ/FIpJ3jGccmiT3TLBIpGgGI8GuS6aCVABo9E0BwV6nMPUPZw8+OG8AKzXrP9uQ/9dWf0Hx9J/4HBSdv3lx/jTmRMAQ1LDMwqnSsMHppgCvJHOb9GVbvre4OX+8gb3vDa+w5j6dRv29Tmk/GVH1MJtxX4M2hqxZrMZmAgRa2rdW6Z1IMZSByzdMDnEcgSAdTS+AgYCq3jW2uLKQhVQqS4u1+2jzJzP4mhRGPs0Uwg/Uwj8lzIESM6jxTKKIW3xfTqbketazbe2WRkOIdoNY1wI7SZrbB1LuNmDT4GM252xcQYxmzVApQLomnYSyIPC6SUJaeHDSQBXK3+SFV77wTECKbJj46oNpYjGhjPXd3BFvZklXIcEudJwkvhRuIfXtMet765gkAVh1n7iFQiauVrUP1rhUaVwmnldMZog3JHx+278owMArGHXAfZC1t1xjwWwOs8ext8vf37jFVmkZTUtxM35VT34zVCSHJSYSY8YaDKkZb9/V1D3dGC2ZVArg2YKxoyVSSt8XsP3CLOEisw4DZnRG7KQvWd+VykO3IM4Icrem7uRylHxmnsQU4sTrd/D8ZlzU2biNptmUyX8s2ZW+n4YRumcwOr9cxarotEn45oHRS9a3KerzYBYR88j8M+BLP/UmzpxOKf41XlXwJSUYMNQKsF8JGiMqNmfpfRu5jNp0SP9g8U56FWmTLQRmsN4GqAVEfYy/qrYxzKMuowbJh8/6NbHAq4KiS74glbjC9mUxwuM4ZCaYJ8HltucJuRhtgkmzHTeIEhXnnrJbgg2UB08MESz+Jzb8e1WNiwpy4C1vm6bVanW+5plb2LC5Og7in387ihu0mP2xCIYAWzjRZpMDppPU0GDWpzgbqgzxyeaDFiaOustxKQjKm3wxvD8qbQhOyuiNJBi8HHe4Wh0O8JF7+i07/u3MgyG0rl/kyfixThQ+/dmxsHUVI6DwU8TXmZ8PHcrV/TNT8mtNAZ19AYiZqKJ3ErtaNSE53SEh+zOOBxQYxxaX9M3Mo6CXDh2jVtYL0fgRNRCyF5UKEcbVe+GcJhugwM3BUiWcFhNkTU7Jhx8EsyY5jtVZy17luZjPLzE/4ZYTOMeoVv4ltiH98HpedpFkEiZQzLgo3J7TaHsChf1EP1Gl1sUpNdehIhDar4jqfmDlnDVH83fTkZ5+z5CSRqT+cFsQq4KAYjqPdCi8nSInsjrpJU87DxU0SvzI31yLk93ZLkRLDtbMWw0ncYL2ZTk48EGT32/DX/d4ZKW3LgSB8IoJDx3CldewagOEIQrqYOpBBP+QEKnkMCTgT1kjpR/hwmmiSEtARqJZGUpe2wtETiAbOp9DdRjZpY+UBkzk12C0+ZndyO1g0aKArB3lFoTNB7UzNg+stSy/mg3ZMx4VZL2idX6qxJSp3arr9wmmQ0qqz4xz+QjkuowQdpauVi3qojgvpzSd2RA2Dfg80YAAcsQfK5ctiQXrNobDFxxPaW4Z088LNrwyS4F2tzeSIDMyWGM+vk7kw+S0VmP9oCvdLxsV7rr1gkFUAof0vhhvWn8OJiKnxQNtqV5cEVkO0ut/COaRzFthQvZ4IFt7TKAGNOPagotUcqccJmubFyzuxUf2wQ0TiCDyZL1KYFYzbqZMrR4O65ijFX0u9WCbx11vFQClHCV58bVGnkKGtuq4thLPeuTwu0LPNuWhPb2J8hOIwkIaKoJssU7HYKRovsNSI/rqNyTgDBtklW7+/qacx5eQ/nwMjbyinBTdjLPUprzZfFMv762N9sOpNgEJA0Tv5aSTjcVyvLUFXv1ut7kuMpDhzZvlUS5Lx2Svh0oX98Ah0nxPaR2yS6cV7uLj83PMJ9/Mqt039tKqbjDh+3zLMq30fvOQGXvu4Lev+UzJV9r3ytFHdbMSt9/YOaaWurDpKyWkL6n2W4mD9i82RZtM+MczWqbSkz0jilrtairvLBvtpz77tAkjtM1d0EbSKaKbBs/dE1xPW3xw+b1bI3XUeOHNq+pZw6RNpAlB2qJGeA6/jqfocwmLtdeFNDdEDNnqHR3QtEuq40h6xpCmzpV7KNQdXyMLh0f5zT8+R2xcbOUSzv3SqXcEQVFzxtezqPjbT6qchUgGPda9z/rHjLMurlTHitxXp0tdAaSwqrWXXAtFf0sjEht2AV612nImhng8xbKDdQOHKWSFoC9ufZ+mic1i80vimJJpfS+vvS0y3WVO9HksHwdxSxGlRyxLI0s519rnxbFZVYrjTvnLdhxyqZ7T7cRoDZlQfdorq5rKNX9nVIQ+vVo9AbdP6Q6y5IPtb6NI9jVu+bbkH3afVibxTlBp0bntg8SxIU61hYRRRFMKzOQIp1KF4cZl9LoeNGjq02H42jyQGbiPpHZabraNPEgQUn2FQ3Ijxj7Bk62o34lDbf2oQCyYdQ7rgpgwQUZUtqej+kyQE/j/LH5mazFlrbFHLnw7a4lG137IAprdK1lW7WlGfDseokuqBPuCyBw0ovNV/b9/gc+LL8rlEWrys8zGcN/AQ==</diagram></mxfile>

@ -13,6 +13,9 @@ pub use hex::*;
mod base64;
pub use self::base64::*;
pub mod sync;
pub use sync::TimeSyncExt;
/// A maybe-atom that can spill into a vector.
pub type MaybeVec<T> = SmallVec<[T; 1]>;

@ -0,0 +1,74 @@
//! Syncronisation helpers
use super::*;
use futures::{
Future,
};
use tokio::time::Duration;
/// Force a future to take *at least* a specific amount of time to complete.
///
/// If the future takes longer than `time` to complete, no additional delay is added.
/// If `time` is zero, no delay is added.
///
/// # Panics
/// The returned future will panic if
/// * The future itself panics
/// * We are unable to convert the future's duration from `chrono`'s duration span to std's duration span
pub fn timesync<'a, T, F>(future: F, time: Duration) -> impl Future<Output = T> + 'a
where F: Future<Output=T> + 'a
{
use chrono::prelude::*;
#[cold]
#[inline(never)]
fn _panic_failed_tmconv(orig: chrono::Duration, error: impl Into<eyre::Report>)
{
panic!("Failed to convert chrono duration span {:?} to std duration span: {}", orig, error.into());
}
async move {
let now = Utc::now();
let r = future.await;
if time.is_zero() {
return r;
}
let after = Utc::now();
let duration = after - now;
match duration.to_std() {
Ok(taken)
if taken < time => {
// Delay for the remaining time
tokio::time::delay_for(time - taken).await;
},
Err(error) => _panic_failed_tmconv(duration, error),
// At least `time` has passed
_ => (),
}
r
}
}
pub trait TimeSyncExt<'a>: Future + Sized + 'a
{
type OutputFuture: Future<Output = Self::Output> + 'a;
fn pad_time(self, time: Duration) -> Self::OutputFuture;
}
#[cfg(nightly)]
impl<'a, F> TimeSyncExt<'a> for F
where F: Future + 'a
{
//#[cfg(nightly)]
type OutputFuture = impl Future<Output = Self::Output> + 'a;
//#[cfg(not(nightly))]
//type OutputFuture = futures::future::BoxFuture<'a, Self::Output>; //TODO: Requires `Send` bound for some reason.
fn pad_time(self, time: Duration) -> Self::OutputFuture {
/*#[cfg(not(nightly))] {
use futures::prelude::*;
return timesync(self, time).boxed();
}
#[cfg(nightly)] {
return*/ timesync(self, time)
//}
}
}

@ -1,5 +1,7 @@
//! Remote communication
#![cfg_attr(nightly, feature(const_fn_trait_bound))]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![allow(dead_code)]

@ -182,6 +182,61 @@ pub struct ESock<W, R> {
impl<W: AsyncWrite, R: AsyncRead> ESock<W, R>
{
/// Move this `ESock`'s state into another with a different reader, returning the new `ESock` and the old reader stream
pub fn transfer_reader<Rx>(self, rx: Rx) -> (ESock<W, Rx>, (R,))
where Rx: AsyncRead
{
let ESock { info, state, rx: orx, tx } = self;
let (orx, rx) = {
let (s, c) = orx.into_parts();
(s, AsyncSource::from_parts(rx, c))
};
(ESock {
state, info, tx, rx
}, (orx,))
}
/// Move this `ESock`'s state into another with a different writer, returning the new `ESock` and the old writer stream
pub fn transfer_writer<Wx>(self, tx: Wx) -> (ESock<Wx, R>, (W,))
where Wx: AsyncWrite
{
let ESock { info, state, rx, tx: otx } = self;
let (otx, tx) = {
let (s, c) = otx.into_parts();
(s, AsyncSink::from_parts(tx, c))
};
(ESock {
info, state, tx, rx
}, (otx,))
}
/// Move this `ESock`'s state into another, returning the new `ESock` and the old streams
pub fn transfer_state<Wx, Rx>(self, tx: Wx, rx: Rx) -> (ESock<Wx, Rx>, (W, R))
where Wx: AsyncWrite,
Rx: AsyncRead
{
let ESock { info, state, tx: otx, rx: orx } = self;
let (otx, tx) = {
let (s, c) = otx.into_parts();
(s, AsyncSink::from_parts(tx, c))
};
let (orx, rx) = {
let (s, c) = orx.into_parts();
(s, AsyncSource::from_parts(rx, c))
};
(ESock {
info, state, tx, rx
}, (otx, orx))
}
fn inner(&self) -> (&W, &R)
{
(self.tx.inner(), self.rx.inner())

@ -11,6 +11,9 @@ use std::{
PhantomData,
},
};
use futures::{
Future,
};
use tokio::sync::{
mpsc,
};
@ -25,12 +28,83 @@ use enc::{
pub const DEFAULT_BUFFER_SIZE: usize = 32;
/// Task-based buffered piping to/from encrypted sockets.
#[derive(Debug)]
pub struct BufferedESock<W, R>
{
bufsz: usize,
_backing: PhantomData<ESock<W, R>>,
}
/// Info for bsock tasks
#[derive(Debug, Clone, PartialEq, Eq)]
struct BSockTaskInfo
{
bufsz: usize,
}
/// `tx`: ESock-wrapped network output socket.
/// `rx`: Reading half of the user's fake stream
//TODO: Maybe we can get away with using `DuplexStream` instead of a generic for `Fake`. But we don't want to give any illusions that that stream can be written to, so I think making it opaque with a generic is fine.
fn bsock_writer<'a, Raw, Fake>(info: BSockTaskInfo, mut tx: ESockWriteHalf<Raw>, mut rx: Fake) -> impl Future<Output = ()> + 'a
where Raw: AsyncWrite + Unpin + 'a,
Fake: AsyncRead + Unpin + 'a,
{
async move {
}
}
/// `rx`: Raw, unencrypted network input socket
/// `tx`: Writing half of the user's `ESockReadHalf`'s rx stream, which wraps the receiver from this fake sender stream.
//TODO: Maybe we can get away with using `DuplexStream` instead of a generic for `Fake`. But we don't want to give any illusions that that stream can be read from, so I think making it opaque with a generic is fine.
fn bsock_reader<'a, Raw, Fake>(info: BSockTaskInfo, mut rx: Raw, mut tx: Fake) -> impl Future<Output= ()> + 'a
where Raw: AsyncRead + Unpin + 'a,
Fake: AsyncWrite + Unpin + 'a
{
use tokio::prelude::*;
async move {
let mut last_read_error = None;
let mut buffer = vec![0u8; info.bufsz];
loop {
// Read into buffer.
let mut read;
let mut done=0;
while {
match rx.read(&mut buffer[done..]).await {
Ok(n) => {read = n; n > 0},
Err(err) => {read = 0; last_read_error = Some(err); false},
}
} {
done += read;
if done == info.bufsz {
// Buffer is full.
break;
}
}
// Write the buffer
if done > 0 {
if let Err(write_err) = tx.write_all(&buffer[..done]).await {
//We don't want to propagate this error. If writing to the fake socket fails, then it has been dropped or closed some other way, and that is fine. Just report then exit the task.
eprintln!("bsock_reader: Failed to write to fake stream: {}", write_err);
break;
}
}
// Check if there was an error mid-way through reading the buffer
// XXX: Should we instead propagate read errors immediately, ignoring the partially-filled buffer? I think not, as we may get a valid partial buffer which the remote socket then disconnects (exits process) from afterwards.
if let Some(read_err) = last_read_error.take()
{
//TODO: How to propagate errors to `tx`?
eprintln!("bsock_reader: Failed to read from real stream: {}", read_err);
let _ = tx.shutdown().await;
break;
}
}
}
}
impl<W, R> BufferedESock<W, R>
where W: AsyncWrite + Unpin + Send + 'static,
R: AsyncRead + Unpin + Send + 'static
@ -38,7 +112,7 @@ where W: AsyncWrite + Unpin + Send + 'static,
/// Create a new buffered ESock pipe with a specific buffer size
pub fn with_size(tx: W, rx: R, bufsz: usize) -> Self
{
//TODO: Spawn read+write buffer tasks
Self {
bufsz,
_backing: PhantomData,

Loading…
Cancel
Save