From 1fb08b35a00f21433d796deffc15da63a742b4fe Mon Sep 17 00:00:00 2001 From: Avril Date: Wed, 12 May 2021 23:37:42 +0100 Subject: [PATCH] host command receiving params from config. --- src/ext/sync.rs | 52 +++++++++++++++++++++++++++++++++++++++---- src/service/config.rs | 6 ++--- src/service/host.rs | 32 +++++++++++++++++++++----- 3 files changed, 77 insertions(+), 13 deletions(-) diff --git a/src/ext/sync.rs b/src/ext/sync.rs index 7dbd623..a6e8016 100644 --- a/src/ext/sync.rs +++ b/src/ext/sync.rs @@ -1,9 +1,53 @@ //! Sync utils use super::*; -use std::sync::Arc; -use std::sync::atomic::AtomicBool; -use std::mem::MaybeUninit; -use std::cell::UnsafeCell; +use tokio::sync::oneshot; +use futures::prelude::*; + +/// Type to allow for a seperate thread or task to initialise a value. +#[derive(Debug)] +pub struct SharedUninit(oneshot::Receiver); + +/// Type to initialise a value for a `SharedUninit`. +#[derive(Debug)] +pub struct SharedInitialiser(oneshot::Sender); + +impl<'a, T> SharedUninit + where T: 'a +{ + /// Create an uninit/initialiser pair. + pub fn pair() -> (SharedInitialiser, Self) + { + let (tx, rx) = oneshot::channel(); + (SharedInitialiser(tx), Self(rx)) + } + + /// Try to receive the initialised value. + /// + /// Returns `None` if the initialiser was dropped before setting a value. + #[inline] pub fn try_get(self) -> impl Future> + 'a + { + self.0.map(|x| x.ok()) + } + + /// Receive the initialised value. + /// + /// # Panics + /// If the initialiser was dropped without setting a value. + #[inline] pub fn get(self) -> impl Future + 'a + { + self.try_get().map(|x| x.unwrap()) + } +} + +impl<'a, T> SharedInitialiser + where T: 'a +{ + /// Set the value for the `SharedUninit`. + #[inline] pub fn init(self, value: T) + { + let _ = self.0.send(value); + } +} // This was a failure. Just use `tokio::sync::oneshot`... diff --git a/src/service/config.rs b/src/service/config.rs index f46a5c1..cb7e488 100644 --- a/src/service/config.rs +++ b/src/service/config.rs @@ -37,13 +37,13 @@ pub struct ServiceSettings // Request dispatching options /// How many requests to batch together - pub req_dispatch_hold: usize, + pub req_dispatch_chunk: NonZeroUsize, /// How long to wait before forcefully processing an unfilled batch of requests pub req_dispatch_force_timeout: Option, /// How long to wait before processing batches of requests pub req_dispatch_delay: Option, - /// Random **millisecond** delay bounds between request batch processing - pub req_dispatch_jitter: Option<(i64, i64)>, + /// Random **nanosecond** delay bounds between request batch processing + pub req_dispatch_jitter: Option<(u64, u64)>, /// Filter requests in a block based on their lock flags, and process non-locking and locking commands concurrently in 2 seperate tasks. /// This may result in commands being processed out-of-order sometimes. /// diff --git a/src/service/host.rs b/src/service/host.rs index f164f19..e1e47f2 100644 --- a/src/service/host.rs +++ b/src/service/host.rs @@ -1,6 +1,10 @@ //! The actual running task use super::*; -use futures::prelude::*; +use futures::{ + prelude::*, + future::OptionFuture, +}; +use std::time::Duration; pub type SupervisorError = (); //TODO pub type Error = (); // TODO @@ -13,21 +17,37 @@ pub(super) fn spawn_supervisor(service: Service) -> JoinHandle OptionFuture + 'static> +{ + match bounds.jitter() { + 0 => None.into(), + x => Some(tokio::time::delay_for(Duration::from_nanos(x))).into() + } +} + fn spawn_slave(service: Service) -> JoinHandle> { let Service { inner: service, rx } = service; tokio::spawn(async move { + let cfg = service.opt.as_ref(); let mut rx = rx - .chunk(10) // TODO: from config - .lag(duration!(10 ms)); // TODO: from config - let mut timeout = tokio::time::interval(duration!(200 ms)); // TODO: from config + .chunk(cfg.req_dispatch_chunk.into()) + .lag(cfg.req_dispatch_delay.unwrap_or(Duration::ZERO)); + let mut timeout = cfg.req_dispatch_force_timeout.map(tokio::time::interval); + loop { tokio::select! { block = rx.next() => { match block { Some(block) => { - // TODO: Filter and/or process this block + if let Some(bounds) = cfg.req_dispatch_jitter.and_then(|x| (x.0 + x.1 > 0).then(|| x)) { + // Jitter delay. + jitter_for(bounds).await; + } + + // TODO: Filter and then/or process this block }, None => { // Reached the end of stream, exit gracefully. @@ -35,7 +55,7 @@ fn spawn_slave(service: Service) -> JoinHandle> } } } - _ = timeout.tick() => { + _ = OptionFuture::from(timeout.as_mut().map(|x| x.tick())), if timeout.is_some() => { // Cause the `rx` to release a non-full chunk. rx.get_mut().push_now(); }