//! Saving and loading chain use super::*; use std::{ sync::Arc, path::{ Path, }, io, }; use tokio::{ time::{ self, Duration, }, fs::{ OpenOptions, }, prelude::*, }; use futures::{ future::{ OptionFuture, }, }; #[cfg(feature="compress-chain")] use async_compression::{ tokio_02::{ write::{ BzEncoder, BzDecoder, }, }, }; const SAVE_INTERVAL: Option = Some(Duration::from_secs(2)); #[cfg(feature="compress-chain")] type Compressor = BzEncoder; #[cfg(feature="compress-chain")] type Decompressor = BzDecoder; pub async fn save_now(state: &State) -> io::Result<()> { let chain = state.chain_ref().read().await; use std::ops::Deref; let to = &state.config().file; save_now_to(chain.deref(),to).await } async fn save_now_to(chain: &Chain, to: impl AsRef) -> io::Result<()> { debug!("Saving chain to {:?}", to.as_ref()); let mut file = OpenOptions::new() .write(true) .create(true) .truncate(true) .open(to).await?; let chain = serde_cbor::to_vec(chain).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; { #[cfg(feature="compress-chain")] let mut file = Compressor::new(&mut file); file.write_all(&chain[..]).await?; #[cfg(feature="compress-chain")] file.flush().await?; #[cfg(feature="compress-chain")] file.shutdown().await?; } file.flush().await?; file.shutdown().await?; Ok(()) } /// Start the save loop for this chain pub async fn host(mut state: Box) { let to = state.config().file.to_owned(); let interval = state.config().save_interval(); let when = Arc::clone(state.when_ref()); trace!("Setup oke. Waiting on init"); if state.on_init().await.is_ok() { debug!("Begin save handler"); while Arc::strong_count(&when) > 1 { { let chain = state.chain_ref().read().await; use std::ops::Deref; if let Err(e) = save_now_to(chain.deref(), &to).await { error!("Failed to save chain: {}", e); } else { info!("Saved chain to {:?}", to); } } tokio::select!{ _ = OptionFuture::from(interval.map(|interval| time::delay_for(interval))) => {}, _ = state.on_shutdown() => { break; } } when.notified().await; if state.has_shutdown() { break; } } } else { debug!("Shutdown called before init"); } trace!("Saver exiting"); } /// Try to load a chain from this path pub async fn load(from: impl AsRef) -> io::Result> { debug!("Loading chain from {:?}", from.as_ref()); #[allow(unused_mut)] let mut file = OpenOptions::new() .read(true) .open(from).await?; #[allow(unused_mut)] let mut whole = Vec::new(); #[cfg(feature="compress-chain")] let mut whole = Decompressor::new(whole); tokio::io::copy(&mut file, &mut whole).await?; whole.flush().await?; #[cfg(feature="compress-chain")] whole.shutdown().await?; #[cfg(feature="compress-chain")] let whole = whole.into_inner(); serde_cbor::from_slice(&whole[..]) .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e)) }