Added `timesync()`: Force a future to take *at least* a certain duration of time to complete.

Added `.pad_time()` extension method for `T: Future` (TODO: Currently only compiles on nightly as it requires `impl Future` in trait impl, and must be boxed on stable. But `Future.boxed()` extension requires the future to be `Send` for some reason, which we don"t require. Find a way to box the future without it needing to be `Send`.)

Fortune for rsh's current commit: Curse − 凶
pipelined-socket-buffering
Avril 3 years ago
parent fbf96276ca
commit c658e8a702
Signed by: flanchan
GPG Key ID: 284488987C31F630

51
Cargo.lock generated

@ -134,6 +134,20 @@ dependencies = [
"tokio 0.2.25",
]
[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits",
"serde",
"time",
"winapi 0.3.9",
]
[[package]]
name = "color-eyre"
version = "0.5.11"
@ -389,7 +403,7 @@ checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
dependencies = [
"cfg-if 1.0.0",
"libc",
"wasi 0.10.2+wasi-snapshot-preview1",
"wasi 0.10.0+wasi-snapshot-preview1",
]
[[package]]
@ -603,6 +617,25 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
@ -810,6 +843,7 @@ dependencies = [
"base64 0.13.0",
"bytes 1.0.1",
"chacha20stream",
"chrono",
"color-eyre",
"cryptohelpers",
"futures",
@ -973,6 +1007,17 @@ dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi 0.3.9",
]
[[package]]
name = "tokio"
version = "0.2.25"
@ -1129,9 +1174,9 @@ checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "winapi"

@ -11,6 +11,7 @@ atomic_refcell = "0.1.7"
base64 = "0.13.0"
bytes = { version = "1.0.1", features = ["serde"] }
chacha20stream = { version = "2.2.1", features = ["async", "serde"] }
chrono = { version = "0.4.19", features = ["serde"] }
color-eyre = "0.5.11"
cryptohelpers = { version = "1.8.2" , features = ["serialise", "full"] }
futures = "0.3.16"

@ -14,6 +14,7 @@ mod base64;
pub use self::base64::*;
pub mod sync;
pub use sync::TimeSyncExt;
/// A maybe-atom that can spill into a vector.
pub type MaybeVec<T> = SmallVec<[T; 1]>;

@ -1,2 +1,75 @@
//! Syncronisation helpers
use super::*;
use futures::{
Future,
};
use tokio::time::Duration;
/// Force a future to take *at least* a specific amount of time to complete.
///
/// If the future takes longer than `time` to complete, no additional delay is added.
/// If `time` is zero, no delay is added.
///
/// # Panics
/// The returned future will panic if
/// * The future itself panics
/// * We are unable to convert the future's duration from `chrono`'s duration span to std's duration span
pub fn timesync<'a, T, F>(future: F, time: Duration) -> impl Future<Output = T> + 'a
where F: Future<Output=T> + 'a
{
use chrono::prelude::*;
#[cold]
#[inline(never)]
fn _panic_failed_tmconv(orig: chrono::Duration, error: impl Into<eyre::Report>)
{
panic!("Failed to convert chrono duration span {:?} to std duration span: {}", orig, error.into());
}
async move {
let now = Utc::now();
let r = future.await;
if time.is_zero() {
return r;
}
let after = Utc::now();
let duration = after - now;
match duration.to_std() {
Ok(taken)
if taken < time => {
// Delay for the remaining time
tokio::time::delay_for(time - taken).await;
},
Err(error) => _panic_failed_tmconv(duration, error),
// At least `time` has passed
_ => (),
}
r
}
}
pub trait TimeSyncExt<'a>: Future + Sized + 'a
{
type OutputFuture: Future<Output = Self::Output> + 'a;
fn pad_time(self, time: Duration) -> Self::OutputFuture;
}
#[cfg(nightly)]
impl<'a, F> TimeSyncExt<'a> for F
where F: Future + 'a
{
//#[cfg(nightly)]
type OutputFuture = impl Future<Output = Self::Output> + 'a;
//#[cfg(not(nightly))]
//type OutputFuture = futures::future::BoxFuture<'a, Self::Output>; //TODO: Requires `Send` bound for some reason.
fn pad_time(self, time: Duration) -> Self::OutputFuture {
/*#[cfg(not(nightly))] {
use futures::prelude::*;
return timesync(self, time).boxed();
}
#[cfg(nightly)] {
return*/ timesync(self, time)
//}
}
}

@ -1,5 +1,7 @@
//! Remote communication
#![cfg_attr(nightly, feature(const_fn_trait_bound))]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![allow(dead_code)]

@ -37,6 +37,7 @@ pub struct BufferedESock<W, R>
/// `tx`: ESock-wrapped network output socket.
/// `rx`: Reading half of the user's fake stream
//TODO: Maybe we can get away with using `DuplexStream` instead of a generic for `Fake`. But we don't want to give any illusions that that stream can be written to, so I think making it opaque with a generic is fine.
fn bsock_writer<'a, Raw, Fake>(mut tx: ESockWriteHalf<Raw>, mut rx: Fake) -> impl Future<Output = ()> + 'a
where Raw: AsyncWrite + Unpin + 'a,
Fake: AsyncRead + Unpin + 'a,
@ -46,6 +47,19 @@ Fake: AsyncRead + Unpin + 'a,
}
}
/// `rx`: Raw, unencrypted network input socket
/// `tx`: Writing half of the user's `ESockReadHalf`'s rx stream, which wraps the receiver from this fake sender stream.
//TODO: Maybe we can get away with using `DuplexStream` instead of a generic for `Fake`. But we don't want to give any illusions that that stream can be read from, so I think making it opaque with a generic is fine.
fn bsock_reader<'a, Raw, Fake>(mut rx: Raw, mut tx: Fake) -> impl Future<Output= ()> + 'a
where Raw: AsyncRead + Unpin + 'a,
Fake: AsyncWrite + Unpin + 'a
{
async move {
}
}
impl<W, R> BufferedESock<W, R>
where W: AsyncWrite + Unpin + Send + 'static,
R: AsyncRead + Unpin + Send + 'static

Loading…
Cancel
Save