diff --git a/src/cap.rs b/src/cap.rs new file mode 100644 index 0000000..a447080 --- /dev/null +++ b/src/cap.rs @@ -0,0 +1,40 @@ +//! Capabilities (permissions) of a connection. +use super::*; +use std::time::Duration; + +pub mod fail; +pub use fail::Failures; + +/// How lenient to be with a certain operation +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Copy)] +pub enum Leniency +{ + /// Ignore **all** malformed/missed messages. + Ignore, + /// Allow `n` failures before disconnecting the socket. + Specific(Failures), + /// Allow + Normal, + /// Immediately disconnect the socket on **any** malformed/missed message. + None, +} + +/// A capability (permission) for a raw socket's data transmission. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum RawSockCapability +{ + /// Process + AllowUnsignedMessages, + + /// Do not disconnect the socket when a malformed message is received, just ignore the message. + SoftFail, + + /// Throttle the number of messages to process + RateLimit{ tx: usize, rx: usize }, + + /// The request response timeout for messages with an expected response. + RecvRespTimeout { tx: Duration, rx: Duration }, +} +/// A collection of `RawSockCapability`s for a specific connection. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct RawSockCapabilities; diff --git a/src/cap/fail.rs b/src/cap/fail.rs new file mode 100644 index 0000000..0464ba4 --- /dev/null +++ b/src/cap/fail.rs @@ -0,0 +1,102 @@ +//! Failure counting/capping +use super::*; +use std::{ + fmt, error, +}; + +/// A measure of failures, used to track or to check failures. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Copy)] +pub struct Failures +{ + /// Number of failures happened back-to-back. + pub seq: usize, + /// Total number of failures over the socket's lifetime. + /// + /// Set allowed to `0` for unlimited. + pub total: usize, + + /// Window of time to keep failures. + pub window: Duration, + /// Number of failures happened in the last `window` of time. + pub last_window: usize, +} + +/// When a failure cap is exceeded, which one is exceeded; and what is the limit that is exceeded? +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum FailureCapExceeded +{ + /// Too many sequential errors + Sequential(usize), + /// Too many total errors + Total(usize), + /// Too many errors in the refresh window + Windowed(usize, Duration), +} + +impl error::Error for FailureCapExceeded{} +impl fmt::Display for FailureCapExceeded +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result + { + match self { + Self::Sequential(_) => write!(f, "too many sequential errors"), + Self::Total(_) => write!(f, "too many total errors"), + Self::Windowed(_, _) => write!(f, "too many errors in refresh window"), + } + } +} + +impl FailureCapExceeded +{ + /// Convert into a detailed report. (This shouldn't be shared with peer, probably.) + pub fn into_detailed_report(self) -> eyre::Report + { + let rep = eyre::Report::from(&self); + match self { + + Self::Sequential(cap) => rep.with_section(|| cap.header("Exceeded limit")), + Self::Total(cap) => rep.with_section(|| cap.header("Exceeded limit")), + Self::Windowed(cap, w) => rep.with_section(|| cap.header("Exceeded limit")) + .with_section(|| format!("{:?}", w).header("Refresh window was")) + } + } +} + +impl Failures +{ + /// Default allowed failure parameters. + pub const DEFAULT_ALLOWED: Self = Self { + seq: 10, + total: 65536, + window: Duration::from_secs(10), + last_window: 5, + }; + + /// Has this `Failures` exceeded failure cap `other`? + pub fn cap_check(&self, other: &Self) -> Result<(), FailureCapExceeded> + { + macro_rules! chk { + ($name:ident, $err:expr) => { + if other.$name != 0 && (self.$name >= other.$name) { + return Err($err)?; + } + } + } + chk!(seq, FailureCapExceeded::Sequential(other.seq)); + chk!(total, FailureCapExceeded::Total(other.total)); + //debug_assert!(other.window == self.window); //TODO: Should we disallow this? + chk!(last_window, FailureCapExceeded::Windowed(other.last_window, self.window)); + + Ok(()) + } +} + +impl Default for Failures +{ + #[inline] + fn default() -> Self + { + Self::DEFAULT_ALLOWED + } +} + diff --git a/src/main.rs b/src/main.rs index fe60f0a..3700c1b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,6 +24,8 @@ mod ext; use ext::*; mod message; mod cancel; + +mod cap; mod sock; //mod pipeline; diff --git a/src/sock.rs b/src/sock.rs index 4c51fd9..9296642 100644 --- a/src/sock.rs +++ b/src/sock.rs @@ -1,6 +1,9 @@ //! Socket handlers use super::*; +use std::net::{ + SocketAddr, +}; use tokio::io::{ AsyncWrite, AsyncRead @@ -10,16 +13,40 @@ use futures::Future; use cancel::*; +/// Details of a newly accepted raw socket peer. +/// +/// This connected will have been "accepted", but not yet trusted +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct RawSockPeerAccepted +{ + /// Address of socket. + pub addr: SocketAddr, + /// Trust this peer from the start? This should almost always be false. + pub auto_trust: bool, +} + +/// Details of a connected, set-up raw socket connection. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RawSockPeerTrusted +{ + /// The socket's details + pub sock_details: RawSockPeerAccepted, + + /// + pub cap_allow: cap::RawSockCapabilities, +} -/// Handles a raw, opened socket -pub fn handle_socket_with_shutdown(tx: W, rx: R, shutdown: C) -> JoinHandle> +/// Handles a **newly connected** raw socket. +/// +/// This will handle setting up socket peer encryption and validation. +pub fn handle_new_socket_with_shutdown(sock_details: RawSockPeerAccepted, tx: W, rx: R, shutdown: C) -> JoinHandle> where R: AsyncRead + Unpin + Send + 'static, W: AsyncWrite + Unpin + Send + 'static { tokio::spawn(async move { match { with_cancel!(async move { - //TODO: How to handle reads+writes? + Ok(()) }, shutdown) } {