From 5fe40069f118029cae9dc7b41cd03856b46fc0f3 Mon Sep 17 00:00:00 2001 From: Avril Date: Tue, 15 Sep 2020 07:56:32 +0100 Subject: [PATCH] added worker host --- Cargo.lock | 185 ++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/cache.rs | 9 +- src/main.rs | 12 ++- src/state/local/host.rs | 79 +++++++++++++++++ src/state/local/mod.rs | 12 +-- src/state/local/work.rs | 26 ------ src/suspend.rs | 10 +++ 8 files changed, 290 insertions(+), 44 deletions(-) create mode 100644 src/state/local/host.rs diff --git a/Cargo.lock b/Cargo.lock index e72f63c..1517bf1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,29 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "addr2line" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6a2d3371669ab3ca9797670853d61402b03d0b4b9ebf33d677dfa720203072" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e" + +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "arc-swap" version = "0.4.7" @@ -23,6 +47,20 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "backtrace" +version = "0.3.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46254cf2fdcdf1badb5934448c1bcbe046a56537b3987d96c51a7afc5d03f293" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.12.3" @@ -86,6 +124,32 @@ dependencies = [ "time", ] +[[package]] +name = "color-eyre" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac5c105065fcb0c002304ae32147ba52df86e07cea7de11c88c62d24972d683" +dependencies = [ + "backtrace", + "color-spantrace", + "eyre", + "indenter", + "once_cell", + "owo-colors", + "tracing-error", +] + +[[package]] +name = "color-spantrace" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a99aa4aa18448eef4c7d3f86d2720d2d8cad5c860fe9ff9b279293efdc8f5be" +dependencies = [ + "ansi_term", + "tracing-core", + "tracing-error", +] + [[package]] name = "cpuid-bool" version = "0.1.2" @@ -145,6 +209,16 @@ dependencies = [ "generic-array", ] +[[package]] +name = "eyre" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0f9683839e579a53258d377fcc0073ca0bf2042ac5e6c60a598069e64403a6d" +dependencies = [ + "indenter", + "once_cell", +] + [[package]] name = "fnv" version = "1.0.7" @@ -298,6 +372,12 @@ dependencies = [ "wasi 0.9.0+wasi-snapshot-preview1", ] +[[package]] +name = "gimli" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaf91faf136cb47367fa430cd46e37a788775e7fa104f8b4bcb3861dc389b724" + [[package]] name = "half" version = "1.6.0" @@ -329,6 +409,12 @@ dependencies = [ "digest", ] +[[package]] +name = "indenter" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0bd112d44d9d870a6819eb505d04dd92b5e4d94bb8c304924a0872ae7016fb5" + [[package]] name = "iovec" version = "0.1.4" @@ -375,6 +461,16 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" +[[package]] +name = "miniz_oxide" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c60c0dfe32c10b43a144bad8fc83538c52f58302c92300ea7ec7bf7b38d5a7b9" +dependencies = [ + "adler", + "autocfg", +] + [[package]] name = "mio" version = "0.6.22" @@ -479,6 +575,12 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab52be62400ca80aa00285d25253d7f7c437b7375c4de678f5405d3afe82ca5" + [[package]] name = "once_cell" version = "1.4.1" @@ -518,6 +620,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "owo-colors" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a1250cdd103eef6bd542b5ae82989f931fc00a41a27f60377338241594410f3" + [[package]] name = "pbkdf2" version = "0.5.0" @@ -654,6 +762,12 @@ version = "0.1.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" +[[package]] +name = "rustc-demangle" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c691c0e608126e00913e33f0ccf3727d5fc84573623b8d65b2df340b5201783" + [[package]] name = "rustc_version" version = "0.2.3" @@ -721,6 +835,15 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "sharded-slab" +version = "0.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06d5a3f5166fb5b42a5439f2eee8b9de149e235961e3eb21c5808fc3ea17ff3e" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook-registry" version = "1.2.1" @@ -766,6 +889,15 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "thread_local" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +dependencies = [ + "lazy_static", +] + [[package]] name = "time" version = "0.1.44" @@ -812,6 +944,58 @@ dependencies = [ "syn", ] +[[package]] +name = "tracing" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d79ca061b032d6ce30c660fded31189ca0b9922bf483cd70759f13a2d86786c" +dependencies = [ + "cfg-if", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e0ccfc3378da0cce270c946b676a376943f5cd16aeba64568e7939806f4ada" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bcf46c1f1f06aeea2d6b81f3c863d0930a596c86ad1920d4e5bad6dd1d7119a" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "tracing-error" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4d7c0b83d4a500748fa5879461652b361edf5c9d51ede2a2ac03875ca185e24" +dependencies = [ + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "tracing-subscriber" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82bb5079aa76438620837198db8a5c529fb9878c730bc2b28179b0241cf04c10" +dependencies = [ + "sharded-slab", + "thread_local", + "tracing-core", +] + [[package]] name = "typenum" version = "1.12.0" @@ -909,6 +1093,7 @@ dependencies = [ "async-trait", "byteorder", "chrono", + "color-eyre", "cryptohelpers", "difference", "futures", diff --git a/Cargo.toml b/Cargo.toml index eaf837c..e1a99cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ serde = {version = "1.0", features= ["derive"]} futures = "0.3" pin-project = "0.4.23" difference = "2.0.0" +color-eyre = "0.5.2" [build-dependencies] rustc_version = "0.2" diff --git a/src/cache.rs b/src/cache.rs index 2d31a78..d7671ea 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,8 +1,3 @@ //! Caching interfaces -use super::*; -use std::{ - borrow::{ - Borrow, - ToOwned, - }, -}; + +//TODO diff --git a/src/main.rs b/src/main.rs index 7d62d95..e65679d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,6 +11,11 @@ use async_trait::async_trait; use serde::{ Serialize, Deserialize, }; +use color_eyre::{ + eyre, + Help, + SectionExt, +}; mod ext; use ext::*; @@ -25,8 +30,8 @@ mod identity; mod post; mod state; -fn main() { - +fn main() -> Result<(), eyre::Report>{ + color_eyre::install()?; /* let mut vec = vec![vec![1, 0, 0], vec![0, 0, 1]]; @@ -36,5 +41,6 @@ fn main() { for _ in 0..10000 { vec.insert_exact(1, span.iter().cloned()); - }*/ +}*/ + Ok(()) } diff --git a/src/state/local/host.rs b/src/state/local/host.rs new file mode 100644 index 0000000..8b1db46 --- /dev/null +++ b/src/state/local/host.rs @@ -0,0 +1,79 @@ +//! Hosts `Imouto`'s `Kokoro`. + +use super::*; +use std::{ + pin::Pin, + task::{ + Poll, + Context, + }, + mem, +}; +use pin_project::{ + pin_project, +}; +use futures::{ + future::Future, +}; +use tokio::{ + task, + sync::{ + oneshot, + mpsc, + }, +}; + +#[derive(Debug)] +pub enum Worker +{ + Attached(task::JoinHandle>), + /// Worker has not been attached yet + Suspended(Kokoro), + /// Worker filed to spawn, + Crashed, +} + +/// Host this `Kokoro` +async fn work(mut kokoro: Kokoro, mut recv: mpsc::Receiver>) -> Result<(), eyre::Report> +{ + while let Some(new) = recv.recv().await { + kokoro.apply(new).await?; + } + Ok(()) +} + +impl Worker +{ + /// Host this suspended state worker with a channel to receive updates from. + /// + /// # Panics + /// If we are not in the `Suspended` state. + pub fn host(&mut self, recv: mpsc::Receiver>) -> WorkerHandle + { + match mem::replace(self, Self::Crashed) { + Self::Suspended(kokoro) => { + let (tx, rx) = oneshot::channel(); + *self = Self::Attached(task::spawn(async move { + let vl = work(kokoro, recv).await; + tx.send(()).unwrap(); + vl + })); + WorkerHandle(rx) + }, + _ => panic!("Attempted to host non-suspended worker"), + } + } +} + +/// A handle on a spawned worker +#[pin_project] +pub struct WorkerHandle(#[pin] oneshot::Receiver<()>); + +impl Future for WorkerHandle +{ + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + this.0.poll(cx).map(|x| x.unwrap()) + } +} diff --git a/src/state/local/mod.rs b/src/state/local/mod.rs index 822ad48..c99e27a 100644 --- a/src/state/local/mod.rs +++ b/src/state/local/mod.rs @@ -19,18 +19,13 @@ pub use delta::*; mod work; pub use work::*; +mod host; +pub use host::*; + mod error; pub use error::Error as LocalError; -#[derive(Debug)] -enum Worker -{ - Attached(tokio::task::JoinHandle<()>), - /// Worker has not been attached yet - Suspended(Kokoro), -} - /// An open, as yet unfinied post #[derive(Debug)] pub struct Imouto @@ -64,6 +59,7 @@ impl Suspendable for Imouto output.insert("body", match self.worker { Worker::Suspended(kokoro) => kokoro.into_suspended().await, // consume worker if possible Worker::Attached(_) => self.karada.into_suspended().await, // consume body instead + _ => return Err(suspend::Error::Other(eyre::eyre!("Tried to suspend crashed worker"))), }); //output.insert("hash", self.hash); output.insert("timestamp", self.timestamp); diff --git a/src/state/local/work.rs b/src/state/local/work.rs index 591cdb5..82c2a3d 100644 --- a/src/state/local/work.rs +++ b/src/state/local/work.rs @@ -1,37 +1,11 @@ //! Worker that mutates `Kokoro`. use super::*; -use std::{ - pin::Pin, - task::{ - Poll, - Context, - }, -}; use tokio::{ sync::{ watch, oneshot, }, }; -use pin_project::{ - pin_project, -}; -use futures::{ - future::Future, -}; - -/// A handle on a spawned worker -#[pin_project] -pub struct WorkerHandle(#[pin] oneshot::Receiver<()>); - -impl Future for WorkerHandle -{ - type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - this.0.poll(cx).map(|x| x.unwrap()) - } -} /// Handles working on `Karada` instances. #[derive(Debug)] diff --git a/src/suspend.rs b/src/suspend.rs index c297d5c..2bd7df2 100644 --- a/src/suspend.rs +++ b/src/suspend.rs @@ -328,6 +328,7 @@ pub enum Error { Corruption, IO(io::Error), + Other(eyre::Report), Unknown, } impl error::Error for Error @@ -348,6 +349,7 @@ impl fmt::Display for Error Self::MissingObject(string) => write!(f, "missing object from stream: {}", string), Self::Corruption => write!(f, "data stream corruption"), Self::IO(_) => write!(f, "i/o error"), + Self::Other(report) => write!(f, "internal: {}", report), _ => write!(f, "unknown error"), } } @@ -361,6 +363,14 @@ impl From for Error } } +impl From for Error +{ + fn from(from: eyre::Report) -> Self + { + Self::Other(from) + } +} + /// A suspendable type, that can save and reload its data atomically #[async_trait]