host command receiving params from config.

new
Avril 4 years ago
parent b2167c363c
commit 1fb08b35a0
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -1,9 +1,53 @@
//! Sync utils //! Sync utils
use super::*; use super::*;
use std::sync::Arc; use tokio::sync::oneshot;
use std::sync::atomic::AtomicBool; use futures::prelude::*;
use std::mem::MaybeUninit;
use std::cell::UnsafeCell; /// Type to allow for a seperate thread or task to initialise a value.
#[derive(Debug)]
pub struct SharedUninit<T>(oneshot::Receiver<T>);
/// Type to initialise a value for a `SharedUninit`.
#[derive(Debug)]
pub struct SharedInitialiser<T>(oneshot::Sender<T>);
impl<'a, T> SharedUninit<T>
where T: 'a
{
/// Create an uninit/initialiser pair.
pub fn pair() -> (SharedInitialiser<T>, Self)
{
let (tx, rx) = oneshot::channel();
(SharedInitialiser(tx), Self(rx))
}
/// Try to receive the initialised value.
///
/// Returns `None` if the initialiser was dropped before setting a value.
#[inline] pub fn try_get(self) -> impl Future<Output = Option<T>> + 'a
{
self.0.map(|x| x.ok())
}
/// Receive the initialised value.
///
/// # Panics
/// If the initialiser was dropped without setting a value.
#[inline] pub fn get(self) -> impl Future<Output = T> + 'a
{
self.try_get().map(|x| x.unwrap())
}
}
impl<'a, T> SharedInitialiser<T>
where T: 'a
{
/// Set the value for the `SharedUninit`.
#[inline] pub fn init(self, value: T)
{
let _ = self.0.send(value);
}
}
// This was a failure. Just use `tokio::sync::oneshot`... // This was a failure. Just use `tokio::sync::oneshot`...

@ -37,13 +37,13 @@ pub struct ServiceSettings
// Request dispatching options // Request dispatching options
/// How many requests to batch together /// How many requests to batch together
pub req_dispatch_hold: usize, pub req_dispatch_chunk: NonZeroUsize,
/// How long to wait before forcefully processing an unfilled batch of requests /// How long to wait before forcefully processing an unfilled batch of requests
pub req_dispatch_force_timeout: Option<Duration>, pub req_dispatch_force_timeout: Option<Duration>,
/// How long to wait before processing batches of requests /// How long to wait before processing batches of requests
pub req_dispatch_delay: Option<Duration>, pub req_dispatch_delay: Option<Duration>,
/// Random **millisecond** delay bounds between request batch processing /// Random **nanosecond** delay bounds between request batch processing
pub req_dispatch_jitter: Option<(i64, i64)>, pub req_dispatch_jitter: Option<(u64, u64)>,
/// Filter requests in a block based on their lock flags, and process non-locking and locking commands concurrently in 2 seperate tasks. /// Filter requests in a block based on their lock flags, and process non-locking and locking commands concurrently in 2 seperate tasks.
/// This may result in commands being processed out-of-order sometimes. /// This may result in commands being processed out-of-order sometimes.
/// ///

@ -1,6 +1,10 @@
//! The actual running task //! The actual running task
use super::*; use super::*;
use futures::prelude::*; use futures::{
prelude::*,
future::OptionFuture,
};
use std::time::Duration;
pub type SupervisorError = (); //TODO pub type SupervisorError = (); //TODO
pub type Error = (); // TODO pub type Error = (); // TODO
@ -13,21 +17,37 @@ pub(super) fn spawn_supervisor(service: Service) -> JoinHandle<Result<(), Superv
}) })
} }
/// Delay for a number of **nanoseconds** between the specified bounds
fn jitter_for(bounds: (u64, u64)) -> OptionFuture<impl Future<Output = ()> + 'static>
{
match bounds.jitter() {
0 => None.into(),
x => Some(tokio::time::delay_for(Duration::from_nanos(x))).into()
}
}
fn spawn_slave(service: Service) -> JoinHandle<Result<(), Error>> fn spawn_slave(service: Service) -> JoinHandle<Result<(), Error>>
{ {
let Service { inner: service, rx } = service; let Service { inner: service, rx } = service;
tokio::spawn(async move { tokio::spawn(async move {
let cfg = service.opt.as_ref();
let mut rx = rx let mut rx = rx
.chunk(10) // TODO: from config .chunk(cfg.req_dispatch_chunk.into())
.lag(duration!(10 ms)); // TODO: from config .lag(cfg.req_dispatch_delay.unwrap_or(Duration::ZERO));
let mut timeout = tokio::time::interval(duration!(200 ms)); // TODO: from config let mut timeout = cfg.req_dispatch_force_timeout.map(tokio::time::interval);
loop { loop {
tokio::select! { tokio::select! {
block = rx.next() => { block = rx.next() => {
match block { match block {
Some(block) => { Some(block) => {
// TODO: Filter and/or process this block if let Some(bounds) = cfg.req_dispatch_jitter.and_then(|x| (x.0 + x.1 > 0).then(|| x)) {
// Jitter delay.
jitter_for(bounds).await;
}
// TODO: Filter and then/or process this block
}, },
None => { None => {
// Reached the end of stream, exit gracefully. // Reached the end of stream, exit gracefully.
@ -35,7 +55,7 @@ fn spawn_slave(service: Service) -> JoinHandle<Result<(), Error>>
} }
} }
} }
_ = timeout.tick() => { _ = OptionFuture::from(timeout.as_mut().map(|x| x.tick())), if timeout.is_some() => {
// Cause the `rx` to release a non-full chunk. // Cause the `rx` to release a non-full chunk.
rx.get_mut().push_now(); rx.get_mut().push_now();
} }

Loading…
Cancel
Save