diff --git a/Cargo.lock b/Cargo.lock index addf780..f696585 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -524,6 +524,7 @@ dependencies = [ "serde_cbor", "serde_json", "smallvec", + "stackalloc", "tokio", "uuid", ] @@ -832,6 +833,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "stackalloc" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c790f3002980a878515d27f0f1b4d083f67f9103e9316205088cc4728277de51" +dependencies = [ + "cc", + "rustc_version", +] + [[package]] name = "subtle" version = "2.4.0" diff --git a/Cargo.toml b/Cargo.toml index 268d88a..e7cb250 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ serde = {version = "1.0.118", features= ["derive"]} serde_cbor = "0.11.1" serde_json = "1.0.60" smallvec = "1.5.1" +stackalloc = "1.1.0" tokio = {version = "0.2", features = ["full"]} uuid = {version = "0.8", features = ["v4", "serde"]} diff --git a/src/ext/bytes.rs b/src/ext/bytes.rs index dd4b760..7ea8239 100644 --- a/src/ext/bytes.rs +++ b/src/ext/bytes.rs @@ -62,3 +62,35 @@ pub fn explicit_clear(buffer : &mut[u8]) { } len } + +/// Max size of bytes we'll allocate to the stack at runtime before using a heap allocated buffer. +pub const STACK_SIZE_LIMIT: usize = 4096; + +/// Allocate `size` bytes. Allocates on the stack if size is lower than `STACK_SIZE_LIMIT`, otherwise allocates on the heap. +pub fn alloca_limit(size: usize, f: F) -> T +where F: FnOnce(&mut [u8]) -> T +{ + if size > STACK_SIZE_LIMIT { + thread_local! { + static BUFFER: RefCell> = RefCell::new(vec![0u8; STACK_SIZE_LIMIT*2]); + } + BUFFER.with(move |buf| { + // If the borrow fails then `f` has recursively called into this function, so for that we allocate a new buffer instead of reusing this static one. + if let Ok(mut buf) = buf.try_borrow_mut() { + if buf.len() < size { + buf.resize(size, 0); + } + let res = f(&mut buf[..size]); + bytes::clear(&mut buf[..size]); + res + } else { + f(&mut vec![0u8; size]) + } + }) + } else { + stackalloc::alloca_zeroed(size, f) + + // I don't think this is okay to do. + //stackalloc::alloca(size, move |buf| f(unsafe { stackalloc::helpers::slice_assume_init_mut(buf) })) + } +} diff --git a/src/ext/mod.rs b/src/ext/mod.rs index 2e067fa..1e4e679 100644 --- a/src/ext/mod.rs +++ b/src/ext/mod.rs @@ -3,6 +3,9 @@ use std::collections::BTreeSet; use std::borrow::Borrow; use futures::prelude::*; +use std::cell::RefCell; +use std::ptr; + mod iters; pub use iters::*; @@ -26,8 +29,204 @@ pub use defer_drop::*; pub mod chunking; +/// How many elements should `precollect` allocate on the stack before spilling to the heap. pub const PRECOLLECT_STACK_SIZE: usize = 64; +/// Create a duration with time suffix `h`, `m`, `s`, `ms` or `ns`. +/// +/// # Combination +/// These can also be combined. +/// ``` +/// # use flan_utils::duration; +/// duration!(1 h, 20 m, 30 s); +/// ``` +#[macro_export] macro_rules! duration +{ + (0 $($_any:tt)?) => (::core::time::Duration::from_secs(0)); + ($dur:literal ms) => (::core::time::Duration::from_millis($dur)); + ($dur:literal ns) => (::core::time::Duration::from_nanos($dur)); + ($dur:literal s) => (::core::time::Duration::from_secs($dur)); + ($dur:literal m) => (::core::time::Duration::from_secs($dur * 60)); + ($dur:literal h) => (::core::time::Duration::from_secs($dur * 60 * 60)); + + ( $($dur:literal $unit:tt),*)=> { + duration!(0 s) $( + + duration!($dur $unit) + )* + }; +} + + +/// Create a basic, C-like enum +#[macro_export] macro_rules! basic_enum { + ($(#[$meta:meta])* $vis:vis $name:ident $(; $tcomment:literal)?: $($var:ident $(=> $comment:literal)?),+ $(,)?) => { + $(#[$meta])* + #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)] + $(#[doc = $tcomment])? + $vis enum $name { + $( + $(#[doc = $comment])? + $var + ),+ + } + } +} + + +/// Create a `Yes` or `No` enum. +#[macro_export] macro_rules! bool_type { + ($vis:vis $name:ident $(; $comment:literal)? => $yes:ident, $no:ident) => { + basic_enum!(#[repr(u8)] $vis $name $(; $comment)?: $yes => "# First variant\n\nYes/true", $no => "# Second variant\n\nNo/false"); + + impl From for $name + { + #[inline] fn from(from: bool) -> Self + { + if from { + Self::$yes + } else { + Self::$no + } + } + } + + impl From<$name> for bool + { + #[inline] fn from(from: $name) -> Self + { + match from { + $name::$yes => true, + $name::$no => false, + } + } + } + + impl $name + { + /// Create from a bool value. + #[inline] pub const fn new(from: bool) -> Self + { + if from { + Self::$yes + } else { + Self::$no + } + } + + /// Is this false? + #[inline] pub const fn is_no(self) -> bool + { + !self.is_yes() + } + /// Is this true? + #[inline] pub const fn is_yes(self) -> bool + { + match self { + Self::$yes => true, + Self::$no => false, + } + } + + /// Return Some(T) if self is true. + #[inline] pub fn some(self, value: T) -> Option + { + self.and_then(move || value) + } + + /// Map this value + #[inline] pub fn map(self, f: F) -> T + where F: FnOnce(bool) -> T + { + f(self.is_yes()) + } + + /// Run this closure if value is false + #[inline] pub fn or_else(self, f: F) -> Option + where F: FnOnce() -> T + { + if let Self::$no = self { + Some(f()) + } else { + None + } + } + /// Run this closure if value is true + #[inline] pub fn and_then(self, f: F) -> Option + where F: FnOnce() -> T + { + if let Self::$yes = self { + Some(f()) + } else { + None + } + } + + /// Return `yes` if true and `no` if false + #[inline] pub fn either(self, yes: T, no: T) -> T + { + self.and_either(move || yes, move || no) + } + /// Run closure `yes` if value is true, `no` if value is false. + #[inline] pub fn and_either(self, yes: F, no: G) -> T + where F: FnOnce() -> T, + G: FnOnce() -> T, + { + match self { + Self::$yes => yes(), + Self::$no => no(), + } + } + } + }; + ($vis:vis $name:ident $(; $comment:literal)?) => { + $crate::bool_type!($vis $name $(; $comment)? => Yes, No); + } +} + + +/// Create an accessor method. for a field in a structure. +/// +/// The supported accessor types are: `ref`, `mut`, and `move`. +#[macro_export] macro_rules! accessor { + ($vis:vis ref $name:ident -> $ty:ty => $internal:ident $(; $comment:literal)?) => { + $(#[doc=$comment])? + #[inline] $vis fn $name(&self) -> &$ty { + &self.$internal + } + }; + ($vis:vis ref $name:ident -> $ty:ty => $internal:tt $(; $comment:literal)?) => { + $(#[doc=$comment])? + #[inline] $vis fn $name(&self) -> &$ty { + &self.$internal + } + }; + ($vis:vis mut $name:ident -> $ty:ty => $internal:ident $(; $comment:literal)?) => { + $(#[doc=$comment])? + #[inline] $vis fn $name(&self) -> &mut $ty { + &mut self.$internal + } + }; + ($vis:vis mut $name:ident -> $ty:ty => $internal:tt $(; $comment:literal)?) => { + $(#[doc=$comment])? + #[inline] $vis fn $name(&self) -> &mut $ty { + &mut self.$internal + } + }; + ($vis:vis move $name:ident -> $ty:ty => $internal:ident $(; $comment:literal)?) => { + $(#[doc=$comment])? + #[inline] $vis fn $name(&self) -> $ty { + self.$internal + } + }; + ($vis:vis move $name:ident -> $ty:ty => $internal:tt $(; $comment:literal)?) => { + $(#[doc=$comment])? + #[inline] $vis fn $name(&self) -> $ty { + self.$internal + } + }; +} + + /// Collect an iterator's output and then drop it to detach the iterator from any references or resources it might have. #[macro_export] macro_rules! precollect { ($iter:expr, $num:literal) => { diff --git a/src/service/builder.rs b/src/service/builder.rs new file mode 100644 index 0000000..f57aa80 --- /dev/null +++ b/src/service/builder.rs @@ -0,0 +1,11 @@ +use super::*; +use config::*; + +/// Builder for a service +#[derive(Debug, Clone)] +pub struct ServiceBuilder +{ + /// Settings for the service to use + // Boxed because this is a large structure + settings: Box, +} diff --git a/src/service/config.rs b/src/service/config.rs new file mode 100644 index 0000000..eaa28c9 --- /dev/null +++ b/src/service/config.rs @@ -0,0 +1,56 @@ +//! Configuration for services +use super::*; +use std::time::Duration; +use std::num::NonZeroUsize; + +/// How long to wait before resetting the restart counter for `StopDirective::Restart`. +pub const SUPERVISOR_RESTART_TIME_LIMIT: Option = Some(Duration::from_secs(5)); + +/// What the supervisor task should do when its background service unexpectedly exits +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Serialize, Deserialize)] +pub enum StopDirective +{ + /// Ignore it, allow the service to exit + Ignore, + /// Restart the service, either infinitely, or up to this many times before exiting. + /// + /// If the restart limit is exceeded, exit with error. + /// The limit is reset every `SUPERVISOR_RESTART_TIME_LIMIT` seconds (or never, if it is `None`.) + Restart(Option), + /// Panic the supervisor + Panic, + /// Exit with error + Error, +} + +impl Default for StopDirective +{ + #[inline] + fn default() -> Self + { + Self::Error + } +} + +/// Settings for how a background service runs +#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy, Serialize, Deserialize)] +pub struct ServiceSettings +{ + // Supervisor controls + + /// What to do when the supervisor's child task exits unexpectedly + pub supervisor_stop_directive: StopDirective, + + // Request dispatching options + + /// How many requests to batch together + pub req_dispatch_hold: usize, + /// How long to wait before forcefully processing an unfilled batch of requests + pub req_dispatch_force_timeout: Option, + /// How long to wait before processing batches of requests + pub req_dispatch_delay: Option, + /// Random delay between request batch processing + pub req_dispatch_jitter: Option<(Duration, Duration)>, +} + +//TODO: impl Default for ServiceSettings diff --git a/src/service/mod.rs b/src/service/mod.rs index f14d569..9d893ab 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,12 +1,15 @@ //! The actual running service use super::*; use std::sync::Arc; -use tokio::sync::{ - RwLock, - Mutex, - - mpsc, - oneshot, +use tokio::{ + task::JoinHandle, + sync::{ + RwLock, + Mutex, + + mpsc, + oneshot, + } }; pub mod command; @@ -15,17 +18,27 @@ use command::{ CommandID, }; +pub mod config; + +mod builder; +pub use builder::*; + /// Handle to a running service. Can be used to join it or create `Channel`s. #[derive(Debug)] pub struct Handle { + task: JoinHandle<()>, + channel: Channel, } +/// Inner portion of a `Channel`. Also held through `Arc` by the background service and its supervisor. #[derive(Debug)] struct ChannelInner { - + /// The settings for the service + // Boxed because this is a large structure. + opt: Box, } @@ -46,7 +59,7 @@ struct Request /// Communicates with a running service #[derive(Debug, Clone)] -pub struct Channel{ +pub struct Channel { inner: Arc, @@ -60,3 +73,9 @@ impl PartialEq for Channel { Arc::ptr_eq(&self.inner, &other.inner) } } + +/// Create a new service +pub fn create() -> ServiceBuilder +{ + todo!() +}