Compare commits

...

14 Commits
master ... new

271
Cargo.lock generated

@ -1,10 +1,12 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "ad-hoc-iter"
version = "0.2.2"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5068a429476033d1940f21e21d317afae2fc3a82f412d5d8fe08c13f100a00e8"
checksum = "90a8dd76beceb5313687262230fcbaaf8d4e25c37541350cf0932e9adb8309c8"
[[package]]
name = "autocfg"
@ -18,6 +20,12 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff"
[[package]]
name = "base64"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bitflags"
version = "1.2.1"
@ -47,9 +55,9 @@ checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "bytes"
version = "0.6.0"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16"
checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
[[package]]
name = "cc"
@ -69,6 +77,36 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chacha20stream"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a91f983a237d46407e744f0b9c5d2866f018954de5879905d7af6bf06953aea"
dependencies = [
"base64 0.13.0",
"getrandom 0.2.2",
"openssl",
"pin-project",
"rustc_version",
"serde",
"smallvec",
"tokio",
]
[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits",
"serde",
"time",
"winapi 0.3.9",
]
[[package]]
name = "cpuid-bool"
version = "0.1.2"
@ -84,6 +122,17 @@ dependencies = [
"build_const",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4feb231f0d4d6af81aed15928e58ecf5816aa62a2393e2c82f46973e92a9a278"
dependencies = [
"autocfg",
"cfg-if 1.0.0",
"lazy_static",
]
[[package]]
name = "crypto-mac"
version = "0.9.1"
@ -96,13 +145,13 @@ dependencies = [
[[package]]
name = "cryptohelpers"
version = "1.7.0"
version = "1.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1758ba574c79ae6db3ccf6623cacc5293c2c0a14de871a7b95d4286861cbd504"
checksum = "488ef2d3d94ab2ec791e4c1decf766b4b9beac2d49b41d184db937adc8d54a91"
dependencies = [
"crc",
"futures",
"getrandom",
"getrandom 0.1.15",
"hex-literal",
"hmac",
"libc",
@ -288,9 +337,26 @@ checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6"
dependencies = [
"cfg-if 0.1.10",
"libc",
"wasi",
"wasi 0.9.0+wasi-snapshot-preview1",
]
[[package]]
name = "getrandom"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8"
dependencies = [
"cfg-if 1.0.0",
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
]
[[package]]
name = "half"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62aca2aba2d62b4a7f5b33f3712cb1b0692779a56fb510499d5c0aa594daeaf3"
[[package]]
name = "hermit-abi"
version = "0.1.17"
@ -469,17 +535,26 @@ version = "0.1.0"
dependencies = [
"ad-hoc-iter",
"bitflags",
"bytes 0.6.0",
"bytes 1.0.1",
"chacha20stream",
"chrono",
"crossbeam-utils",
"cryptohelpers",
"futures",
"generational-arena",
"jemallocator",
"lazy_static",
"memmap",
"pin-project",
"rand 0.8.3",
"rustc_version",
"serde",
"serde_cbor",
"serde_json",
"smallvec",
"stackalloc",
"tokio",
"uuid",
]
[[package]]
@ -493,6 +568,25 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
@ -517,23 +611,23 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.31"
version = "0.10.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d008f51b1acffa0d3450a68606e6a51c123012edaacb0f4e1426bd978869187"
checksum = "6d7830286ad6a3973c0f1d9b73738f69c76b739301d0229c4b96501695cbe4c8"
dependencies = [
"bitflags",
"cfg-if 1.0.0",
"foreign-types",
"lazy_static",
"libc",
"once_cell",
"openssl-sys",
]
[[package]]
name = "openssl-sys"
version = "0.9.59"
version = "0.9.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de52d8eabd217311538a39bba130d7dea1f1e118010fee7a033d966845e7d5fe"
checksum = "b6b0d6fb7d80f877617dfcb014e605e2b5ab2fb0afdf27935219bb6bd984cb98"
dependencies = [
"autocfg",
"cc",
@ -548,29 +642,29 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7170d73bf11f39b4ce1809aabc95bf5c33564cdc16fc3200ddda17a5f6e5e48b"
dependencies = [
"base64",
"base64 0.12.3",
"crypto-mac",
"hmac",
"rand",
"rand_core",
"rand 0.7.3",
"rand_core 0.5.1",
"sha2",
"subtle",
]
[[package]]
name = "pin-project"
version = "1.0.2"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ccc2237c2c489783abd8c4c80e5450fc0e98644555b1364da68cc29aa151ca7"
checksum = "c7509cc106041c40a4518d2af7a61530e1eed0e6285296a3d8c5472806ccc4a4"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.2"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8e8d2bf0b23038a4424865103a4df472855692821aab4e4f5c3312d461d9e5f"
checksum = "48c950132583b500556b1efd71d45b319029f2b71518d979fcc208e16b42426f"
dependencies = [
"proc-macro2",
"quote",
@ -615,9 +709,9 @@ checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a"
[[package]]
name = "proc-macro2"
version = "1.0.24"
version = "1.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71"
checksum = "a152013215dca273577e18d2bf00fa862b89b24169fb78c4c95aeb07992c9cec"
dependencies = [
"unicode-xid",
]
@ -637,9 +731,21 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
dependencies = [
"rand_chacha",
"rand_core",
"rand_hc",
"rand_chacha 0.2.2",
"rand_core 0.5.1",
"rand_hc 0.2.0",
]
[[package]]
name = "rand"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e"
dependencies = [
"libc",
"rand_chacha 0.3.0",
"rand_core 0.6.2",
"rand_hc 0.3.0",
]
[[package]]
@ -649,7 +755,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
"ppv-lite86",
"rand_core",
"rand_core 0.5.1",
]
[[package]]
name = "rand_chacha"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d"
dependencies = [
"ppv-lite86",
"rand_core 0.6.2",
]
[[package]]
@ -658,7 +774,16 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom",
"getrandom 0.1.15",
]
[[package]]
name = "rand_core"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7"
dependencies = [
"getrandom 0.2.2",
]
[[package]]
@ -667,7 +792,25 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
"rand_core",
"rand_core 0.5.1",
]
[[package]]
name = "rand_hc"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73"
dependencies = [
"rand_core 0.6.2",
]
[[package]]
name = "rustc_version"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
dependencies = [
"semver",
]
[[package]]
@ -676,6 +819,21 @@ version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
name = "semver"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
dependencies = [
"semver-parser",
]
[[package]]
name = "semver-parser"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.118"
@ -685,6 +843,16 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_cbor"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622"
dependencies = [
"half",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.118"
@ -737,9 +905,9 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]]
name = "smallvec"
version = "1.5.1"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae524f056d7d770e174287294f562e95044c68e88dec909a00d2094805db9d75"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "socket2"
@ -752,6 +920,16 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "stackalloc"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c790f3002980a878515d27f0f1b4d083f67f9103e9316205088cc4728277de51"
dependencies = [
"cc",
"rustc_version",
]
[[package]]
name = "subtle"
version = "2.4.0"
@ -760,15 +938,26 @@ checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2"
[[package]]
name = "syn"
version = "1.0.54"
version = "1.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a2af957a63d6bd42255c359c93d9bfdb97076bd3b820897ce55ffbfbf107f44"
checksum = "a1e8cdbefb79a9a5a65e0db8b47b723ee907b7c7f8496c76a1770b5c310bab82"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
]
[[package]]
name = "time"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi 0.3.9",
]
[[package]]
name = "tokio"
version = "0.2.24"
@ -816,6 +1005,16 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "uuid"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
dependencies = [
"getrandom 0.2.2",
"serde",
]
[[package]]
name = "vcpkg"
version = "0.2.11"
@ -834,6 +1033,12 @@ version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "winapi"
version = "0.2.8"

@ -10,14 +10,25 @@ edition = "2018"
[dependencies]
ad-hoc-iter = "0.2.2"
bitflags = "1.2.1"
bytes = "0.6.0"
bytes = "1.0.1"
chacha20stream = {version = "1.0", features = ["async", "serde"]}
chrono = {version = "0.4.19", features= ["serde"]}
crossbeam-utils = "0.8.4"
cryptohelpers = {version = "1.7", features = ["full", "async", "serde"]}
futures = "0.3.8"
generational-arena = "0.2.8"
jemallocator = "0.3.2"
lazy_static = "1.4.0"
memmap = "0.7.0"
pin-project = "1.0.2"
pin-project = "1.0.7"
rand = "0.8.3"
serde = {version = "1.0.118", features= ["derive"]}
serde_cbor = "0.11.1"
serde_json = "1.0.60"
smallvec = "1.5.1"
stackalloc = "1.1.0"
tokio = {version = "0.2", features = ["full"]}
uuid = {version = "0.8", features = ["v4", "serde"]}
[build-dependencies]
rustc_version = "0.2"

@ -1,554 +0,0 @@
use super::*;
use std::marker::Unpin;
use std::{
io::{
self,
Read,
},
fs,
fmt,
};
use bytes::{
BytesMut,
BufMut,
};
use tokio::io::AsyncRead;
use tokio::{
task,
sync::{
mpsc,
oneshot,
},
};
use futures::prelude::*;
/// An open fd that has been memory mapped.
///
/// Holds both the `File` and its `Mmap` so the mapping can never outlive the
/// backing fd.
#[derive(Debug)]
pub struct OpenMMap
{
    file: File, //note: this layout matters for destruction ordering: `map` must be dropped before `file`.
    map: Mmap,
}
impl AsRef<[u8]> for OpenMMap
{
    /// Borrow the entire mapped region as a byte slice.
    #[inline(always)] fn as_ref(&self) -> &[u8]
    {
        // Deref-coerce the `Mmap` to its underlying `[u8]` view.
        let bytes: &[u8] = &self.map;
        bytes
    }
}
impl OpenMMap
{
    /// Map an already-open tokio file by first converting it to a std `File`.
    async fn new_file(file: tokio::fs::File) -> io::Result<Self>
    {
        Self::new_file_sync(file.into_std().await)
    }
    /// Map an already-open std `File`.
    ///
    /// All other constructors funnel through here.
    fn new_file_sync(file: File) -> io::Result<Self>
    {
        // SAFETY: `Mmap::map` is unsafe because the file may be mutated
        // externally while mapped; this module assumes exclusive access to its
        // files (see the `Level` docs).
        let map = unsafe { Mmap::map(&file)? };
        Ok(Self {
            file,
            map
        })
    }
    /// Open `file` read-only (blocking the current thread) and map it.
    fn new_sync(file: impl AsRef<Path>) -> io::Result<Self>
    {
        Self::new_file_sync(fs::OpenOptions::new().read(true).open(file)?)
    }
    /// Open `file` read-only asynchronously and map it.
    async fn new(file: impl AsRef<Path>) -> io::Result<Self>
    {
        Self::new_file(tokio::fs::OpenOptions::new().read(true).open(file).await?).await
    }
}
/// How aggressively should we cache a specific item.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
#[repr(u32)]
pub enum Level
{
    /// No caching
    ///
    /// # Usage
    /// Best used for cold files, or files that are not accessed much, or when running low on memory and/or fds.
    ///
    /// Corresponds to `DataCacheState::None`
    None,
    /// Open the file and cache the FD as std `File`
    /// This can avoid the syscall overhead of needing to open the file next time it is accessed.
    ///
    /// # Usage
    /// Best used for frequently accessed files.
    ///
    /// Corresponds to `DataCacheState::Open`
    Low,
    /// Open the file, cache the FD *and* map the file in memory.
    /// This can provide efficient random-access of the file without the need to preload it.
    ///
    /// # Usage
    /// Best used for frequently read large files.
    ///
    /// Corresponds to `DataCacheState::Mapped`
    High,
    /// Load the whole contents of the file into memory, without keeping it open.
    /// This provides the most efficient access of the file, as well as not contributing to the process' fd limit, but at the cost of:
    /// * Loading the whole file into a memory buffer, which may be a slow operation and/or take up massive memory space
    /// * Allowing a potential desync if the file is changed on disk while the cache buffer is loaded.
    ///
    /// ## Desync
    /// While `mtfse` already assumes it has exclusive access to all files in its db root, generally a file being modified by an external program will cause an error to be returned within a `mtfse` operation eventually as file hashes are updated and/or files are encrypted/decrypted or even read (in the case of a file being deleted.)
    ///
    /// When an item is cached at this level however, any of these kinds of changes made will not be visible. The store can then be put into an invalid state without realising it.
    ///
    /// ## Tradeoffs
    /// The caching operation itself is expensive, the memory requirements is expensive, and in some cases this can even *slow* reads compared to the other levels when used on large files, as we cannot take advantage of the kernel's internal file data caching for mapped random-access reads and we are bound to causing CPU cache misses.
    ///
    /// # Usage
    /// Best used for frequently read small files.
    ///
    /// Corresponds to `DataCacheState::Memory`
    Extreme,
}
/// Provides immutable caching of a file in a data entry.
///
/// Each variant corresponds to one `Level`, in the same order of aggressiveness.
#[derive(Debug)]
pub enum DataCacheState
{
    /// There is no file cache for this item. (`Level::None`)
    None,
    /// The file is open, we have an fd. (`Level::Low`)
    Open(File),
    /// The file is open and memory mapped. (`Level::High`)
    Mapped(OpenMMap),
    /// The file is not open, but its whole contents have been loaded into memory. (`Level::Extreme`)
    Memory(Bytes), // load from file via `BytesMut` buffer, then call `.freeze()`.
}
impl Default for DataCacheState
{
#[inline]
fn default() -> Self
{
Self::None
}
}
impl DataCacheState
{
    /// Spawn a task to generate a cache for the file provided by `from`.
    ///
    /// # Notes
    /// This is redundant for all cache levels except `Extreme`.
    pub fn background(from: impl Into<PathBuf>, level: Level) -> impl Future<Output = io::Result<Self>> + 'static//tokio::task::JoinHandle<io::Result<Self>>
    {
        let from = from.into();
        async move {
            // NOTE(review): `exists()`/`is_file()` are blocking stat syscalls
            // executed on the async executor thread — presumably acceptable for
            // a quick metadata check, but confirm under heavy load.
            if from.exists() && from.is_file() {
                // Off-load cache construction (potentially a full file read for
                // `Level::Extreme`) to its own task and forward its result.
                tokio::spawn(Self::new(from, level)).await.expect("Background loader panicked")
            } else {
                Err(io_err!(NotFound, "Path either not existant or not a file."))
            }
        }
    }
    /// Spawn a task to generate a cache for the already opened file `from`.
    ///
    /// # Notes
    /// This is redundant for all cache levels except `Extreme`.
    pub fn background_from_file(from: tokio::fs::File, level: Level) -> impl Future<Output = io::Result<Self>> + 'static // tokio::task::JoinHandle<io::Result<Self>>
    {
        async move {
            // Convert to a std `File` first; `new_fd` expects one.
            let file = from.into_std().await;
            tokio::spawn(Self::new_fd(file, level)).await.expect("Background loader panicked")
        }
    }
}
impl DataCacheState
{
    /// Attempt to upgrade the cache.
    ///
    /// Upgrades by one step: `Open` -> `Mapped`, `Mapped` -> `Memory`.
    ///
    /// # Returns
    /// The upgraded cache item, or itself if it is unable to be upgraded (`None` and `Memory`).
    pub async fn into_upgrade(self) -> io::Result<Self>
    {
        Ok(match self {
            Self::Open(file) => Self::Mapped(OpenMMap::new_file(file.into()).await?),
            Self::Mapped(map) => Self::Memory({
                use bytes::Buf;
                // Copying the whole mapping into an owned buffer can be large;
                // run it on the blocking thread-pool, not the async executor.
                tokio::task::spawn_blocking(move || {
                    map.as_ref().copy_to_bytes(map.as_ref().len())
                }).await.expect("Copying map into memory failed")
            }),
            // `None` and `Memory` have no higher level; hand `self` back unchanged.
            x => x,
        })
    }
    /// Attempt to upgrade the cache, blocking the current thread.
    ///
    /// # Returns
    /// The upgraded cache item, or itself if it is unable to be upgraded (`None` and `Memory`).
    pub fn into_upgrade_sync(self) -> io::Result<Self>
    {
        Ok(match self {
            Self::Open(file) => Self::Mapped(OpenMMap::new_file_sync(file)?),
            Self::Mapped(map) => Self::Memory({
                use bytes::Buf;
                // Synchronous full copy of the mapping into an owned buffer.
                map.as_ref().copy_to_bytes(map.as_ref().len())
            }),
            x => x,
        })
    }
    /// Attempt to upgrade the cache in-place.
    ///
    /// # Panics
    /// If `self` is `None` or `Memory`.
    pub async fn upgrade(&mut self) -> io::Result<()>
    {
        // Temporarily park `Self::None` in place while the upgrade runs.
        // NOTE(review): if the upgrade errors, `self` is left as `None` (the
        // previous state is lost) — confirm that is the intended failure mode.
        *self = match std::mem::replace(self, Self::None) {
            Self::Open(file) => Self::Mapped(OpenMMap::new_file(file.into()).await?),
            Self::Mapped(map) => Self::Memory({
                use bytes::Buf;
                tokio::task::spawn_blocking(move || {
                    map.as_ref().copy_to_bytes(map.as_ref().len())
                }).await.expect("Copying map into memory failed")
            }),
            x => panic!("Cannot upgrade from {:?}", x),
        };
        Ok(())
    }
    /// Attempt to upgrade the cache in-place, blocking the current thread.
    ///
    /// # Panics
    /// If `self` is `None` or `Memory`.
    pub fn upgrade_sync(&mut self) -> io::Result<()>
    {
        // Same replace-then-rebuild dance as `upgrade`, without yielding.
        *self = match std::mem::replace(self, Self::None) {
            Self::Open(file) => Self::Mapped(OpenMMap::new_file_sync(file)?),
            Self::Mapped(map) => Self::Memory({
                use bytes::Buf;
                map.as_ref().copy_to_bytes(map.as_ref().len())
            }),
            x => panic!("Cannot upgrade from {:?}", x),
        };
        Ok(())
    }
    /// Read from the cache at `offset` into the provided buffer, and return the number of bytes read.
    ///
    /// # Performance
    ///
    /// When the cache has random access, this method completes without yielding. If not, it performs async seek & read operations to fill the buffer as much as possible from the offset.
    ///
    /// # Returns
    /// If `EOF` is encountered within the read, then it is terminated early and the number of bytes successfully read is returned (and will be less than the length of the buffer), otherwise, the full buffer was filled, and the full buffer's length will be returned.
    ///
    /// ## Errors
    ///
    /// If this cache is not active, it will return an `io::Error` with `io::ErrorKind::NotConnected`.
    /// Any other error in I/O operations is propagated.
    pub async fn read_at(&mut self, offset: usize, into: &mut [u8]) -> io::Result<usize> // this takes `&mut self` only to ensure it cannot be called on different threads at the same time, as any file operations need to be atomic.
    {
        // Fast path: `Mapped`/`Memory` support random access and copy without yielding.
        if let Some(ar) = self.random_access()
        {
            // NOTE(review): `&ar[offset..]` panics when `offset > ar.len()`;
            // consider `ar.get(offset..)` and returning `Ok(0)` (or an error) instead.
            return Ok(slice::copy_bytes(&ar[offset..], into));
        }
        // Slow path: `Open` — clone the fd, seek to `offset`, then read until
        // the buffer is full or EOF.
        if let Some(file) = self.file()
        {
            use tokio::{
                fs, prelude::*,
            };
            let mut file = fs::File::from_std(file.try_clone()?); // this is what requires we take `&mut(ex) self`.
            file.seek(io::SeekFrom::Start(u64::try_from(offset).expect("Arch size integer was out of bounds of u64 (this should never happen)"))).await?;
            let mut read = 0;
            let mut cur;
            // Keep reading until EOF (`cur == 0`) or `into` has been filled.
            while { cur = file.read(&mut into[read..]).await?; cur != 0 && read < into.len() } {
                read += cur;
            }
            return Ok(read);
        }
        // `None`: no backing to read from at all.
        Err(io_err!(NotConnected, "Operation not supported (no cache is available)"))
    }
    /// Attempt to get a reference to an fd, if this cache state holds one.
    pub fn file(&self) -> Option<&File>
    {
        match self
        {
            Self::Mapped(map) => Some(&map.file),
            Self::Open(file) => Some(file),
            _ => None,
        }
    }
    /// Attempt to get a random access buffer of this cache, if one exists
    /// (`Mapped` and `Memory` only).
    pub fn random_access(&self) -> Option<& [u8]>
    {
        match self {
            Self::Mapped(map) => Some(map.as_ref()),
            Self::Memory(mem) => Some(&mem[..]),
            _ => None,
        }
    }
    /// Drop the whole cache (if there is one).
    #[inline(never)] pub fn clear(&mut self)
    {
        *self = Self::None;
    }
    /// Attempt to asynchronously create a cache state for file provided by already loaded `file` at this level.
    pub async fn new_fd(file: File, level: Level) -> io::Result<Self>
    {
        Ok(match level {
            Level::None => Self::None,
            Level::Low => Self::Open(file),
            Level::High => Self::Mapped(OpenMMap::new_file(file.into()).await?),
            Level::Extreme => {
                let file = tokio::fs::File::from(file);
                // Pre-size the buffer from metadata when available, and remember
                // the expected length so a short/long read can be detected below.
                let (mut bytes, expect) = {
                    if let Some(len) = file.metadata().await.ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
                        (BytesMut::with_capacity(len), Some(len))
                    } else {
                        (BytesMut::new(), None)
                    }
                };
                match (expect, read_whole_into_buffer(file, &mut bytes).await?) {
                    // A size mismatch means the file changed under us — reject it.
                    (Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
                    _ => Self::Memory(bytes.freeze()),
                }
            },
        })
    }
    /// Attempt to asynchronously create a cache state for file provided by `file` at this level.
    pub async fn new(file: impl AsRef<Path>, level: Level) -> io::Result<Self>
    {
        Ok(match level {
            Level::None => Self::None,
            Level::Low => Self::Open(tokio::fs::OpenOptions::new().read(true).open(file).await?.into_std().await),
            Level::High => Self::Mapped(OpenMMap::new(file).await?),
            Level::Extreme => {
                let file = tokio::fs::OpenOptions::new().read(true).open(file).await?;
                // Same metadata-presize + length-check scheme as `new_fd`.
                let (mut bytes, expect) = {
                    if let Some(len) = file.metadata().await.ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
                        (BytesMut::with_capacity(len), Some(len))
                    } else {
                        (BytesMut::new(), None)
                    }
                };
                match (expect, read_whole_into_buffer(file, &mut bytes).await?) {
                    (Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
                    _ => Self::Memory(bytes.freeze()),
                }
            },
        })
    }
    /// Attempt to synchronously create a cache state for file provided by `file` at this level.
    ///
    /// # Note
    /// This will block until all the I/O operations and syscalls have completed. In an async context avoid using it.
    pub fn new_sync(file: impl AsRef<Path>, level: Level) -> io::Result<Self>
    {
        Ok(match level {
            Level::None => Self::None,
            Level::Low => Self::Open(fs::OpenOptions::new().read(true).open(file)?),
            Level::High => Self::Mapped(OpenMMap::new_sync(file)?),
            Level::Extreme => {
                let file = fs::OpenOptions::new().read(true).open(file)?;
                let (mut bytes, expect) = {
                    if let Some(len) = file.metadata().ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
                        (BytesMut::with_capacity(len), Some(len))
                    } else {
                        (BytesMut::new(), None)
                    }
                };
                match (expect, read_whole_into_buffer_sync(file, &mut bytes)?) {
                    (Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
                    _ => Self::Memory(bytes.freeze()),
                }
            },
        })
    }
    /// Attempt to synchronously create a cache state for file provided by already-loaded `file` at this level.
    ///
    /// # Note
    /// This will block until all the I/O operations and syscalls have completed. In an async context avoid using it.
    pub fn new_fd_sync(file: fs::File, level: Level) -> io::Result<Self>
    {
        Ok(match level {
            Level::None => Self::None,
            Level::Low => Self::Open(file),
            Level::High => Self::Mapped(OpenMMap::new_file_sync(file)?),
            Level::Extreme => {
                let (mut bytes, expect) = {
                    if let Some(len) = file.metadata().ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
                        (BytesMut::with_capacity(len), Some(len))
                    } else {
                        (BytesMut::new(), None)
                    }
                };
                match (expect, read_whole_into_buffer_sync(file, &mut bytes)?) {
                    (Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
                    _ => Self::Memory(bytes.freeze()),
                }
            },
        })
    }
}
const BUFSIZE: usize = 4096;

/// Read as many bytes from `input` into the `BufMut` output as possible and then return the number of bytes read.
/// Does not block the current task.
///
/// # Errors
/// Propagates any I/O error raised by `input`; bytes already copied into
/// `output` before the error are kept, but the running count is not returned.
async fn read_whole_into_buffer<R, W>(mut input: R, mut output: &mut W) -> io::Result<usize>
where
    R: AsyncRead + Unpin,
    W: BufMut + ?Sized,
{
    use tokio::prelude::*;
    // Copy through a fixed stack chunk: `BufMut` has no direct async-read integration.
    let mut buf = [0u8; BUFSIZE];
    let mut whole = 0;
    loop {
        let read = input.read(&mut buf[..]).await?;
        if read == 0 {
            // EOF: report the total transferred.
            break Ok(whole);
        }
        whole += read;
        // `&mut output` (a `&mut &mut W`) is `Sized`, which `put` requires even
        // though `W` itself may be unsized.
        (&mut output).put(&buf[..read]);
    }
}
/// Read as many bytes from `input` into the `BufMut` output as possible and then return the number of bytes read.
/// Blocks the current thread.
///
/// # Errors
/// Propagates any I/O error raised by `input`; bytes already copied into
/// `output` before the error are kept, but the running count is not returned.
fn read_whole_into_buffer_sync<R, W>(mut input: R, mut output: &mut W) -> io::Result<usize>
where
    R: Read,
    W: BufMut + ?Sized,
{
    // Copy through a fixed stack chunk: `BufMut` has no direct `Read` integration.
    let mut buf = [0u8; BUFSIZE];
    let mut whole = 0;
    loop {
        let read = input.read(&mut buf[..])?;
        if read == 0 {
            // EOF: report the total transferred.
            break Ok(whole);
        }
        whole += read;
        // `&mut output` (a `&mut &mut W`) is `Sized`, which `put` requires even
        // though `W` itself may be unsized.
        (&mut output).put(&buf[..read]);
    }
}
/// A request to send to a background worker spawned by a `service` function, for each instance of this there will be a corresponding `CacheResponse` you can `await` to receive the value.
///
/// Fields: the file to cache, the desired cache `Level`, and the oneshot
/// channel on which the resulting `DataCacheState` is sent back to the requester.
#[derive(Debug)]
pub struct CacheRequest(tokio::fs::File, Level, oneshot::Sender<io::Result<DataCacheState>>);
/// A handle to a pending `DataCacheState` being constructed from a `CacheRequest` sent to a background worker from a `service` function.
///
/// `await`ing this response will yield until the worker task has completed and can send the cache state back, upon which it yields a `io::Result<DataCacheState>`.
#[derive(Debug)]
pub struct CacheResponse(oneshot::Receiver<io::Result<DataCacheState>>);
//TODO: impl Future for CacheResponse ...
/// Start a background task service that takes `CacheRequest`s and produces `DataCacheState`s asynchronously.
///
/// The service shuts down when all producers of requests are dropped.
///
/// # Returns
/// A `Future` which, when `await`ed on, will yield until the service shuts down and a handle to send `CacheRequest`s to the service.
#[inline] pub fn service() -> (impl Future<Output = ()> + 'static, mpsc::Sender<CacheRequest>)
{
    use std::{
        task::{Poll, Context},
        pin::Pin,
    };
    /// A future that never completes and never reschedules itself.
    ///
    /// Used as the "no shutdown signal" placeholder for `service_with_shutdown`.
    struct ShimNever;
    impl Future for ShimNever
    {
        // `!` (never type): documents at the type level that this future can
        // never actually produce an output.
        type Output = !;
        // Deliberately does not register a waker: it is polled once by
        // `select!` and then parked forever, which is exactly what we want.
        #[inline(always)] fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output>
        {
            Poll::Pending
        }
    }
    // With a never-completing cancel future the only way the service ends is
    // all producers dropping, so the `ServiceResult` carries no information
    // and is discarded.
    let (rt, tx) = service_with_shutdown(ShimNever);
    (rt.map(|_| ()), tx)
}
/// Start a background task service that takes `CacheRequest`s and produces `DataCacheState`s asynchronously.
///
/// The service can take a `Future` which, when completes, will drop the service task and stop replying to responses.
/// The service also shuts down when all producers of requests are dropped.
///
/// # Returns
/// A `Future` which, when `await`ed on, will yield until the service shuts down and a handle to send `CacheRequest`s to the service.
/// The output of the future is how the service was terminated, by the `cancel` future completing, or by all producers being dropped.
pub fn service_with_shutdown(cancel: impl Future + 'static) -> (impl Future<Output = ServiceResult> + 'static, mpsc::Sender<CacheRequest>)
{
    // Bounded request channel: senders experience back-pressure past 32
    // in-flight requests.
    let (tx, mut rx) = mpsc::channel(32);
    (async move {
        // Request pump: completes only when every `Sender` clone is dropped.
        let ren = async {
            while let Some(CacheRequest(file, level, send_to)) = rx.recv().await {
                // Serve each request on its own task. If the requester dropped
                // its `CacheResponse`, `send` fails and the result is discarded.
                tokio::spawn(async move {
                    let _ = send_to.send(DataCacheState::new_fd(file.into_std().await, level).await);
                });
            }
        };
        // Race the pump against the shutdown future to decide the exit reason.
        tokio::select! {
            _ = ren => {
                // all senders dropped
                ServiceResult::NoProducers
            }
            _ = cancel => {
                // explicit cancel: close the channel so no further requests
                // can be queued before the task is dropped.
                rx.close();
                ServiceResult::Cancelled
            }
        }
    }, tx)
}
/// The reason a `service` task has terminated.
///
/// Marked `#[non_exhaustive]` so further termination causes can be added
/// without breaking downstream matches.
#[derive(Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
#[non_exhaustive]
pub enum ServiceResult
{
    /// All mpsc senders had been dropped, there were no more possible requests to respond to.
    NoProducers,
    /// The service was explicitly cancelled by a shutdown future.
    Cancelled,
}
impl fmt::Display for ServiceResult
{
    /// Human-readable explanation of why the service stopped.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        let msg = match self {
            Self::NoProducers => "All mpsc senders had been dropped, there were no more possible requests to respond to.",
            Self::Cancelled => "The service was explicitly cancelled by a shutdown future.",
        };
        f.write_str(msg)
    }
}

@ -1,50 +0,0 @@
use super::*;
bitflags!{
    /// What to remove when using `Store::clean()`.
    pub struct StoreCleanFlags: u32
    {
        /// Dead entry hash (key) mappings.
        const MAP = 1 << 0;
        /// Dead `tag -> entry hash` mappings.
        const TAG_MAP = 1 << 1;
        /// Dead tags (i.e. tags with no entries).
        const TAGS = 1 << 2;
        /// Dead entries (i.e. Inaccessible, corrupt, or missing file entries).
        const ENTRY = 1 << 3;
        /// Clean all
        ///
        /// All bits below `1 << 4` set: `MAP | TAG_MAP | TAGS | ENTRY`.
        const ALL = (1 << 4)-1;
    }
}
impl Default for StoreCleanFlags
{
#[inline]
fn default() -> Self
{
Self::MAP | Self::TAG_MAP | Self::TAGS
}
}
// Refresh and cleaning
impl Store
{
    /// Remove any and all dead mappings / entries specified by the flags here.
    ///
    /// Pass `Default::default()` to only clean *mappings*, and not dead entries. Otherwise, `StoreCleanFlags::ALL` will perform a full audit.
    /// See [`StoreCleanFlags`] for other options.
    pub fn clean(&mut self, what: StoreCleanFlags)
    {
        // Unimplemented: the ordering of the individual clean passes is still
        // an open design question (see the message below).
        todo!("What order do we do `what`'s things in?")
    }
    /// Force a full rebuild of all mappings.
    ///
    /// This clears the entire store except entries (and entry specific caches), and then rebuilds all the internal mappings from scratch. Any cached mappings (e.g empty tag reserves) are removed.
    /// This does not remove invalid entries themselves, for that use `clean(StoreCleanFlags::ENTRY)`.
    pub fn rebuild(&mut self)
    {
        // Unimplemented.
        todo!()
    }
}

@ -1,139 +0,0 @@
use super::*;
use std::hash::{Hash, Hasher};
use std::borrow::Borrow;
pub mod builder;
/// A single stored item: its metadata, content hash, optional encryption key,
/// tags, and a transient data cache.
#[derive(Debug, Serialize, Deserialize)] // Clone, PartialEq, Eq, Hash
pub struct Entry
{
    /// Human-readable name of the entry.
    name: String,
    /// Free-form description of the entry.
    description: String,
    /// The key used to encrypt the file, if any.
    key: Option<aes::AesKey>,
    /// The hash of the *unencrypted* data.
    hash: sha256::Sha256Hash,
    /// The tags for this entry.
    ///
    /// # Note
    /// This should be updated the same as the `Store`'s `tags` and `tag_mappings`. The duplication here is for lookup convenience / caching.
    pub(super) tags: Vec<String>,
    /// The *original* filename of the item, *not* the path to the real file for this item (see `location`).
    filename: OsString,
    /// Filename *relative* to the root of the store
    location: PathBuf,
    /// The state of caching this entry is currently in. This should not be saved as it has no meaning regarding the actual data itself.
    #[serde(skip)]
    cache: DataCacheState,
}
// Accessors
impl Entry
{
    /// The name of this entry.
    pub fn name(&self) -> &str
    {
        self.name.as_str()
    }
    /// The free-form description of this entry.
    pub fn description(&self) -> &str
    {
        self.description.as_str()
    }
    /// The *original* filename of this entry (not where the store keeps its data).
    pub fn filename(&self) -> &Path
    {
        self.filename.as_ref()
    }
    /// The path of this entry's file relative to the root of its store.
    pub fn location(&self) -> &Path
    {
        &self.location
    }
    /// The tags set for this entry.
    pub fn tags(&self) -> &[String]
    {
        self.tags.as_slice()
    }
    /// Is the file of this entry encrypted (i.e. does it carry an AES key)?
    pub fn is_encrypted(&self) -> bool
    {
        matches!(self.key, Some(_))
    }
    /// The sha256 hash of the (unencrypted) data in this entry.
    pub fn hash(&self) -> &sha256::Sha256Hash
    {
        &self.hash
    }
}
impl Entry
{
    /// Consume and drop the cache. Only that useful as shim for mapping.
    #[inline(always)] pub(super) fn with_no_cache(mut self) -> Self
    {
        // Reset only the transient cache; every persisted field moves through.
        self.cache = Default::default();
        self
    }
    /// Clone the data needed for a refresh of this entry in a store before it is mutated.
    ///
    /// Returns the entry's current key (content hash) and a snapshot of its tags.
    #[inline(always)] pub(super) fn prepare_for_refresh(&self) -> (EntryKey, Box<[String]>)
    {
        let key: EntryKey = self.hash.clone();
        let tags: Box<[String]> = self.tags.iter().cloned().collect();
        (key, tags)
    }
}
impl Borrow<sha256::Sha256Hash> for Entry
{
    // Lets `HashSet<Entry>` be queried directly by content hash (`EntryKey`).
    // Must stay consistent with the `Hash` impl below for lookups to work.
    #[inline] fn borrow(&self) -> &sha256::Sha256Hash
    {
        &self.hash
    }
}
impl Hash for Entry {
    // Hash *only* the content hash, matching `Borrow<Sha256Hash>` above so
    // that `data.get(&key)` lands in the same bucket as the entry itself.
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.hash.hash(state)
    }
}
impl PartialEq for Entry
{
    /// Structural equality over every persisted field (the transient `cache`
    /// is deliberately excluded).
    ///
    /// `hash` is compared as well: `Entry`'s `Hash` impl hashes *only*
    /// `self.hash`, so the `Eq`/`Hash` contract (`a == b` implies
    /// `hash(a) == hash(b)`) requires equal entries to agree on `hash`.
    /// Previously `hash` was omitted, so two entries with identical metadata
    /// but different content hashes compared equal while hashing differently,
    /// which corrupts `HashSet` behaviour.
    fn eq(&self, other: &Self) -> bool
    {
        // Expands to `self.$f == other.$f` joined with `&&` for each field.
        macro_rules! eq {
            ($one:ident) => (self.$one == other.$one);
            ($($many:ident),*) => {
                $(
                    eq!($many)
                )&& *
            };
        }
        eq!(hash, name, description, key, tags, filename, location)
    }
}
impl Eq for Entry{}
impl Clone for Entry
{
    /// Clone every persisted field; the transient `cache` is intentionally
    /// reset to its default rather than cloned.
    fn clone(&self) -> Self {
        Self {
            name: self.name.clone(),
            description: self.description.clone(),
            key: self.key.clone(),
            tags: self.tags.clone(),
            filename: self.filename.clone(),
            location: self.location.clone(),
            hash: self.hash.clone(),
            cache: Default::default(),
        }
    }
}

@ -1,107 +0,0 @@
use super::*;
use std::{
error,
fmt,
};
use std::ffi::OsStr;
/// Builder for creating [`Entry`] instances.
///
/// All fields start unset; `name` and `hash` are required by `build()`,
/// the rest fall back to empty defaults (see `build`).
#[derive(Debug, Clone, Eq, PartialEq, Hash, Default)]
pub struct EntryBuilder<'a>
{
    /// Required: entry display name.
    name: Option<&'a str>,
    /// Optional: defaults to `""`.
    desc: Option<&'a str>,
    /// Optional: defaults to no tags.
    tags: Option<&'a[&'a str]>,
    /// Optional: no key means the entry is unencrypted.
    key: Option<aes::AesKey>,
    /// Required: sha256 of the unencrypted data.
    hash: Option<sha256::Sha256Hash>,
    /// Required: path relative to the store root.
    location: Option<&'a Path>,
    /// Optional: original filename, defaults to `""`.
    filename: Option<&'a OsStr>,
}
// Generates a `const` builder setter: consumes `self` and returns a copy with
// `$name` set. The optional `$com` literal becomes the setter's doc comment.
macro_rules! build_fn {
    ($name:ident: $ty:ty $(, $com:literal)?) => {
        $(#[doc=$com])?
        #[inline] pub const fn $name(self, $name: $ty) -> Self
        {
            Self {
                $name: Some($name),
                ..self
            }
        }
    };
}
impl<'a> EntryBuilder<'a>
{
    /// Create a new entry builder
    ///
    /// All fields start as `None` (this is a `const` equivalent of `Default`).
    #[inline] pub const fn new() -> Self
    {
        macro_rules! default {
            ($($name:ident),*) => {
                Self {
                    $(
                        $name: None
                    ),*
                }
            }
        }
        default!{
            name, desc, tags, key, hash, location, filename
        }
    }
    build_fn!(name: &'a str, "Insert the name of this entry");
    build_fn!(desc: &'a str, "Insert a description for this entry");
    build_fn!(tags: &'a [&'a str], "Insert the tags for this entry");
    build_fn!(key: aes::AesKey, "Insert the encryption key for this entry");
    build_fn!(hash: sha256::Sha256Hash, "Insert the hash for this entry");
    build_fn!(location: &'a Path, "Insert the location for this entry");
    build_fn!(filename: &'a OsStr, "Insert the original filename for this entry");
    /// Try to build an `Entry` from this builder.
    ///
    /// # Errors
    /// Returns `EntryBuildError` if `name`, `hash`, or `location` is unset.
    /// `desc`, `tags`, and `filename` fall back to empty defaults; `key`
    /// stays optional (no key means unencrypted).
    #[inline] pub fn build(self) -> Result<Entry, EntryBuildError>
    {
        // `fail_on!(f)` errors when field `f` is unset;
        // `fail_on!(f: default)` substitutes `default` instead.
        macro_rules! fail_on {
            ($name:ident) => {
                self.$name.ok_or(EntryBuildError)?
            };
            ($name:ident: $or:expr) =>{
                if let Some(name) = self.$name {
                    name
                } else {
                    $or
                }
            };
        }
        Ok(
            Entry {
                name: fail_on!(name).into(),
                description: fail_on!(desc: "").into(),
                tags: fail_on!(tags: &[]).iter().map(|&x| x.to_owned()).collect(),
                key: self.key,
                hash: fail_on!(hash),
                location: fail_on!(location).into(),
                filename: fail_on!(filename: OsStr::new("")).into(),
                cache: Default::default(),
            }
        )
    }
}
/// Error for when an improperly configured [`EntryBuilder`] called `.build()`.
/// Usually this is because of missing fields.
#[derive(Debug)]
pub struct EntryBuildError;
impl error::Error for EntryBuildError{}
impl fmt::Display for EntryBuildError
{
    #[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        // Fixed typo in the user-facing message ("improporly").
        write!(f, "improperly configured builder")
    }
}

@ -1,98 +0,0 @@
use super::*;
/// A frozen, serialisable snapshot of a [`Store`].
///
/// Holds only the metadata and the flat entry list; all derived indices
/// (hash arena, tag mappings) are rebuilt on thaw (`create_new`/`into_new`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Freeze
{
    metadata: StoreMetadata,
    data: Vec<Entry>,
}
impl Freeze
{
pub(super) fn new_ref(from: &Store) -> Self
{
Self {
metadata: from.metadata.clone(),
data: from.data.iter().map(Entry::clone).collect(),
}
}
pub(super) fn new_moved(from: Store) -> Self
{
Self {
metadata: from.metadata,
data: from.data.into_iter().map(Entry::with_no_cache).collect(),
}
}
pub(super) fn create_new(&self) -> Store
{
let mut new = Store::with_capacity(self.metadata.clone(), self.data.len());
for entry in self.data.iter()
{
let hash = entry.hash().clone();
let hash_idx = new.data_hashes.insert(hash);
// insert the tags
for tag in entry.tags.iter()
{
if let Some(&ti) = new.tags.get(tag) {
// This tag has an entry already, append to it
new.tag_mappings.get_mut(ti).unwrap().insert(hash_idx);
} else {
// This tag has no entry, create it
let ti = new.tag_mappings.insert(iter![hash_idx].collect());
new.tags.insert(tag.clone(), ti);
}
}
new.data.insert(entry.clone());
}
new
}
pub(super) fn into_new(self) -> Store
{
let Freeze { data, metadata } = self;
let mut new = Store::with_capacity(metadata, data.len());
for entry in data.into_iter()
{
let hash = entry.hash().clone();
let hash_idx = new.data_hashes.insert(hash);
// insert the tags
for tag in entry.tags.iter()
{
if let Some(&ti) = new.tags.get(tag) {
// This tag has an entry already, append to it
new.tag_mappings.get_mut(ti).unwrap().insert(hash_idx);
} else {
// This tag has no entry, create it
let ti = new.tag_mappings.insert(iter![hash_idx].collect());
new.tags.insert(tag.clone(), ti);
}
}
new.data.insert(entry);
}
new
}
}
impl From<Store> for Freeze
{
    /// Snapshot by consuming the store (see `Freeze::new_moved`).
    #[inline] fn from(from: Store) -> Self
    {
        Self::new_moved(from)
    }
}
impl From<Freeze> for Store
{
    /// Thaw by consuming the snapshot (see `Freeze::into_new`).
    #[inline] fn from(from: Freeze) -> Self
    {
        from.into_new()
    }
}

@ -1,15 +0,0 @@
use super::*;
/// Identifying metadata for a [`Store`]: its display name and the root
/// directory that all entry `location`s are relative to.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct StoreMetadata
{
    /// Human-readable name of the store.
    pub name: String,
    /// Root directory of the store; `Store::new` asserts it exists and is a directory.
    pub root: PathBuf,
}
impl StoreMetadata
{
    // No inherent methods yet.
}

@ -1,179 +0,0 @@
use super::*;
use generational_arena::{
Arena, Index as ArenaIndex,
};
use std::convert::{TryFrom, TryInto};
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::fs::File;
use std::ffi::OsString;
use memmap::Mmap;
use bytes::Bytes;
use cryptohelpers::{aes, sha256};
mod entry;
pub use entry::Entry;
pub use entry::builder::EntryBuilder;
mod cache;
pub use cache::DataCacheState;
mod metadata;
pub use metadata::StoreMetadata;
mod freeze;
pub use freeze::Freeze;
mod search;
pub use search::*;
mod mutation;
pub use mutation::*;
mod clean;
pub use clean::*;
#[cfg(test)] mod test;
/// The key used to look up a single entry in `O(1)` time.
///
/// # Notes
/// If you change this, make sure to change the `BuildHasher` back to `RandomState`.
pub type EntryKey = sha256::Sha256Hash;
/// The hasher used for the entry data set.
///
/// # Notes
/// Change this back to `RandomState` if you change the type of `EntryKey`.
pub type BuildHasher = std::collections::hash_map::RandomState;// NOTE(review): was `Sha256TopBuildHasher`, reverted to `RandomState` because of a bug in it — confirm the custom hasher is fixed before swapping back.
/// Tracks deferred cleanup of dead `data_hashes` entries (see `Store::purge_if_needed`).
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Default)]
struct PurgeTrack
{
    /// Number of removals since last purge
    num_removals: usize,
    /// Max number of removals before an arena index purge.
    ///
    /// If set to 0, purge will be performed on every removal of a tag.
    // Note: the derived `Default` gives 0, i.e. purge-on-every-removal.
    num_removals_max: usize,
}
impl PurgeTrack
{
    /// Create a new purge tracker that purges once `max` removals accumulate.
    #[inline] pub fn new(max: usize) -> Self
    {
        let mut this = Self::default();
        this.num_removals_max = max;
        this
    }
    /// Should we purge now?
    ///
    /// True once the removal count reaches the configured threshold
    /// (a threshold of 0 therefore purges on every removal).
    #[inline] pub fn should_purge(&self) -> bool
    {
        !(self.num_removals < self.num_removals_max)
    }
}
/// The in-memory entry database: entries keyed by content hash, plus the
/// tag indices used for searching.
///
/// Lookup chain: string (tags) -> index (tag_mappings) -> index (data_hashes) -> hash used for lookup (data).
#[derive(Debug)]
pub struct Store
{
    /// Name and root directory of this store.
    metadata: StoreMetadata,
    /// Deferred-purge bookkeeping for `data_hashes`.
    purge_track: PurgeTrack,
    data: HashSet<Entry, BuildHasher>, // The entry sha256 hash is used as the `key` here, as `Entry` both hasshes to, and `Borrow`s to `Sha256Hash`.
    data_hashes: Arena<EntryKey>, // used to lookup in `data`.
    /// For each tag index: the set of `data_hashes` indices carrying that tag.
    tag_mappings: Arena<HashSet<ArenaIndex>>,
    tags: BTreeMap<String, ArenaIndex>, // string (tags) -> index (tag_mappings) -> index (data_hashes) -> hash used for lookup (data)
}
// Creating
impl Store
{
    /// Create a new empty store with this metadata.
    ///
    /// # Panics
    /// If the root directory specified in `metadata` does not exist or is not a directory.
    pub fn new(metadata: StoreMetadata) -> Self
    {
        // (Typo fix: "existant" -> "existent" in the panic message.)
        assert!(metadata.root.exists() && metadata.root.is_dir(), "Metadata root {:?} passed to `new` not existent or not a directory", metadata.root);
        Self {
            metadata,
            // Purge dead hash mappings after every 16 deferred removals.
            purge_track: PurgeTrack::new(16),
            data: HashSet::with_hasher(Default::default()),
            data_hashes: Arena::new(),
            tag_mappings: Arena::new(),
            tags: BTreeMap::new(),
        }
    }
    /// Create a new empty store with this metadata and initial storage capacity
    ///
    /// # Panics
    /// If the root directory specified in `metadata` does not exist or is not a directory.
    pub fn with_capacity(metadata: StoreMetadata, cap: usize) -> Self
    {
        assert!(metadata.root.exists() && metadata.root.is_dir(), "Metadata root {:?} passed to `with_capacity` not existent or not a directory", metadata.root);
        Self {
            metadata,
            purge_track: PurgeTrack::new(16),
            data: HashSet::with_capacity_and_hasher(cap, Default::default()),
            data_hashes: Arena::with_capacity(cap),
            tag_mappings: Arena::with_capacity(cap),
            tags: BTreeMap::new(),
        }
    }
}
// Freezing
impl Store
{
    /// Create a snapshot of this store, cloning all data into a frozen and serialisable version of it.
    /// # Notes
    /// This method clones the entire store into the new `Freeze`. To avoid this, use `into_freeze` if the store is no longer used after the freeze.
    #[inline] pub fn freeze(&self) -> Freeze
    {
        Freeze::new_ref(self)
    }
    /// Consume into a snapshot of this store, moving all data into a frozen and serializable version of it.
    #[inline] pub fn into_freeze(self) -> Freeze
    {
        Freeze::new_moved(self)
    }
    /// Create a new store instance by cloning from a frozen snapshot of it.
    /// # Notes
    /// This method clones the entire `Freeze` into the new store. To avoid this, use `from_freeze` if the snapshot is no longer used after the unfreeze.
    #[inline] pub fn unfreeze(freeze: &Freeze) -> Self
    {
        freeze.create_new()
    }
    /// Consume a store snapshot and move its entries into a new store.
    #[inline] pub fn from_freeze(freeze: Freeze) -> Self
    {
        freeze.into_new()
    }
}
// Primitive access
impl Store
{
    /// Look up a single entry in `O(1)` time with its key.
    ///
    /// Works because `Entry` both `Hash`es to and `Borrow`s to its
    /// `Sha256Hash`, so the set can be probed by key directly.
    #[inline] pub fn get(&self, key: &EntryKey) -> Option<&Entry>
    {
        self.data.get(key)
    }
}

@ -1,184 +0,0 @@
//! Handling store mutation
use super::*;
use std::borrow::Borrow;
impl Store
{
    /// Insert this entry into the data table, overwriting any identically hashed one and returning it.
    pub fn insert_overwrite(&mut self, ent: Entry) -> Option<Entry>
    {
        // Evict any entry with the same content hash first so its mappings are cleaned.
        let old = self.remove(ent.hash());
        let hash_idx = self.data_hashes.insert(*ent.hash());
        for tag in ent.tags.iter() {
            self.insert_tag_for_idx(tag, hash_idx);
        }
        self.data.insert(ent);
        old
    }
    /// Insert this entry then return a reference to it.
    pub fn insert<'a, 'b>(&'a mut self, ent: Entry) -> &'b Entry
    where 'a: 'b
    {
        // Remember the key before `ent` is moved into the set.
        let ffd = *ent.hash();
        self.insert_overwrite(ent);
        self.data.get(&ffd).unwrap()
    }
    /// Mutate this entry in place if it exists.
    ///
    /// Returns `f`'s result, or `None` if no entry has key `ent_id`.
    /// See [`map_entry`].
    pub fn mutate_entry_in_place<T, F>(&mut self, ent_id: &EntryKey, f: F) -> Option<T>
    where F: FnOnce(&mut Entry) -> T
    {
        if let Some(mut ent) = self.data.take(ent_id) {
            // Snapshot old hash+tags so the indices can be reconciled after mutation.
            let update = ent.prepare_for_refresh();
            let out = f(&mut ent);
            let new = ent;
            self.refresh_for_entry(update, &new);
            self.data.insert(new);
            Some(out)
        } else {
            None
        }
    }
    /// Update an entry that may have been modified.
    ///
    /// The entries old hash and tags are passed, and it updates the store to reflect the new entry mutation.
    /// You must still insert the new entry after this.
    fn refresh_for_entry(&mut self, (ohash, otags): (impl Borrow<EntryKey>, impl AsRef<[String]>), new: &Entry)
    {
        let ohash = ohash.borrow();
        let iidx = if new.hash() != ohash {
            // The content hash changed: rewrite the old hash in `data_hashes`
            // so existing tag mappings keep pointing at a live key.
            for (_, hash) in self.data_hashes.iter_mut()
            {
                if hash == ohash {
                    *hash = *new.hash();
                    break;
                }
            }
            self.reverse_index_lookup(new.hash()).unwrap()
        } else {
            self.reverse_index_lookup(ohash).unwrap()
        };
        let otags = otags.as_ref();
        if &new.tags[..] != &otags[..] {
            // We need to update tag mappings.
            let ntags: HashSet<_> = new.tags.iter().collect();
            let otags: HashSet<_> = otags.iter().collect();
            // BUGFIX: this used to `zip` the two sets and test membership
            // pairwise, which (a) truncated to the shorter set and (b) removed
            // the *new* tag `t` instead of the old tag `u`. Use set differences
            // instead so every added and removed tag is handled exactly once.
            for added in ntags.difference(&otags)
            {
                // Tag present on the new entry but not the old: it was added.
                self.insert_tag_for_idx(added, iidx);
            }
            for removed in otags.difference(&ntags)
            {
                // Tag present on the old entry but not the new: it was removed.
                self.remove_tag_for_idx(removed, iidx);
            }
        }
    }
    /// Map the entry with this function, updating references to it if needed.
    ///
    /// If the hash of the entry if modified by this map, then the hashes indecies are updated to the new hash.
    pub fn map_entry<F>(&mut self, ent_id: &EntryKey, f: F)
    where F: FnOnce(Entry) -> Entry
    {
        if let Some(ent) = self.data.take(ent_id) {
            let update = ent.prepare_for_refresh();
            let new = f(ent);
            self.refresh_for_entry(update, &new);
            self.data.insert(new);
        }
    }
    /// Remove this entry, and return it, if it was set.
    pub fn remove(&mut self, key: &EntryKey) -> Option<Entry>
    {
        if let Some(entry) = self.data.take(key) {
            // Drop the transient cache; the entry is leaving the store.
            Some(self.cleanup_remove_entry(entry.with_no_cache()))
        } else {
            None
        }
    }
    /// Preform cleanup on an entry *already removed* from `data`.
    fn cleanup_remove_entry(&mut self, ent: Entry) -> Entry
    {
        // Remove any unused tags
        if let Some(hash_idx) = self.reverse_index_lookup(ent.hash()) {
            for tag in ent.tags.iter()
            {
                self.remove_tag_for_idx(tag, hash_idx);
            }
        }
        // Remove from data hashes can be deferred
        self.purge_if_needed();
        ent
    }
    /// Remove dead mappings from `data_hashes` to `data`.
    #[inline] fn purge_data_hash_mappings(&mut self)
    {
        let data = &self.data;
        self.data_hashes.retain(move |_, hash| data.get(hash).is_some());
    }
    /// Purge the arena mapping if threshold of dead entries is reached, otherwise defer it.
    #[inline] fn purge_if_needed(&mut self)
    {
        if self.purge_track.should_purge() {
            self.purge_data_hash_mappings();
            self.purge_track.num_removals = 0;
        } else {
            self.purge_track.num_removals += 1;
        }
    }
}
// Tag specific stuff
impl Store
{
    /// Remove a mapping for this tag string to this specific hash index, cleaning up the tag mappings if needed.
    #[inline] fn remove_tag_for_idx(&mut self, tag: impl AsRef<str>, hash_idx: ArenaIndex)
    {
        let tag = tag.as_ref();
        if let Some(&ti) = self.tags.get(tag) {
            // Drop `hash_idx` from the tag's mapping set, then inspect how many
            // references remain for that tag.
            match self.tag_mappings.get_mut(ti).map(|x| {x.remove(&hash_idx); x.len()}) {
                Some(0) => no_op!(self.tag_mappings.remove(ti)), // there is only 1 mapping, remove it and then remove the tag (TODO: Should we keep the tag in the btree as cache? TODO: Add this to `PurgeTrack`)
                None => (), // the mapping set is already gone (dangling tag), just remove the tag
                _ => return, //don't remove the tag, there's other references in the mapping
            }
            // Reached only when the tag has no live mappings left.
            self.tags.remove(tag);
        }
    }
    /// Insert a mapping for this tag string to this single hash index, creating it if needed
    #[inline] fn insert_tag_for_idx(&mut self, tag: impl AsRef<str>, hash_idx: ArenaIndex)
    {
        let tag = tag.as_ref();
        if let Some(&ti) = self.tags.get(tag) {
            // This tag has an entry already, append to it
            self.tag_mappings.get_mut(ti).unwrap().insert(hash_idx);
        } else {
            // This tag has no entry, create it
            let ti = self.tag_mappings.insert(iter![hash_idx].collect());
            self.tags.insert(tag.to_owned(), ti);
        }
    }
}

@ -1,216 +0,0 @@
use super::*;
use std::borrow::Borrow;
use std::marker::PhantomData;
use std::iter;
use std::collections::{
btree_map::Range,
VecDeque,
};
use smallvec::SmallVec;
use std::sync::Arc;
use tokio::{
sync::{
mpsc,
},
};
use futures::prelude::*;
/// How many items search code keeps inline on the stack (via `SmallVec`)
/// before spilling to the heap.
const STACK_CACHE_SIZE: usize = 8;
/// An iterator over entries in a store matching *any* tags
// Tuple fields: (store, remaining tag-range, queued hash refs, item-type marker).
#[derive(Debug)]
pub struct StoreSearchAnyIter<'a, T: ?Sized>(&'a Store, Option<Range<'a, String, ArenaIndex>>, VecDeque<&'a sha256::Sha256Hash>, PhantomData<T>);
/// An iterator over entries in a store matching *all* tags
// Tuple fields: (store, remaining tag-range, queued entry refs, the sorted query tags).
#[derive(Debug)]
pub struct StoreSearchAllIter<'a, T: ?Sized>(&'a Store, Option<Range<'a, String, ArenaIndex>>, VecDeque<&'a Entry>, SmallVec<[&'a T; STACK_CACHE_SIZE]>);
// Searching by tags
impl Store
{
    /// Lookup tag indecies for this iterator of tags
    ///
    /// Scans the `tags` btree between the lexicographically smallest and
    /// largest queried tag (a superset of the exact query set).
    // NOTE(review): `map_into_iter` appears to be a crate-local extension that
    // flattens the `Option<impl Iterator>` into an iterator — confirm its source.
    pub(super) fn tag_index_lookup<'a, T: ?Sized + Ord + 'a>(&self, tags: impl IntoIterator<Item= &'a T>) -> impl Iterator<Item = (& str, ArenaIndex)> + '_
    where String: Borrow<T>
    {
        let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().map(|x| x.into()).collect();
        sorted.sort();
        match (sorted.first(), sorted.last()) {
            (Some(&low), Some(&high)) => Some(self.tags.range::<T, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))).map(|(s, &i)| (s.as_str(), i))),
            _ => None
        }.map_into_iter()
    }
    /// Find the `data_hashes` index for this entry in `data`.
    ///
    /// Linear scan of the arena — O(n) in the number of tracked hashes.
    pub(super) fn reverse_index_lookup(&self, hs: &EntryKey) -> Option<ArenaIndex>
    {
        self.data_hashes.iter().filter_map(|(i, h)| if h == hs {
            Some(i)
        } else {
            None
        }).next()
    }
    /// Create a `Stream` that searches for all entries with *all* of these provided tags on background tasks.
    ///
    /// The stream outputs `EntryKey`s, which can be used to look up the `Entry` in the store in `O(1)` time.
    ///
    /// # Notes
    /// This is allowed to produce duplicate entries, if either:
    /// * An entry has multiple of the same tag set
    /// * An entry has multiple of the tags provided to this function set
    ///
    /// # Panics
    /// It is not guaranteed that the search will complete before the stream ends, if one of the background tasks panics; although this is unlikely.
    /// If one of the tasks panics, it is ignored.
    ///
    /// This is only useful in a multithreaded environment where tasks can be scheduled on different threads, otherwise use `tag_search_all`
    pub fn tag_search_all_detached<U: Into<String>>(self: Arc<Self>, tags: impl IntoIterator<Item=U>) -> impl Stream<Item = EntryKey> + 'static
    {
        // Bounded channel: producers block (await) when the consumer lags.
        let (tx, rx) = mpsc::channel(16);
        let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().map(|x| x.into()).collect();
        sorted.sort();
        tokio::spawn(async move {
            let range = match (sorted.first().map(|x| x.as_ref()), sorted.last().map(|x| x.as_ref())) {
                (Some(low), Some(high)) => self.tags.range::<str, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))),
                _ => return,
            };
            // One task per matching tag; each forwards its mapped hashes.
            future::join_all(range.map(|(_, &ti)| {
                let store = Arc::clone(&self);
                let mut tx = tx.clone();
                tokio::spawn(async move {
                    let data_hashes = &store.data_hashes;
                    for x in store.tag_mappings.get(ti)
                        .map_into_iter()
                        .filter_map(move |&idx| data_hashes.get(idx))
                    {
                        // Receiver dropped: stop producing.
                        if tx.send(*x).await.is_err() {
                            return;
                        }
                    }
                })
            })).await;
        });
        rx
    }
    /// Search for all entries with *all* of these provided tags.
    ///
    /// # Notes
    /// This is allowed to produce duplicate entries, if either:
    /// * An entry has multiple of the same tag set
    /// * An entry has multiple of the tags provided to this function set
    pub fn tag_search_all<'a, T: ?Sized + Ord + 'a>(&'a self, tags: impl IntoIterator<Item= &'a T>) -> StoreSearchAllIter<'_, T>
    where String: Borrow<T>
    {
        let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().collect();
        sorted.sort();
        StoreSearchAllIter(self, Some(match (sorted.first(), sorted.last()) {
            (Some(&low), Some(&high)) => self.tags.range::<T, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))),
            // No query tags: an iterator that yields nothing.
            _ => return StoreSearchAllIter(self, None, Default::default(), sorted),
        }), VecDeque::new(), sorted)
    }
    /// Search for all entries with *any* of these provided tags.
    ///
    /// # Notes
    /// This is allowed to produce duplicate entries, if either:
    /// * An entry has multiple of the same tag set
    /// * An entry has multiple of the tags provided to this function set
    pub fn tag_search_any<'a, T: ?Sized + Ord + 'a>(&self, tags: impl IntoIterator<Item= &'a T>) -> StoreSearchAnyIter<'_, T>
    where String: Borrow<T>
    {
        let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().collect();
        sorted.sort();
        StoreSearchAnyIter(self, Some(match (sorted.first(), sorted.last()) {
            (Some(&low), Some(&high)) => self.tags.range::<T, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))),
            _ => return StoreSearchAnyIter(self, None, Default::default(), PhantomData),
        }), VecDeque::new(), PhantomData)
    }
    /// Search for all items with this provided tag.
    ///
    /// # Notes
    /// This is allowed to produce duplicate entries, if an entry has two of the same tags set.
    pub fn tag_search<'a, T: ?Sized + Ord>(&'a self, tag: &T) -> StoreSearchAnyIter<'a, T>
    where String: Borrow<T>
    {
        // A single-tag search is an "any" search over a degenerate range.
        let r= (std::ops::Bound::Included(tag), std::ops::Bound::Included(tag));
        StoreSearchAnyIter(self, Some(self.tags.range::<T, _>(r)), VecDeque::new(), PhantomData)
    }
    // Compile-time exercise of the search APIs (never called at runtime).
    fn _assert_test_search(&self)
    {
        let _x: Vec<_> = self.tag_search("hello").dedup_ref().collect();
        let _x: Vec<_> = self.tag_search_any(vec!["hello", "one", "two"]).dedup_ref().collect();
        let _x: Vec<_> = self.tag_search_all(vec!["hello", "one", "two"]).dedup_ref().collect();
    }
    // Compile-time exercise of the detached (task-spawning) search API.
    async fn _assert_test_search_par(self: Arc<Self>)
    {
        let _x: Vec<_> = self.tag_search_all_detached(vec!["hi"]).collect().await;
    }
}
impl<'a, T: ?Sized> Iterator for StoreSearchAllIter<'a, T>
where T: Ord,
      String: Borrow<T>
{
    type Item = &'a Entry;
    // Pull the next tag from the btree range, queue every entry under it that
    // carries *all* query tags, then drain the queue one item per call.
    fn next(&mut self) -> Option<Self::Item>
    {
        if let Some(range) = &mut self.1 {
            if let Some((_, &ti)) = range.next() {
                // tag index get
                let data_hashes = &self.0.data_hashes;
                let data = &self.0.data;
                let tags = &self.3;
                let iter = self.0.tag_mappings.get(ti)
                    .map_into_iter()
                    .filter_map(move |&idx| data_hashes.get(idx))
                    .filter_map(move |x| data.get(x))
                // Ensure all our `tags` are present in the entry's tags
                // (keep the entry only if zero query tags are missing).
                // NOTE(review): `binary_search_by_key` assumes `entry.tags`
                // is sorted — confirm entries keep their tags sorted.
                    .filter(move |entry| tags.iter()
                            .filter(move |&x| entry.tags
                                    .binary_search_by_key(x, |t: &String| -> &T { t.borrow() })
                                    .is_err())
                            .count() == 0);
                self.2.extend(iter);
            }
        } else {
            // Constructed with no range (empty query): permanently empty.
            return None;
        }
        self.2.pop_front()
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self.1 {
            None => (0, Some(0)),
            Some(_) => (0, None),
        }
    }
}
impl<'a, T: ?Sized> Iterator for StoreSearchAnyIter<'a, T>
{
    type Item = &'a Entry;
    // Pull the next tag from the btree range, queue the hashes mapped under
    // it, then resolve one hash to its entry per call.
    fn next(&mut self) -> Option<Self::Item>
    {
        if let Some(range) = &mut self.1 {
            if let Some((_, &ti)) = range.next() {
                // tag index get
                let data_hashes = &self.0.data_hashes;
                let iter = self.0.tag_mappings.get(ti)
                    .map_into_iter()
                    .filter_map(move |&idx| data_hashes.get(idx));
                self.2.extend(iter);
            }
        } else {
            // Constructed with no range (empty query): permanently empty.
            return None;
        }
        // A queued hash whose entry has since been removed yields `None` here.
        self.2.pop_front().map(|x| self.0.data.get(x)).flatten()
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self.1 {
            None => (0, Some(0)),
            Some(_) => (0, None),
        }
    }
}
// Once both the tag range and the queue are exhausted, `next` keeps
// returning `None`, so advertising fusedness is sound.
impl<'a, T: ?Sized> iter::FusedIterator for StoreSearchAnyIter<'a, T>{}

@ -1,61 +0,0 @@
use super::*;
#[inline] fn gen_meta() -> StoreMetadata
{
StoreMetadata{
name: "test".to_string(),
root: "./test/db".to_owned().into(),
}
}
#[test]
fn create()
{
    // Freeze/thaw round-trip on an empty store must not panic.
    let store = Store::new(gen_meta());
    let freeze = store.into_freeze();
    let _ = freeze.into_new();
}
#[test]
fn entries()
{
    let mut store = Store::new(gen_meta());
    // Template builder shared by both inserted entries.
    macro_rules! build {
        () => {
            EntryBuilder::new()
                .name("Hello")
                .desc("Desc")
                .tags(&["Tag1", "Tag2", "Tag3"])
                .hash(sha256::compute_slice(b"hello world"))
                .location(Path::new("test_file"))
        };
    }
    let entry = build!()
        .build().expect("Failed to build entry");
    println!("Entry: {:#?}", entry);
    let entry = store.insert(entry).clone();
    // Second entry: same tags, different name and content hash.
    store.insert(build!()
                 .hash(sha256::compute_slice("something else"))
                 .name("Entry 2")
                 .build().expect("Failed to build entry 2"));
    println!("Entry ref: {:#?}", entry);
    println!("Store: {:#?}", store);
    let freeze = store.freeze();
    println!("Frozen: {:#?}", freeze);
    let store2 = Store::unfreeze(&freeze);
    // Tag searches must find the first entry both in the original store and
    // in the thawed copy; a non-existent tag must yield nothing.
    assert_eq!(store2.tag_search("Tag2").filter(|x| x.name() == entry.name()).next().unwrap().hash(), entry.hash());
    assert_eq!(store.tag_search_any(vec!["Tag2", "Tag3"]).filter(|x| x.name() == entry.name()).next().unwrap().hash(), entry.hash());
    assert_eq!(store2.tag_search_all(vec!["Tag1", "Tag3"]).filter(|x| x.name() == entry.name()).next().unwrap().hash(), entry.hash());
    assert!(store.tag_search_all(vec!["Tag1", "Nope"]).next().is_none());
    // Serde round-trip of the frozen snapshot must be lossless.
    let json = serde_json::to_string_pretty(&freeze).unwrap();
    println!("Freeze serialised ({} bytes): '{}'", json.len(), json);
    let ufreeze: Freeze = serde_json::from_str(json.as_str()).unwrap();
    assert_eq!(ufreeze, freeze);
    println!("Freeze unseralised as store: {:#?}", ufreeze.into_new());
}

@ -0,0 +1,96 @@
use super::*;
use std::cmp;
use std::ptr;
use std::ffi::c_void;
/// Explicitly zero out a byte buffer.
///
/// Essentially `explicit_bzero()`: `#[inline(never)]` plus the trailing `asm!`
/// are there to discourage the optimiser from eliding the zeroing of
/// sensitive data; on x86/x86_64 the cache line at the buffer start is also
/// flushed.
/// NOTE(review): uses the (historically unstable) `asm!` macro — confirm the
/// crate's pinned toolchain supports this form.
#[inline(never)]
pub fn explicit_clear(buffer : &mut[u8]) {
    unsafe {
        // SAFETY: pointer and length come from the live mutable slice itself.
        std::ptr::write_bytes(buffer.as_mut_ptr() as * mut c_void, 0, buffer.len());
        if cfg!(target_arch = "x86_64") || cfg!(target_arch = "x86") {
            asm!("clflush [{}]", in(reg)buffer.as_mut_ptr());
        } else {
            // Empty asm acts as an optimisation barrier on other targets.
            asm!("")
        }
    }
}
/// Set all bytes of this buffer to a single value.
///
/// Essentially `memset()`.
#[inline] pub fn set(buffer: &mut [u8], to: u8)
{
    // SAFETY: the pointer is valid for `buffer.len()` writes because it is
    // derived from the live mutable slice itself.
    // (The previous `as *mut c_void` cast was dropped: `write_bytes::<u8>`
    // already writes `len` bytes, and the cast only obscured that.)
    unsafe {
        std::ptr::write_bytes(buffer.as_mut_ptr(), to, buffer.len());
    }
}
/// Zero out this buffer.
///
/// Essentially `bzero()` — a thin convenience wrapper over [`set`] with a
/// fill value of zero.
#[inline(always)] pub fn clear(buffer: &mut [u8])
{
    set(buffer, 0u8);
}
/// Copy bytes from one slice to another, returning how many were copied
/// (the length of the shorter slice).
///
/// # Notes
/// The slices can overlap.
#[inline] pub fn memmove(from: &[u8], to: &mut [u8]) -> usize
{
    let count = if from.len() < to.len() { from.len() } else { to.len() };
    // SAFETY: `count` never exceeds either slice's length, and `ptr::copy`
    // (memmove semantics) tolerates overlapping regions.
    unsafe {
        ptr::copy(from.as_ptr(), to.as_mut_ptr(), count);
    }
    count
}
/// Copy bytes from one slice to another, returning how many were copied
/// (the length of the shorter slice).
///
/// # Notes
/// The slices must *not* overlap.
#[inline] pub fn memcpy(from: &[u8], to: &mut [u8]) -> usize
{
    let len = cmp::min(from.len(), to.len());
    // Safe equivalent of `ptr::copy_nonoverlapping`: the `&`/`&mut` aliasing
    // rules already guarantee the two slices cannot overlap, and
    // `copy_from_slice` compiles down to the same `memcpy`.
    to[..len].copy_from_slice(&from[..len]);
    len
}
/// Max size of bytes we'll allocate to the stack at runtime before using a heap allocated buffer.
pub const STACK_SIZE_LIMIT: usize = 4096;
/// Allocate `size` bytes. Allocates on the stack if size is lower than `STACK_SIZE_LIMIT`, otherwise allocates on the heap.
///
/// The buffer handed to `f` is always zeroed, and zeroed again after `f`
/// returns when the shared thread-local buffer was used.
pub fn alloca_limit<F, T>(size: usize, f: F) -> T
where F: FnOnce(&mut [u8]) -> T
{
    if size > STACK_SIZE_LIMIT {
        // Large request: reuse one heap buffer per thread instead of
        // allocating on every call.
        thread_local! {
            static BUFFER: RefCell<Vec<u8>> = RefCell::new(vec![0u8; STACK_SIZE_LIMIT*2]);
        }
        BUFFER.with(move |buf| {
            // If the borrow fails then `f` has recursively called into this function, so for that we allocate a new buffer instead of reusing this static one.
            if let Ok(mut buf) = buf.try_borrow_mut() {
                if buf.len() < size {
                    buf.resize(size, 0);
                }
                let res = f(&mut buf[..size]);
                // Re-zero the shared buffer so later callers never observe
                // a previous caller's data.
                bytes::clear(&mut buf[..size]);
                res
            } else {
                // Recursive call: fall back to a fresh heap allocation.
                f(&mut vec![0u8; size])
            }
        })
    } else {
        // Small request: zeroed stack allocation via the `stackalloc` crate.
        stackalloc::alloca_zeroed(size, f)
        // I don't think this is okay to do.
        //stackalloc::alloca(size, move |buf| f(unsafe { stackalloc::helpers::slice_assume_init_mut(buf) }))
    }
}

@ -0,0 +1,282 @@
//! Stream related things
use super::*;
use std::{
task::{
Poll,
Context,
},
pin::Pin,
marker::PhantomData,
};
use tokio::{
io::{
AsyncBufRead,
AsyncRead,
},
prelude::*,
};
use futures::{
stream::{
Stream,
StreamExt,
Fuse,
},
};
use pin_project::pin_project;
/// Converts a stream of byte-containing objects into an `AsyncRead` and `AsyncBufRead`er.
///
/// Incoming chunks are appended to `buffer`; reads drain from its front.
#[pin_project]
pub struct StreamReader<I, T>
    where I: Stream<Item=T>
{
    /// Fused so polling past the end of the source is safe.
    #[pin]
    source: Fuse<I>,
    /// Bytes received from `source` but not yet consumed by the reader.
    buffer: Vec<u8>,
}
impl<T, I> StreamReader<I, T>
where I: Stream<Item=T>,
      T: AsRef<[u8]>
{
    /// The current buffer
    pub fn buffer(&self) -> &[u8]
    {
        &self.buffer[..]
    }
    /// Consume into the original stream
    ///
    /// Any bytes still held in the internal buffer are discarded.
    pub fn into_inner(self) -> I
    {
        self.source.into_inner()
    }
    /// Create a new instance with a buffer capacity
    pub fn with_capacity(source: I, cap: usize) -> Self
    {
        Self {
            source: source.fuse(),
            buffer: Vec::with_capacity(cap)
        }
    }
    /// Create a new instance from this stream
    pub fn new(source: I) -> Self
    {
        Self {
            source: source.fuse(),
            buffer: Vec::new(),
        }
    }
    /// Attempt to add to this buffer
    ///
    /// `Ready(0)` signals end-of-stream; `Ready(n)` means `n` bytes were
    /// appended. Empty chunks fall through to `Pending`.
    // NOTE(review): the trait impls below inline this same match rather than
    // calling this helper — it appears unused; confirm before removing.
    #[cold] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize>
    {
        let this = self.project();
        match this.source.poll_next(cx) {
            Poll::Ready(None) => Poll::Ready(0),
            Poll::Ready(Some(buf)) if buf.as_ref().len() > 0 => {
                let buf = buf.as_ref();
                this.buffer.extend_from_slice(buf);
                Poll::Ready(buf.len())
            },
            _ => Poll::Pending,
        }
    }
}
impl<T: AsRef<[u8]>, I: Stream<Item=T>> AsyncRead for StreamReader<I,T>
{
    /// Serve reads from the internal buffer, refilling it from the source
    /// stream when empty. `Ready(Ok(0))` means end-of-stream.
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
        let this = self.project();
        if this.buffer.len() != 0 {
            // BUGFIX: only drain as many bytes as both sides can exchange.
            // Draining `..buf.len()` unconditionally panicked when the
            // internal buffer held fewer bytes than `buf`, and silently
            // dropped data when it held more (memcpy copies only the min).
            let len = std::cmp::min(this.buffer.len(), buf.len());
            Poll::Ready(Ok(bytes::memcpy(this.buffer.drain(..len).as_slice(), buf)))
        } else {
            // Buffer is empty, try to fill it from the source stream.
            match match this.source.poll_next(cx) {
                Poll::Ready(None) => Poll::Ready(0),
                Poll::Ready(Some(chunk)) if chunk.as_ref().len() > 0 => {
                    let chunk = chunk.as_ref();
                    this.buffer.extend_from_slice(chunk);
                    Poll::Ready(chunk.len())
                },
                // NOTE(review): an empty chunk also lands here and reports
                // `Pending` without registering a wake-up — pre-existing
                // behaviour, deliberately left unchanged.
                _ => Poll::Pending,
            } {
                Poll::Ready(0) => Poll::Ready(Ok(0)), // EOF
                Poll::Ready(x) => {
                    // `x` bytes were just appended; hand out at most
                    // `buf.len()` of them and keep the remainder buffered
                    // (previously `drain(..x)` discarded the excess).
                    let len = std::cmp::min(x, buf.len());
                    Poll::Ready(Ok(bytes::memcpy(this.buffer.drain(..len).as_slice(), buf)))
                },
                _ => Poll::Pending,
            }
        }
    }
}
impl<T: AsRef<[u8]>, I: Stream<Item=T>> AsyncBufRead for StreamReader<I,T>
{
    /// Expose the internal buffer, refilling it from the source stream first
    /// when it is empty. An empty slice signals end-of-stream.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();
        if this.buffer.len() < 1 {
            // Fetch more into buffer
            match match this.source.poll_next(cx) {
                Poll::Ready(None) => Poll::Ready(0),
                Poll::Ready(Some(buf)) if buf.as_ref().len() > 0 => {
                    let buf = buf.as_ref();
                    this.buffer.extend_from_slice(buf);
                    Poll::Ready(buf.len())
                },
                // NOTE(review): an empty chunk yields `Pending` without a
                // registered wake-up — pre-existing behaviour.
                _ => Poll::Pending,
            } {
                Poll::Ready(0) => Poll::Ready(Ok(&[])), // should we return EOF error here?
                // The buffer was empty before, so `..x` is the whole buffer.
                Poll::Ready(x) => Poll::Ready(Ok(&this.buffer[..x])),
                _ => Poll::Pending
            }
        } else {
            Poll::Ready(Ok(&this.buffer[..]))
        }
    }
    // Discard `amt` bytes previously returned by `poll_fill_buf`.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        self.project().buffer.drain(..amt);
    }
}
#[cfg(test)]
mod tests
{
    use super::*;
    use tokio::{
        sync::{
            mpsc,
        },
    };
    // End-to-end check: chunks pushed through an mpsc channel are readable
    // line-by-line through `StreamReader`'s `AsyncBufRead` implementation.
    #[tokio::test]
    async fn stream_of_vec()
    {
        let (mut tx, rx) = mpsc::channel(16);
        let sender = tokio::spawn(async move {
            tx.send("Hello ").await.unwrap();
            tx.send("world").await.unwrap();
            tx.send("\n").await.unwrap();
            tx.send("How ").await.unwrap();
            tx.send("are ").await.unwrap();
            tx.send("you").await.unwrap();
        });
        let mut reader = StreamReader::new(rx);
        let mut output = String::new();
        let mut read;
        // Keep reading lines until EOF (read_line returns 0).
        while {read = reader.read_line(&mut output).await.expect("Failed to read"); read!=0} {
            println!("Read: {}", read);
        }
        println!("Done: {:?}", output);
        sender.await.expect("Child panic");
        assert_eq!(&output[..], "Hello world\nHow are you");
    }
}
/// A stream that chunks its input.
///
/// Buffers up to `cap` items from the inner stream and yields them as one
/// `Into` (by default `Vec<T>`) per chunk; a trailing partial chunk is
/// yielded when the inner stream ends.
#[pin_project]
pub struct ChunkingStream<S, T, Into=Vec<T>>
{
    /// Fused inner stream, safe to poll after completion.
    #[pin] stream: Fuse<S>,
    /// Items accumulated towards the next chunk.
    buf: Vec<T>,
    /// Target chunk size.
    cap: usize,
    _output: PhantomData<Into>,
    /// When set, the next poll flushes the (non-empty) buffer early.
    push_now: bool,
}
impl<S, T, Into> ChunkingStream<S,T, Into>
where S: Stream<Item=T>,
      Into: From<Vec<T>>
{
    /// Create a chunking stream yielding chunks of up to `sz` items.
    pub fn new(stream: S, sz: usize) -> Self
    {
        Self {
            stream: stream.fuse(),
            buf: Vec::with_capacity(sz),
            cap: sz,
            _output: PhantomData,
            push_now: false,
        }
    }
    /// Consume into the inner stream, discarding any buffered items.
    pub fn into_inner(self) -> S
    {
        self.stream.into_inner()
    }
    /// The configured chunk size.
    pub fn cap(&self) -> usize
    {
        self.cap
    }
    /// The items buffered for the next chunk.
    pub fn buffer(&self) -> &[T]
    {
        &self.buf[..]
    }
    /// Borrow the inner stream.
    pub fn get_ref(&self) -> &S
    {
        self.stream.get_ref()
    }
    /// Mutably borrow the inner stream.
    pub fn get_mut(&mut self)-> &mut S
    {
        self.stream.get_mut()
    }
    /// Force the next read to send the buffer even if it's not full.
    ///
    /// # Note
    /// The buffer still won't send if it's empty.
    pub fn push_now(&mut self)
    {
        self.push_now= true;
    }
    /// Consume into the current held buffer
    pub fn into_buffer(self) -> Vec<T>
    {
        self.buf
    }
    /// Take the buffer now
    ///
    /// Replaces it with a fresh buffer of the configured capacity.
    pub fn take_now(&mut self) -> Into
    {
        std::mem::replace(&mut self.buf, Vec::with_capacity(self.cap)).into()
    }
}
impl<S, T, Into> Stream for ChunkingStream<S,T, Into>
where S: Stream<Item=T>,
      Into: From<Vec<T>>
{
    type Item = Into;
    /// Poll the inner stream until `cap` items are buffered (or `push_now` is set
    /// and the buffer is non-empty), then yield the buffer converted to `Into`.
    ///
    /// Returns `None` only when the inner stream has ended *and* the buffer is
    /// empty; a final partial chunk is yielded first.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        while !(self.push_now && !self.buf.is_empty()) && self.buf.len() < self.cap {
            // Buffer isn't full, keep filling
            let this = self.as_mut().project();
            match this.stream.poll_next(cx) {
                Poll::Ready(None) => {
                    // Stream is over
                    break;
                },
                Poll::Ready(Some(item)) => {
                    this.buf.push(item);
                },
                _ => return Poll::Pending,
            }
        }
        //debug!("Sending buffer of {} (cap {})", self.buf.len(), self.cap);
        // Buffer is full or we reach end of stream
        Poll::Ready(if self.buf.is_empty() {
            None
        } else {
            let this = self.project();
            // Reset the force-flush flag and swap out the filled buffer in one go.
            *this.push_now = false;
            let output = std::mem::replace(this.buf, Vec::with_capacity(*this.cap));
            Some(output.into())
        })
    }
}

@ -0,0 +1,176 @@
//! Defer dropping an object to a background thread.
//!
//! This can help when dropping large structures in a performance-critial context.
use super::*;
use std::{
thread,
time::Duration,
sync::mpsc,
any::Any,
marker::{Send, Sync},
};
/// Type-erased payload shipped to the background dropper thread.
pub type Defer = Box<dyn Any + Send + 'static>;
/// A deferred dropping handle.
///
/// Cloneable sender half; anything sent through it is dropped on the shared
/// background thread instead of the calling thread.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct DeferredDropper(mpsc::Sender<Defer>);
impl DeferredDropper
{
    /// Drop a Boxed item on the background thread
    ///
    /// # Panics
    /// If the background thread has panicked.
    #[inline] pub fn drop_boxed<T: Any + Send + 'static>(&self, item: Box<T>)
    {
        self.0.send(item).unwrap();
    }
    /// Drop an item on the background thread
    ///
    /// # Panics
    /// If the background thread has panicked.
    #[inline(always)] pub fn drop<T: Any +Send+ 'static>(&self, item: T)
    {
        self.drop_boxed(Box::new(item))
    }
    /// Send this deferring drop
    ///
    /// # Panics
    /// If the background thread has panicked.
    #[inline(always)] pub fn send(&self, item: impl Into<Defer>)
    {
        self.0.send(item.into()).unwrap()
    }
}
/// Subscribe to the deferred dropper. Usually avoid doing this, and just use the `drop!` macro, or use the thread-local static dropper reference `HANDLE` directly.
fn defer_drop_sub() -> DeferredDropper
{
use std::sync::Mutex; // Can we get rid of this somehow? I don't think we can. Either we mutex lock here, or we mutex lock on every (I think every, should look into that..) send. I think this is preferrable.
#[repr(transparent)]
struct Shim(Mutex<mpsc::Sender<Defer>>);
lazy_static! {
static ref TX: Shim = {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
for val in rx.into_iter().lag_sync(Duration::from_millis(10))
{
//let _ = thread::spawn(move || drop(val)).join(); // To catch panic?
drop(val); // What if this panics?
}
});
Shim(Mutex::new(tx))
};
}
DeferredDropper(TX.0.lock().unwrap().clone())
}
thread_local! {
pub static HANDLE: DeferredDropper = defer_drop_sub();
}
/// Drop this item on another thread
#[macro_export] macro_rules! defer_drop {
(in $sub:ident; box $val:expr) => {
$sub.drop_boxed($val)
};
(in $sub:ident; $val:expr) => {
$sub.drop($val)
};
(box $val:expr) => {
{
$crate::ext::defer_drop::HANDLE.with(move |sub| {
sub.drop_boxed($val)
});
}
};
($val:expr) => {
{
$crate::ext::defer_drop::HANDLE.with(move |sub| {
sub.drop($val)
});
}
};
}
#[cfg(test)]
mod tests
{
    /// Exercise every arm of the `defer_drop!` macro.
    #[test]
    fn mac()
    {
        use crate::*;
        let sub = super::defer_drop_sub();
        let large_vec = vec![String::from("hello world"); 1000];
        defer_drop!(in sub; large_vec.clone());
        defer_drop!(large_vec);
        defer_drop!(box Box::new("hello world?"));
        defer_drop!(in sub; box Box::new("hello world?"));
    }
    /// Stress the dropper from many threads at once.
    #[test]
    fn dropping_larges()
    {
        for joiner in std::iter::repeat_with(|| {
            let large_vec = vec![String::from("hello world"); 1000];
            let h = {
                let mut large_vec = large_vec.clone();
                std::thread::spawn(move || {
                    large_vec.sort();
                    defer_drop!(large_vec);
                })
            };
            defer_drop!(large_vec);
            h
        }).take(1000)
        {
            joiner.join().unwrap();
        }
        // Give the background thread a moment to drain before the process exits.
        std::thread::sleep(std::time::Duration::from_millis(500));
    }
    #[test]
    fn dropping_vec()
    {
        let sub = super::defer_drop_sub();
        let large_vec = vec![String::from("hello world"); 1000];
        sub.drop(large_vec.clone());
        sub.drop_boxed(Box::new(large_vec)) // A boxed slice can't be passed here directly: `drop_boxed` needs a `Sized` `T` to coerce `Box<T>` into `Box<dyn Any>`.
        // The raw-pointer workaround below was abandoned; kept for reference.
        /*
        unsafe {
            let raw = Box::into_raw(large_vec.into_boxed_slice());
            sub.send(Box::from_raw(raw as *mut (dyn std::any::Any + Send + 'static))).unwrap();
        }*/
    }
    /// Many concurrent subscriptions must all observe the same (already-initialised) channel.
    #[test]
    fn clone_shim()
    {
        for joiner in std::iter::repeat_with(|| {
            std::thread::spawn(move || {
                let mut subs = Vec::new();
                for _ in 0..100 {
                    subs.push(super::defer_drop_sub());
                }
            })
        }).take(1000)
        {
            joiner.join().unwrap();
        }
    }
}

@ -2,6 +2,18 @@ use super::*;
use std::hash::{BuildHasherDefault, Hasher};
use smallvec::SmallVec;
use cryptohelpers::sha256;
use ::bytes::Buf;
use cryptohelpers::sha2::{
Sha256,
Digest,
};
use std::borrow::BorrowMut;
use tokio::io::AsyncWrite;
use std::{
pin::Pin,
task::{Context, Poll},
io,
};
/// A hasher that takes the first 8 bytes from SHA256 hash as its output.
///
@ -27,10 +39,138 @@ impl Hasher for Sha256TopHasher
{
    /// Produce the `u64` output: the first 8 bytes of the accumulated state, little-endian.
    #[inline] fn finish(&self) -> u64 {
        let mut bytes = [0u8; std::mem::size_of::<u64>()];
        // NOTE(review): the two copy calls below look like the old and new line of the
        // same change (diff residue); only one copy into `bytes` should remain — confirm.
        crate::slice::copy_bytes(self.0.as_ref(), &mut bytes[..]);
        bytes::memcpy(self.0.as_ref(), &mut bytes[..]);
        u64::from_le_bytes(bytes)
    }
    /// Append input bytes to the internal state buffer.
    #[inline] fn write(&mut self, bytes: &[u8]) {
        self.0.extend_from_slice(bytes);
    }
}
/// An `AsyncWrite` implementor that writes it's inputs to a sha256 digest.
///
/// `H` may be an owned `Sha256` (the default) or any mutable borrow of one.
#[pin_project]
#[derive(Debug)]
pub struct Sha256Sink<H: BorrowMut<Sha256> = Sha256>
{
    digest: H
}
impl<H: BorrowMut<Sha256>> Sha256Sink<H>
{
    /// Create a new Sha256-computing `AsyncWrite` sink.
    #[inline] pub fn new(digest: H) -> Self
    {
        Self{digest}
    }
    /// Consume into the inner digest
    #[inline] pub fn into_inner(self) -> H
    {
        self.digest
    }
    /// The inner digest
    #[inline] pub fn inner(&self) -> &H
    {
        &self.digest
    }
    /// The inner digest (mutable)
    #[inline] pub fn inner_mut(&mut self) -> &mut H
    {
        &mut self.digest
    }
    /// The digest itself (through the `BorrowMut` bound on `H`).
    #[inline(always)] pub fn digest(&self) -> &Sha256
    {
        self.digest.borrow()
    }
    /// The digest itself, mutably.
    #[inline(always)] pub fn digest_mut(&mut self) -> &mut Sha256
    {
        self.digest.borrow_mut()
    }
}
impl<H: BorrowMut<Sha256>> AsRef<Sha256> for Sha256Sink<H>
{
    fn as_ref(&self) -> &Sha256 {
        self.digest.borrow()
    }
}
impl<H: BorrowMut<Sha256>> AsMut<Sha256> for Sha256Sink<H>
{
    fn as_mut(&mut self) -> &mut Sha256 {
        self.digest.borrow_mut()
    }
}
/*
impl<'a, H: BorrowMut<Sha256>> AsRef<Sha256> for &'a Sha256Sink<H>
//where H: 'a
{
fn as_ref(&self) -> &Sha256 {
self.digest.borrow()
}
}
impl<'a, H: BorrowMut<Sha256>> AsMut<Sha256> for &'a mut Sha256Sink<H>
// where H: 'a
{
fn as_mut(&mut self) -> &mut Sha256 {
self.digest.borrow_mut()
}
}
*/
impl<H: BorrowMut<Sha256>> BorrowMut<Sha256> for Sha256Sink<H>
{
    #[inline] fn borrow_mut(&mut self) -> &mut Sha256 {
        self.digest.borrow_mut()
    }
}
impl<H: BorrowMut<Sha256>> Borrow<Sha256> for Sha256Sink<H>
{
    #[inline] fn borrow(&self) -> &Sha256 {
        self.digest.borrow()
    }
}
impl<H: BorrowMut<Sha256>> AsyncWrite for Sha256Sink<H>
{
    /// Feed `buf` into the digest; never pends and always reports the full buffer written.
    fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
        let this = self.project();
        this.digest.borrow_mut().update(buf);
        Poll::Ready(Ok(buf.len()))
    }
    /// No internal buffering, so flushing is a no-op.
    #[inline(always)] fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }
    /// Nothing to tear down; shutdown is a no-op.
    #[inline(always)] fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }
}
/// Compute a SHA256 hash from a borrowed byte view.
pub trait Sha256HashExt
{
    fn compute_sha256_hash(&self) -> sha256::Sha256Hash;
}
/// Compute a SHA256 hash by consuming the value.
pub trait Sha256HashOwnedExt: Sized
{
    fn into_sha256_hash(self) -> sha256::Sha256Hash;
}
impl<T: ?Sized> Sha256HashExt for T
where T: AsRef<[u8]>
{
    #[inline] fn compute_sha256_hash(&self) -> sha256::Sha256Hash {
        sha256::compute_slice(self.as_ref())
    }
}
impl<T: Buf> Sha256HashOwnedExt for T
{
    #[inline] fn into_sha256_hash(self) -> sha256::Sha256Hash {
        // Reading from an in-memory `Buf` reader presumably cannot fail,
        // hence the unwrap — TODO confirm against `compute_sync`'s contract.
        sha256::compute_sync(self.reader()).unwrap()
    }
}

@ -0,0 +1,116 @@
use std::{
mem,
iter::{
self,
ExactSizeIterator,
FusedIterator,
},
slice,
fmt,
};
/// Iterator adapter yielding the lowercase hex encoding of a byte iterator, one `char` at a time.
///
/// The `[u8; 2]` scratch buffer holds the (up to two) ASCII hex digits of the
/// current byte that have not been yielded yet; `0` marks an empty slot.
#[derive(Debug, Clone)]
pub struct HexStringIter<I>(I, [u8; 2]);
impl<I: Iterator<Item = u8>> HexStringIter<I>
{
    /// Write this hex string iterator to a formattable buffer
    pub fn consume<F>(self, f: &mut F) -> fmt::Result
    where F: std::fmt::Write
    {
        // Flush any digits still sitting in the scratch buffer first...
        if self.1[0] != 0 {
            write!(f, "{}", self.1[0] as char)?;
        }
        if self.1[1] != 0 {
            write!(f, "{}", self.1[1] as char)?;
        }
        // ...then format the remaining input bytes directly.
        for x in self.0 {
            write!(f, "{:02x}", x)?;
        }
        Ok(())
    }
    /// Consume into a string
    pub fn into_string(self) -> String
    {
        // Pre-size from the size hint where possible (the hint already
        // accounts for two output chars per input byte).
        let mut output = match self.size_hint() {
            (0, None) => String::new(),
            (_, Some(x)) |
            (x, None) => String::with_capacity(x),
        };
        self.consume(&mut output).unwrap();
        output
    }
}
/// Convert an owned byte iterator into a `HexStringIter`.
pub trait HexStringIterExt<I>: Sized
{
    fn into_hex(self) -> HexStringIter<I>;
}
pub type HexStringSliceIter<'a> = HexStringIter<iter::Copied<slice::Iter<'a, u8>>>;
/// Borrow any byte-slice-like value as a hex iterator.
pub trait HexStringSliceIterExt
{
    fn hex(&self) -> HexStringSliceIter<'_>;
}
impl<S> HexStringSliceIterExt for S
where S: AsRef<[u8]>
{
    fn hex(&self) -> HexStringSliceIter<'_>
    {
        self.as_ref().iter().copied().into_hex()
    }
}
impl<I: IntoIterator<Item=u8>> HexStringIterExt<I::IntoIter> for I
{
    #[inline] fn into_hex(self) -> HexStringIter<I::IntoIter> {
        HexStringIter(self.into_iter(), [0u8; 2])
    }
}
impl<I: Iterator<Item = u8>> Iterator for HexStringIter<I>
{
    type Item = char;
    fn next(&mut self) -> Option<Self::Item>
    {
        match self.1 {
            // Low slot empty: pull the next byte, render both hex digits into the
            // scratch buffer, emit the high digit and clear its slot.
            // (ASCII hex digits are never 0, so 0 safely marks "empty".)
            [_, 0] => {
                use std::io::Write;
                write!(&mut self.1[..], "{:02x}", self.0.next()?).unwrap();
                Some(mem::replace(&mut self.1[0], 0) as char)
            },
            // High digit already emitted: emit the buffered low digit.
            [0, _] => Some(mem::replace(&mut self.1[1], 0) as char),
            // Both slots full cannot happen: the low slot is drained before refilling.
            _ => unreachable!(),
        }
    }
    // Two output chars per remaining input byte.
    // NOTE(review): digits still buffered in the scratch array are not counted here —
    // confirm whether `ExactSizeIterator` users rely on mid-byte accuracy.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (l, h) = self.0.size_hint();
        (l * 2, h.map(|x| x*2))
    }
}
impl<I: Iterator<Item = u8> + ExactSizeIterator> ExactSizeIterator for HexStringIter<I>{}
impl<I: Iterator<Item = u8> + FusedIterator> FusedIterator for HexStringIter<I>{}
impl<I: Iterator<Item = u8>> From<HexStringIter<I>> for String
{
    fn from(from: HexStringIter<I>) -> Self
    {
        from.into_string()
    }
}
impl<I: Iterator<Item = u8> + Clone> fmt::Display for HexStringIter<I>
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        // Clone so `Display` is non-destructive.
        self.clone().consume(f)
    }
}

@ -1,4 +1,6 @@
use super::*;
use std::iter::Fuse;
use std::marker::PhantomData;
/// An iterator that may be empty.
#[derive(Debug, Clone)]
@ -155,3 +157,108 @@ where S: AsRef<[T]>,
true
}
}
// Chunking iterator
/// An iterator that chunks its input.
///
/// Collects up to `cap` items from the wrapped iterator and yields each batch as
/// an `Into` (any type convertible from `Vec<T>`; `Vec<T>` by default). The final
/// chunk may be smaller than `cap`; `None` is returned only once the source is
/// exhausted and the buffer is empty.
pub struct ChunkingIter<I, T, Into=Vec<T>>
{
    /// Wrapped iterator, fused so iterating past the end stays safe.
    stream: Fuse<I>,
    /// Items collected for the chunk currently being built.
    buf: Vec<T>,
    /// Target chunk size.
    cap: usize,
    _output: PhantomData<Into>,
    /// When set, the next call yields the buffer even if it is not yet full.
    push_now: bool,
}
impl<S, T, Into> ChunkingIter<S,T, Into>
where S: Iterator<Item=T>,
      Into: From<Vec<T>>
{
    /// Create a new chunking iterator with this chunk size.
    pub fn new(stream: S, sz: usize) -> Self
    {
        Self {
            stream: stream.fuse(),
            buf: Vec::with_capacity(sz),
            cap: sz,
            _output: PhantomData,
            push_now: false,
        }
    }
    /// Consume into the wrapped (fused) iterator.
    pub fn into_inner(self) -> Fuse<S>
    {
        self.stream
    }
    /// The configured chunk size.
    pub fn cap(&self) -> usize
    {
        self.cap
    }
    /// The currently buffered (not yet yielded) items.
    pub fn buffer(&self) -> &[T]
    {
        &self.buf[..]
    }
    /// A reference to the wrapped iterator.
    pub fn get_ref(&self) -> &Fuse<S>
    {
        &self.stream
    }
    /// A mutable reference to the wrapped iterator.
    pub fn get_mut(&mut self)-> &mut Fuse<S>
    {
        &mut self.stream
    }
    /// Force the next read to send the buffer even if it's not full.
    ///
    /// # Note
    /// The buffer still won't send if it's empty.
    pub fn push_now(&mut self)
    {
        self.push_now = true;
    }
    /// Consume into the current held buffer
    pub fn into_buffer(self) -> Vec<T>
    {
        self.buf
    }
    /// Take the buffer now
    pub fn take_now(&mut self) -> Into
    {
        std::mem::replace(&mut self.buf, Vec::with_capacity(self.cap)).into()
    }
}
impl<S, T, Into> Iterator for ChunkingIter<S,T, Into>
where S: Iterator<Item=T>,
      Into: From<Vec<T>>
{
    type Item = Into;
    fn next(&mut self) -> Option<Self::Item> {
        // Keep filling unless the buffer is full, or a non-empty buffer is force-flushed.
        while !(self.push_now && !self.buf.is_empty()) && self.buf.len() < self.cap {
            match self.stream.next() {
                None => {
                    // Source exhausted: fall through and yield whatever is buffered.
                    break;
                },
                Some(item) => {
                    self.buf.push(item);
                },
            }
        }
        // Buffer is full or we reached the end of the source iterator.
        if self.buf.is_empty() {
            None
        } else {
            self.push_now = false;
            let output = std::mem::replace(&mut self.buf, Vec::with_capacity(self.cap));
            Some(output.into())
        }
    }
}

@ -0,0 +1,107 @@
//! Lagging iterators / (todo) streams.
use super::*;
use std::time::Duration;
use tokio::time;
/// A lagged iterator
///
/// Sleeps for a fixed `Duration` before yielding each element
/// (constructed via `LagIterExt::lag_sync`).
#[derive(Debug, Clone)]
pub struct Lag<I: ?Sized, T>(Duration, I)
where I: Iterator<Item=T>;
impl<I: ?Sized, T> Lag<I, T>
where I: Iterator<Item=T>
{
    /// Set the lag duration
    #[inline] pub fn set_duration(&mut self, dur: Duration)
    {
        self.0 = dur;
    }
    /// Get the lag duration
    #[inline] pub fn duration(&self) -> Duration
    {
        self.0
    }
}
impl<I, T> Lag<I, T>
where I: Iterator<Item=T>
{
    /// Consume into the inner iterator
    #[inline] pub fn into_inner(self) -> I
    {
        self.1
    }
}
/// Throttling support for `Stream`s.
pub trait LagStreamExt: Stream + Sized
{
    /// Throttle a `Stream` by this duration.
    fn lag(self, dur: Duration) -> time::Throttle<Self>;
}
impl<T> LagStreamExt for T
where T: Stream
{
    // NOTE(review): `tokio::time::throttle` exists in tokio 0.2/0.3 but was removed
    // in tokio 1.0 — confirm the tokio version this crate targets.
    #[inline] fn lag(self, dur: Duration) -> time::Throttle<Self>
    {
        time::throttle(dur, self)
    }
}
/// Blocking (synchronous) lag support for plain iterators.
pub trait LagIterExt: Iterator
{
    fn lag_sync(self, dur: Duration) -> Lag<Self, Self::Item>;
}
impl<I> LagIterExt for I
where I: Iterator
{
    #[inline] fn lag_sync(self, dur: Duration) -> Lag<Self, Self::Item>
    {
        Lag(dur, self)
    }
}
impl<I: ?Sized, T> Iterator for Lag<I, T>
where I: Iterator<Item=T>
{
    type Item = T;
    /// Sleep for the configured duration, then forward to the inner iterator.
    fn next(&mut self) -> Option<Self::Item>
    {
        std::thread::sleep(self.0);
        self.1.next()
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.1.size_hint()
    }
}
impl<I: ?Sized, T> FusedIterator for Lag<I, T>
where I: Iterator<Item=T> + FusedIterator{}
impl<I: ?Sized, T> ExactSizeIterator for Lag<I, T>
where I: Iterator<Item=T> + ExactSizeIterator{}
impl<I: ?Sized, T> DoubleEndedIterator for Lag<I, T>
where I: Iterator<Item=T> + DoubleEndedIterator
{
    /// Sleep for the configured duration, then take the next element from the *back*.
    fn next_back(&mut self) -> Option<Self::Item>
    {
        std::thread::sleep(self.0);
        // BUGFIX: this previously delegated to `next()`, yielding from the front
        // and violating the `DoubleEndedIterator` contract.
        self.1.next_back()
    }
}
/// Random jitter helper for inclusive `(low, high)` pairs.
pub trait JitterExt<T>
{
    /// Produce a random value between `self.0` and `self.1` inclusive
    fn jitter(self) -> T;
}
impl<T> JitterExt<T> for (T, T)
where T: rand::distributions::uniform::SampleUniform
{
    fn jitter(self) -> T
    {
        // Delegates to the crate's `util::jitter`; presumably samples uniformly
        // in [self.0, self.1] — confirm there.
        util::jitter(self.0, self.1)
    }
}

@ -1,8 +1,19 @@
use super::*;
use std::iter::FusedIterator;
use std::collections::BTreeSet;
use std::borrow::Borrow;
use futures::prelude::*;
use std::cell::RefCell;
use std::ptr;
/// General utilities
pub mod util;
/// Functions for manipulating byte slices
pub mod bytes;
// Internal modules
mod iters;
pub use iters::*;
@ -12,8 +23,237 @@ pub use streams::*;
mod hashers;
pub use hashers::*;
mod hex;
pub use hex::*;
mod lag;
pub use lag::*;
mod defer_drop;
pub use defer_drop::*;
pub mod sync;
pub mod plex;
pub use plex::MultiplexStreamExt;
// The extension traits are defined in this `mod.rs` file, no need to re-export anything from here.
pub mod chunking;
/// How many elements should `precollect` allocate on the stack before spilling to the heap.
pub const PRECOLLECT_STACK_SIZE: usize = 64;
/// Implement `Default` for a type.
///
/// Usage: `default!(MyType: MyType { field: value });` — the expression becomes
/// the body of the generated `Default::default`.
#[macro_export] macro_rules! default {
    ($ty:ty: $ex:expr) => {
        impl Default for $ty
        {
            #[inline]
            fn default() -> Self
            {
                $ex
            }
        }
    }
}
/// Create a duration with time suffix `h`, `m`, `s`, `ms` or `ns`.
///
/// # Combination
/// These can also be combined.
/// ```
/// # use flan_utils::duration;
/// duration!(1 h, 20 m, 30 s);
/// ```
#[macro_export] macro_rules! duration
{
    (0 $($_any:tt)?) => (::core::time::Duration::from_secs(0));
    ($dur:literal ms) => (::core::time::Duration::from_millis($dur));
    ($dur:literal ns) => (::core::time::Duration::from_nanos($dur));
    ($dur:literal s) => (::core::time::Duration::from_secs($dur));
    ($dur:literal m) => (::core::time::Duration::from_secs($dur * 60));
    ($dur:literal h) => (::core::time::Duration::from_secs($dur * 60 * 60));
    // Fold a comma-separated list by summing the single-unit expansions.
    // BUGFIX: recursive invocations are `$crate::`-qualified so the combination
    // form also works when the macro is used from other crates (matching the
    // `$crate::` convention used by the other exported macros in this module).
    ( $($dur:literal $unit:tt),*)=> {
        $crate::duration!(0 s) $(
            + $crate::duration!($dur $unit)
        )*
    };
}
/// Create a basic, C-like enum
///
/// Syntax: `basic_enum!(#[attrs] vis Name; "type doc": VariantA => "doc", VariantB, ...);`
/// The generated enum derives `Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy`.
#[macro_export] macro_rules! basic_enum {
    ($(#[$meta:meta])* $vis:vis $name:ident $(; $tcomment:literal)?: $($var:ident $(=> $comment:literal)?),+ $(,)?) => {
        $(#[$meta])*
        #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy)]
        $(#[doc = $tcomment])?
        $vis enum $name {
            $(
                $(#[doc = $comment])?
                $var
            ),+
        }
    }
}
/// Create a `Yes` or `No` enum.
///
/// Generates a two-variant `#[repr(u8)]` enum (via `basic_enum!`) with `bool`
/// conversions in both directions plus a set of combinator helpers.
#[macro_export] macro_rules! bool_type {
    ($vis:vis $name:ident $(; $comment:literal)? => $yes:ident, $no:ident) => {
        // BUGFIX: `$crate::`-qualify the helper macro so `bool_type!` also works
        // when invoked from other crates (consistent with the recursive arm below).
        $crate::basic_enum!(#[repr(u8)] $vis $name $(; $comment)?: $yes => "# First variant\n\nYes/true", $no => "# Second variant\n\nNo/false");
        impl From<bool> for $name
        {
            #[inline] fn from(from: bool) -> Self
            {
                if from {
                    Self::$yes
                } else {
                    Self::$no
                }
            }
        }
        impl From<$name> for bool
        {
            #[inline] fn from(from: $name) -> Self
            {
                match from {
                    $name::$yes => true,
                    $name::$no => false,
                }
            }
        }
        impl $name
        {
            /// Create from a bool value.
            #[inline] pub const fn new(from: bool) -> Self
            {
                if from {
                    Self::$yes
                } else {
                    Self::$no
                }
            }
            /// Is this false?
            #[inline] pub const fn is_no(self) -> bool
            {
                !self.is_yes()
            }
            /// Is this true?
            #[inline] pub const fn is_yes(self) -> bool
            {
                match self {
                    Self::$yes => true,
                    Self::$no => false,
                }
            }
            /// Return Some(T) if self is true.
            #[inline] pub fn some<T>(self, value: T) -> Option<T>
            {
                self.and_then(move || value)
            }
            /// Map this value
            #[inline] pub fn map<F, T>(self, f: F) -> T
            where F: FnOnce(bool) -> T
            {
                f(self.is_yes())
            }
            /// Run this closure if value is false
            #[inline] pub fn or_else<F, T>(self, f: F) -> Option<T>
            where F: FnOnce() -> T
            {
                if let Self::$no = self {
                    Some(f())
                } else {
                    None
                }
            }
            /// Run this closure if value is true
            #[inline] pub fn and_then<F, T>(self, f: F) -> Option<T>
            where F: FnOnce() -> T
            {
                if let Self::$yes = self {
                    Some(f())
                } else {
                    None
                }
            }
            /// Return `yes` if true and `no` if false
            #[inline] pub fn either<T>(self, yes: T, no: T) -> T
            {
                self.and_either(move || yes, move || no)
            }
            /// Run closure `yes` if value is true, `no` if value is false.
            #[inline] pub fn and_either<F, G, T>(self, yes: F, no: G) -> T
            where F: FnOnce() -> T,
                  G: FnOnce() -> T,
            {
                match self {
                    Self::$yes => yes(),
                    Self::$no => no(),
                }
            }
        }
    };
    ($vis:vis $name:ident $(; $comment:literal)?) => {
        $crate::bool_type!($vis $name $(; $comment)? => Yes, No);
    }
}
/// Create an accessor method for a field in a structure.
///
/// The supported accessor types are: `ref` (shared borrow), `mut` (mutable
/// borrow), and `move` (return the field by value). Named-field (`ident`) and
/// tuple-index (`tt`) forms are both accepted, with an optional trailing
/// doc-comment literal after `;`.
#[macro_export] macro_rules! accessor {
    ($vis:vis ref $name:ident -> $ty:ty => $internal:ident $(; $comment:literal)?) => {
        $(#[doc=$comment])?
        #[inline] $vis fn $name(&self) -> &$ty {
            &self.$internal
        }
    };
    ($vis:vis ref $name:ident -> $ty:ty => $internal:tt $(; $comment:literal)?) => {
        $(#[doc=$comment])?
        #[inline] $vis fn $name(&self) -> &$ty {
            &self.$internal
        }
    };
    // BUGFIX: the `mut` accessors previously took `&self`, which cannot hand out
    // `&mut self.$internal` and failed to compile at every expansion site.
    ($vis:vis mut $name:ident -> $ty:ty => $internal:ident $(; $comment:literal)?) => {
        $(#[doc=$comment])?
        #[inline] $vis fn $name(&mut self) -> &mut $ty {
            &mut self.$internal
        }
    };
    ($vis:vis mut $name:ident -> $ty:ty => $internal:tt $(; $comment:literal)?) => {
        $(#[doc=$comment])?
        #[inline] $vis fn $name(&mut self) -> &mut $ty {
            &mut self.$internal
        }
    };
    ($vis:vis move $name:ident -> $ty:ty => $internal:ident $(; $comment:literal)?) => {
        $(#[doc=$comment])?
        #[inline] $vis fn $name(&self) -> $ty {
            self.$internal
        }
    };
    ($vis:vis move $name:ident -> $ty:ty => $internal:tt $(; $comment:literal)?) => {
        $(#[doc=$comment])?
        #[inline] $vis fn $name(&self) -> $ty {
            self.$internal
        }
    };
}
/// Collect an iterator's output and then drop it to detach the iterator from any references or resources it might have.
#[macro_export] macro_rules! precollect {
($iter:expr, $num:literal) => {
@ -42,35 +282,288 @@ pub const PRECOLLECT_STACK_SIZE: usize = 64;
};
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
pub enum Either<T,U>
{
T(T),
U(U),
/// The ID type used for backing ID types;
pub type GenericID = uuid::Uuid;
/// Create a type that contains a (globally) unique ID.
///
/// Generates a UUIDv4-backed newtype with `id_*` helper methods and a `Display`
/// impl of the form `Name<uuid>`. Requires `Serialize`/`Deserialize` derives in scope.
#[macro_export] macro_rules! id_type {
    ($name:ident $(: $doc:literal)?) => ($crate::id_type!{pub(self) $name $(: $doc)?});
    ($vis:vis $name:ident $(: $doc:literal)?) => {
        $(#[doc=$doc])?
        #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
        $vis struct $name($crate::ext::GenericID);
        impl $name
        {
            /// Create a new unique ID.
            // NOTE(review): the generated `id_*` methods carry no visibility, so they
            // are private to the module invoking the macro — confirm this is intended.
            #[inline(always)] fn id_new() -> Self
            {
                Self($crate::ext::GenericID::new_v4())
            }
            /// The generic ID type backing this one
            #[inline(always)] fn id_generic(&self) -> &$crate::ext::GenericID
            {
                &self.0
            }
            /// Consume into the generic ID
            #[inline(always)] fn id_into_generic(self) -> $crate::ext::GenericID
            {
                self.0
            }
            /// Create from a generic ID
            #[inline(always)] fn id_from_generic(gen: $crate::ext::GenericID) -> Self
            {
                Self(gen)
            }
        }
        impl ::std::fmt::Display for $name
        {
            fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result
            {
                //use ::std::fmt::Write;
                f.write_str(concat!(stringify!($name),"<"))?;
                self.0.fmt(f)?;
                f.write_str(">")
            }
        }
    }
}
impl<T,U> Either<T,U>
{
/// Convert this sum type into
pub fn cast<V>(self) -> V
where V: From<T> + From<U>// eh, this doesn't work. forget about it.
{
match self {
Self::T(t) => t.into(),
Self::U(u) => u.into(),
/// Expands to `unreachable_unchecked` in non-debug builds.
///
/// # Safety
/// You must make 100% sure this code path will never be entered, or it will cause undefined behaviour in release builds.
/// The expansion must be placed inside an `unsafe` context by the caller
/// (`unreachable_unchecked` is an unsafe fn and no `unsafe` block is emitted here).
// NOTE(review): `#[cold]` on an expression statement appears to rely on the
// nightly-only attributes-on-expressions feature — confirm against the toolchain.
#[macro_export] macro_rules! debug_unreachable {
    () => {
        if cfg!(debug_assertions) {
            #[cold] unreachable!()
        } else {
            ::std::hint::unreachable_unchecked()
        }
    };
}
/// Dirty debugging macro to get the compiler to print an error message telling you the size of a type.
/// ```
/// check_size!((u8, u8)); // Expected ... found one with *2* elements
/// ```
/// # Size assertions
/// Can also be used to statically assert the size of a type
/// ```
/// # use datse::ext::check_size;
/// check_size!(u16 where == 2; "u16 should be 2 bytes");
/// check_size!(u16 where < 3; "u16 should be lower than 3 bytes");
/// check_size!(u16 where > 1; "u16 should be larger that 1 byte");
/// check_size!(u16 where != 0; "u16 should be larger that 0 bytes");
/// ```
///
/// You can also combine multiple
/// ```
/// # use datse::ext::check_size;
/// check_size!([u8; 512] where {
/// != 10;
/// > 511;
/// < 513;
/// == 512
/// });
/// ```
///
/// This can be used to give you prompts as to when you might want to consider boxing a type.
#[macro_export] macro_rules! check_size {
    // No condition: force a compile error whose message reveals the type's size.
    ($t:ty) => {
        const _: [(); 0] = [(); ::std::mem::size_of::<$t>()];
    };
    // Exact equality via array-length unification.
    // NOTE(review): `$msg` is matched but unused in this arm — confirm whether it
    // should be forwarded like in the relational arms below.
    ($t:ty where == $n:literal $(; $msg:literal)?) => {
        const _: [(); $n] = [(); ::std::mem::size_of::<$t>()];
    };
    // Several relational assertions in one invocation.
    ($t:ty where {$($op:tt $n:literal);+} $(; $msg:literal)?) => {
        $(
            $crate::static_assert!(::std::mem::size_of::<$t>() $op $n);
        )+
    };
    // Single relational assertion.
    ($t:ty where $op:tt $n:literal $(; $msg:literal)?) => {
        $crate::static_assert!(::std::mem::size_of::<$t>() $op $n $(; $msg)?);
    };
}
check_size!(u8 where == 1);
check_size!(u16 where > 1);
check_size!([u8; 512] where <= 512);
check_size!([u8; 512] where {
!= 10;
> 511;
< 513;
== 512
});
/// Assert the output of a constant boolean expression is `true` at compile time.
// NOTE(review): the optional `$msg` is matched but never emitted — failures surface
// only as an array-length mismatch. Confirm whether the message should be surfaced.
#[macro_export] macro_rules! static_assert {
    ($val:expr $(; $msg:literal)?) => {
        const _: [(); 1] = [(); ($val as bool) as usize];
    }
}
/// Assert a trait is object safe. This will produce a compiler error if the trait is not object safe
#[macro_export] macro_rules! assert_object_safe {
    ($trait:path $(; $msg:literal)?) => {
        const _:() = {
            // Never called; merely naming `&dyn Trait` forces the object-safety check.
            #[cold] fn __assert_object_safe() -> !
            {
                let _: &dyn $trait;
                unsafe {
                    // BUGFIX: `$crate::`-qualify the helper macro so this assertion
                    // also works when invoked from other crates (consistent with
                    // `check_size!`'s use of `$crate::static_assert!`).
                    $crate::debug_unreachable!()
                }
            }
        };
    }
}
/// Create an ad-hoc sum type.
macro_rules! either {
(@ type $first:ty) => {
$first
assert_object_safe!(AsRef<str>; "object safety assertion test");
static_assert!(1+1==2; "static assertion test");
/// Execute this expression if running in debug mode.
#[macro_export] macro_rules! debug_expr {
(if $expr:expr; else $or:expr) => {
if cfg!(debug_assertions) { $expr }
else { $or }
};
(@ type $first:ty, $($rest:ty),*) => {
Either<$first, either!(@ type $($rest),*)>
(else $expr:expr) => {
if !cfg!(debug_assertions) { $expr }
};
(type $($type:ty)|*) => {
either!(@ type $($type),*)
($expr:expr) => {
if cfg!(debug_assertions) { $expr }
};
}
/// Run each expression in sequence then return the result of the first one.
///
/// All expressions are evaluated left-to-right into a single tuple (so later
/// expressions run for their side effects), then element `.0` is returned.
///
/// # Example
/// ```
/// # use mtfse::prog1;
/// assert_eq!(prog1 {
///     1 + 2;
///     println!("Done thing");
///     4 + 5;
/// }, 3);
/// ```
#[macro_export] macro_rules! prog1 {
    ($first:expr; $($rest:expr);* $(;)?) => {
        ($first, $( $rest ),*).0
    }
}
/// Helper to create an `std::io::Error` structure.
///
/// Forms: `io_err!(Kind, msg)` with any `std::io::ErrorKind` variant name, or
/// `io_err!(msg)` which defaults the kind to `Other`.
///
/// # Example
/// ```
/// # use mtfse::io_err;
/// let err = io_err!(Other, "some error");
/// ```
#[macro_export] macro_rules! io_err {
    ($kind:ident, $msg:expr) => {
        ::std::io::Error::new(::std::io::ErrorKind::$kind, $msg)
    };
    // BUGFIX: `$crate::`-qualify the recursive call so the single-argument form
    // also works when the macro is invoked from other crates.
    ($msg:expr) => ($crate::io_err!(Other, $msg));
}
/// Unwrap `Result`s whose error type is uninhabited.
pub trait UnwrapInfallible<T>
{
    fn unwrap_infallible(self) -> T;
}
impl<T> UnwrapInfallible<T> for Result<T, std::convert::Infallible>
{
    /// Unwrap with 0 overhead for values that cannot possibly be `Err`.
    #[inline(always)] fn unwrap_infallible(self) -> T {
        match self {
            Ok(v) => v,
            // SAFETY: `Infallible` has no values, so this arm can never be taken.
            Err(_) => unsafe { debug_unreachable!() },
        }
    }
}
// NOTE(review): the `!` (never type) impls below require a nightly toolchain.
impl<T> UnwrapInfallible<T> for Result<T, !>
{
    /// Unwrap with 0 overhead for values that cannot possibly be `Err`.
    #[inline(always)] fn unwrap_infallible(self) -> T {
        match self {
            Ok(v) => v,
            // SAFETY: `!` has no values, so this arm can never be taken.
            Err(_) => unsafe { debug_unreachable!() },
        }
    }
}
/// Unwrap the *error* of `Result`s whose success type is uninhabited.
pub trait UnwrapErrInfallible<T>
{
    fn unwrap_err_infallible(self) -> T;
}
impl<T> UnwrapErrInfallible<T> for Result<std::convert::Infallible, T>
{
    /// Unwrap with 0 overhead for values that cannot possibly be `Ok`.
    #[inline(always)] fn unwrap_err_infallible(self) -> T {
        match self {
            Err(v) => v,
            // SAFETY: `Infallible` has no values, so this arm can never be taken.
            Ok(_) => unsafe { debug_unreachable!() },
        }
    }
}
impl<T> UnwrapErrInfallible<T> for Result<!, T>
{
    /// Unwrap with 0 overhead for values that cannot possibly be `Ok`.
    #[inline(always)] fn unwrap_err_infallible(self) -> T {
        match self {
            Err(v) => v,
            // SAFETY: `!` has no values, so this arm can never be taken.
            Ok(_) => unsafe { debug_unreachable!() },
        }
    }
}
/// Adapt any `Stream` into fixed-size chunks (see `chunking::ChunkingStream`).
pub trait ChunkStreamExt<T>: Sized
{
    /// Chunk into any container convertible from `Vec<T>`.
    fn chunk_into<I: From<Vec<T>>>(self, sz: usize) -> chunking::ChunkingStream<Self,T,I>;
    /// Chunk into `Vec<T>`s of (up to) `sz` items.
    fn chunk(self, sz: usize) -> chunking::ChunkingStream<Self, T>
    {
        self.chunk_into(sz)
    }
}
impl<S, T> ChunkStreamExt<T> for S
where S: Stream<Item=T>
{
    fn chunk_into<I: From<Vec<T>>>(self, sz: usize) -> chunking::ChunkingStream<Self,T,I>
    {
        chunking::ChunkingStream::new(self, sz)
    }
}
/// Adapt any `Iterator` into fixed-size chunks (see `iters::ChunkingIter`).
pub trait ChunkIterExt<T>: Sized
{
    /// Chunk into any container convertible from `Vec<T>`.
    fn chunk_into<I: From<Vec<T>>>(self, sz: usize) -> iters::ChunkingIter<Self,T,I>;
    /// Chunk into `Vec<T>`s of (up to) `sz` items.
    fn chunk(self, sz: usize) -> iters::ChunkingIter<Self, T>
    {
        self.chunk_into(sz)
    }
}
impl<S, T> ChunkIterExt<T> for S
where S: Iterator<Item=T>
{
    fn chunk_into<I: From<Vec<T>>>(self, sz: usize) -> iters::ChunkingIter<Self,T,I>
    {
        iters::ChunkingIter::new(self, sz)
    }
}

@ -0,0 +1,465 @@
//! Multiplexing
use super::*;
use std::io;
use futures::prelude::*;
use std::marker::PhantomData;
use tokio::io::{
AsyncWrite,
};
use std::{
task::{Context, Poll},
pin::Pin,
};
use std::{fmt, error};
/// For a `WriteRule::compare_byte_sizes()` implementation that never errors.
///
/// Uninhabited, so a `Result<_, CompareInfallible>` carries no error branch at runtime.
#[derive(Debug)]
pub enum CompareInfallible{}
impl error::Error for CompareInfallible{}
impl fmt::Display for CompareInfallible
{
    fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        // Vacant match: no value exists to format.
        match *self {}
    }
}
impl From<CompareInfallible> for io::Error
{
    fn from(from: CompareInfallible) -> Self
    {
        // Vacant match: this conversion can never actually run.
        match from {}
    }
}
/// Static rules for a `MultiplexingStream` to use.
// NOTE(review): the associated-type default below appears to rely on the
// nightly-only `associated_type_defaults` feature — confirm against the toolchain.
pub trait WriteRule
{
    type CompareFailedError: Into<io::Error> = CompareInfallible;
    /// When a successful poll to both streams and the number of bytes written differs, chooses which to return.
    ///
    /// # Errors
    /// You can also choose to return an error if the sizes do not match.
    /// The error must be convertable to `io::Error`.
    /// By default, the error is `CompareInfallible`, which is a never-type alias that implements `Into<io::Error>` for convenience and better optimisations over using a non-vacant error type that is never returned.
    ///
    /// If you are using an error type that may be returned, set the `CompareFailedError` to the error type you choose, as long as it implements `Into<io::Error>` (or simply set it to `io::Error`, the generic conversion exists just to allow for using a vacant type here when an error will never be returned by this function.)
    ///
    /// # Default
    /// The default is to return the **lowest** number of bytes written.
    #[inline(always)] fn compare_byte_sizes(a: usize, b: usize) -> Result<usize, Self::CompareFailedError>
    {
        Ok(std::cmp::min(a, b))
    }
}
/// The default `WriteRule` static rules for a `MultiplexingStream` to follow.
#[derive(Debug)]
pub struct DefaultWriteRule(());
impl WriteRule for DefaultWriteRule{}
// When one completes but not the other, we set this enum to the completed one. Then, we keep polling the other until it also completes. After that, this is reset to `None`.
/// Pending-completion bookkeeping for one of the paired poll operations.
#[derive(Debug)]
enum StatRes<T>
{
    /// The first stream already completed with this result.
    First(io::Result<T>),
    /// The second stream already completed with this result.
    Second(io::Result<T>),
    /// Neither side has a stored completion.
    None,
}
impl<T> From<StatRes<T>> for Option<io::Result<T>>
{
    #[inline] fn from(from: StatRes<T>) -> Self
    {
        match from {
            StatRes::First(r) |
            StatRes::Second(r) => Some(r),
            _ => None
        }
    }
}
impl<T> StatRes<T>
{
    /// Does this stat have a result?
    pub fn has_result(&self) -> bool
    {
        if let Self::None = self {
            false
        } else {
            true
        }
    }
}
impl<T> Default for StatRes<T>
{
    #[inline]
    fn default() -> Self
    {
        Self::None
    }
}
type StatWrite = StatRes<usize>;
type StatFlush = StatRes<()>;
type StatShutdown = StatRes<()>;
/// Saved half-completions for each of the three `AsyncWrite` operations.
#[derive(Debug)]
struct Stat
{
    write: StatWrite,
    flush: StatFlush,
    shutdown: StatShutdown,
}
default!(Stat: Self {
    write: Default::default(),
    flush: Default::default(),
    shutdown: Default::default(),
});
/// An `AsyncWrite` stream that dispatches its writes to 2 outputs
///
/// # Notes
/// If the backing stream's `write` impls provide different results for the number of bytes written, which to return is determined by the `Rule` parameter's `compare_byte_sizes()` function.
#[pin_project]
#[derive(Debug)]
pub struct MultiplexWrite<T,U, Rule: WriteRule + ?Sized = DefaultWriteRule>
{
#[pin] s1: T,
#[pin] s2: U,
// `Stat` is big, box it.
stat: Box<Stat>,
_rule: PhantomData<Rule>,
}
impl<T,U> MultiplexWrite<T, U>
where T: AsyncWrite,
U: AsyncWrite
{
/// Create a new `AsyncWrite` multiplexer
///
/// The default static write rule will be used
#[inline(always)] pub fn new(s1: T, s2: U) -> Self
{
Self::new_ruled(s1, s2)
}
}
impl<T,U, Rule: WriteRule + ?Sized> MultiplexWrite<T, U, Rule>
where T: AsyncWrite,
U: AsyncWrite
{
/// Create a new `AsyncWrite` multiplexer with a specified static write rule.
#[inline] pub fn new_ruled(s1: T, s2: U) -> Self
{
Self {
s1, s2, stat: Box::new(Default::default()),
_rule: PhantomData
}
}
/// Consume into the 2 backing streams
#[inline] pub fn into_inner(self) -> (T, U)
{
(self.s1, self.s2)
}
/// Chain to another `AsyncWrite` stream
#[inline] pub fn chain<V: AsyncWrite>(self, s3: V) -> MultiplexWrite<Self, V>
{
MultiplexWrite::new(self, s3)
}
/// References to the inner streams
#[inline] pub fn streams(&self) -> (&T, &U)
{
(&self.s1, &self.s2)
}
/// Mutable references to the inner streams
#[inline] pub fn streams_mut(&mut self) -> (&mut T, &mut U)
{
(&mut self.s1, &mut self.s2)
}
/// Extension method for `write_all` that ensures both streams have the same number of bytes written.
#[inline] pub async fn write_all(&mut self, buf: &[u8]) -> io::Result<()>
where T: Unpin, U: Unpin
{
use tokio::prelude::*;
let (s1, s2) = self.streams_mut();
let (r1, r2) = tokio::join![
s1.write_all(buf),
s2.write_all(buf),
];
r1?;
r2?;
Ok(())
}
}
impl<T: AsyncWrite> UniplexWrite<T>
{
/// Create a `MultiplexWrite` with only one output.
///
/// The default static write rule will be used.
#[inline] pub fn single(s1: T) -> Self
{
Self::new(s1, tokio::io::sink())
}
}
impl<T: AsyncWrite, Rule: WriteRule + ?Sized> UniplexWrite<T, Rule>
{
	/// Create a `MultiplexWrite` with only one output with a specified static write rule.
	#[inline] pub fn single_ruled(s1: T) -> Self
	{
		let discard = tokio::io::sink();
		Self::new_ruled(s1, discard)
	}
	/// Add a second output to this writer, replacing the discarding sink.
	#[inline] pub fn into_multi<U: AsyncWrite>(self, s2: U) -> MultiplexWrite<T, U, Rule>
	{
		let Self { s1, .. } = self;
		MultiplexWrite::new_ruled(s1, s2)
	}
}
/// A `MultiplexWrite` with only 1 output.
///
/// The second stream is `tokio::io::Sink`, which discards everything written
/// to it, so this behaves as a single-output writer with the chosen `Rule`.
pub type UniplexWrite<T, Rule = DefaultWriteRule> = MultiplexWrite<T, tokio::io::Sink, Rule>;
// Polls both streams for each operation. When only one side completes, its
// result is cached in `stat` and `Pending` is returned; the next poll then
// re-polls only the unfinished side. When both complete, the cached state is
// cleared and the combined result is returned.
impl<T, U, Rule: WriteRule + ?Sized> AsyncWrite for MultiplexWrite<T, U, Rule>
where T: AsyncWrite, U: AsyncWrite
{
	// Writes `buf` to both streams; byte counts are reconciled by
	// `Rule::compare_byte_sizes`.
	// NOTE(review): if one side completes and we return `Pending`, the next
	// `poll_write` passes whatever `buf` the caller supplies to the other
	// side only — this assumes the caller retries with the same `buf`
	// after `Pending` (the usual `AsyncWrite` usage); confirm callers do.
	fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
		let this = self.project();
		// Take any cached one-sided result; leaves `None` in its place.
		let (r1, r2) = match std::mem::replace(&mut this.stat.write, StatRes::None) {
			StatRes::First(r1) => {
				// First stream already finished on an earlier poll; only poll the second.
				let r2 = this.s2.poll_write(cx, buf);
				(Poll::Ready(r1), r2)
			},
			StatRes::Second(r2) => {
				// Second stream already finished; only poll the first.
				let r1 = this.s1.poll_write(cx, buf);
				(r1, Poll::Ready(r2))
			}
			StatRes::None => {
				// Fresh operation: poll both.
				let r1 = this.s1.poll_write(cx, buf);
				let r2 = this.s2.poll_write(cx, buf);
				(r1, r2)
			}
		};
		match (r1, r2) {
			(Poll::Ready(r1), Poll::Ready(r2)) => {
				// Both ready. Return result that has the most bytes written.
				// Note: No need to update `stat` for this branch, it already has been set to `None` in the above match expr.
				return Poll::Ready(Rule::compare_byte_sizes(r1?, r2?).map_err(Into::into));
			},
			(Poll::Ready(r1), _) => {
				// First ready. Update stat to first
				this.stat.write = StatRes::First(r1);
			},
			(_, Poll::Ready(r2)) => {
				// Second ready. Update stat to second
				this.stat.write = StatRes::Second(r2);
			}
			// Both are pending, re-poll both next time (as `stat.write` is set to `None`).
			_ => ()
		}
		Poll::Pending
	}
	// Flushes both streams, using the same one-sided caching scheme as
	// `poll_write` (via `stat.flush`).
	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
		let this = self.project();
		let (r1, r2) = match std::mem::replace(&mut this.stat.flush, StatRes::None) {
			StatRes::First(r1) => {
				let r2 = this.s2.poll_flush(cx);
				(Poll::Ready(r1), r2)
			},
			StatRes::Second(r2) => {
				let r1 = this.s1.poll_flush(cx);
				(r1, Poll::Ready(r2))
			}
			StatRes::None => {
				let r1 = this.s1.poll_flush(cx);
				let r2 = this.s2.poll_flush(cx);
				(r1, r2)
			}
		};
		match (r1, r2) {
			(Poll::Ready(r1), Poll::Ready(r2)) => {
				// Both ready.
				// Note: No need to update `stat` for this branch, it already has been set to `None` in the above match expr.
				// The first stream's error takes priority.
				r1?;
				r2?;
				return Poll::Ready(Ok(()));
			},
			(Poll::Ready(r1), _) => {
				// First ready. Update stat to first
				this.stat.flush = StatRes::First(r1);
			},
			(_, Poll::Ready(r2)) => {
				// Second ready. Update stat to second
				this.stat.flush = StatRes::Second(r2);
			}
			// Both are pending, re-poll both next time (as `stat.flush` is set to `None`).
			_ => ()
		}
		Poll::Pending
	}
	// Shuts down both streams, using the same one-sided caching scheme
	// (via `stat.shutdown`).
	fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
		let this = self.project();
		let (r1, r2) = match std::mem::replace(&mut this.stat.shutdown, StatRes::None) {
			StatRes::First(r1) => {
				let r2 = this.s2.poll_shutdown(cx);
				(Poll::Ready(r1), r2)
			},
			StatRes::Second(r2) => {
				let r1 = this.s1.poll_shutdown(cx);
				(r1, Poll::Ready(r2))
			}
			StatRes::None => {
				let r1 = this.s1.poll_shutdown(cx);
				let r2 = this.s2.poll_shutdown(cx);
				(r1, r2)
			}
		};
		match (r1, r2) {
			(Poll::Ready(r1), Poll::Ready(r2)) => {
				// Both ready.
				// Note: No need to update `stat` for this branch, it already has been set to `None` in the above match expr.
				// The first stream's error takes priority.
				r1?;
				r2?;
				return Poll::Ready(Ok(()));
			},
			(Poll::Ready(r1), _) => {
				// First ready. Update stat to first
				this.stat.shutdown = StatRes::First(r1);
			},
			(_, Poll::Ready(r2)) => {
				// Second ready. Update stat to second
				this.stat.shutdown = StatRes::Second(r2);
			}
			// Both are pending, re-poll both next time (as `stat.shutdown` is set to `None`).
			_ => ()
		}
		Poll::Pending
	}
}
impl<T, U> From<(T, U)> for MultiplexWrite<T, U>
where T: AsyncWrite,
      U: AsyncWrite,
{
	/// Build a multiplexer from a pair of streams (default write rule).
	#[inline] fn from(streams: (T, U)) -> Self
	{
		let (first, second) = streams;
		Self::new(first, second)
	}
}
impl<T,U> From<MultiplexWrite<T, U>> for (T, U)
where T: AsyncWrite,
      U: AsyncWrite,
{
	/// Unwrap the multiplexer back into its two backing streams.
	#[inline] fn from(mp: MultiplexWrite<T, U>) -> Self
	{
		mp.into_inner()
	}
}
/// Extension trait adding `.multiplex(..)` to every `AsyncWrite` stream.
pub trait MultiplexStreamExt: Sized + AsyncWrite
{
	/// Create a multi-outputting `AsyncWrite` stream writing to both this and another with a static write rule.
	fn multiplex_ruled<T: AsyncWrite, Rule: ?Sized + WriteRule>(self, other: T) -> MultiplexWrite<Self, T, Rule>;
	/// Create a multi-outputting `AsyncWrite` stream writing to both this and another.
	#[inline(always)] fn multiplex<T: AsyncWrite>(self, other: T) -> MultiplexWrite<Self, T>
	{
		self.multiplex_ruled::<T, DefaultWriteRule>(other)
	}
}
impl<S: AsyncWrite> MultiplexStreamExt for S
{
	/// Wrap `self` and `other` in a `MultiplexWrite` governed by `Rule`.
	#[inline] fn multiplex_ruled<T: AsyncWrite, Rule: ?Sized + WriteRule>(self, other: T) -> MultiplexWrite<Self, T, Rule> {
		MultiplexWrite::<Self, T, Rule>::new_ruled(self, other)
	}
}
#[cfg(test)]
mod tests
{
	use tokio::prelude::*;
	use super::{
		MultiplexWrite,
		UniplexWrite,
	};
	// Note: `&'static str` in a `const` is redundant — `'static` is implied —
	// so the three constants below now use plain `&str` (clippy:
	// `redundant_static_lifetimes`).

	/// `write_all` must leave both outputs with identical, complete copies of the input.
	#[tokio::test]
	async fn mp_write_all()
	{
		const INPUT: &str = "Hello world.";
		let mut o1 = Vec::new();
		let mut o2 = Vec::new();
		{
			let mut mp = MultiplexWrite::new(&mut o1, &mut o2);
			mp.write_all(INPUT.as_bytes()).await.expect("mp write failed");
			mp.flush().await.expect("mp flush");
			mp.shutdown().await.expect("mp shutdown");
		}
		assert_eq!(o1.len(), o2.len());
		assert_eq!(&o1[..], INPUT.as_bytes());
		assert_eq!(&o2[..], INPUT.as_bytes());
	}
	/// A single `write` through the multiplexer reaches both outputs in full.
	#[tokio::test]
	async fn multiplex()
	{
		const INPUT: &str = "Hello world.";
		let mut o1 = Vec::new();
		let mut o2 = Vec::new();
		{
			let mut mp = MultiplexWrite::new(&mut o1, &mut o2);
			assert_eq!(mp.write(INPUT.as_bytes()).await.expect("mp write failed"), INPUT.len());
			mp.flush().await.expect("mp flush");
			mp.shutdown().await.expect("mp shutdown");
		}
		assert_eq!(o1.len(), o2.len());
		assert_eq!(&o1[..], INPUT.as_bytes());
		assert_eq!(&o2[..], INPUT.as_bytes());
	}
	/// A single-output (`UniplexWrite`) writer behaves like a plain writer.
	#[tokio::test]
	async fn uniplex()
	{
		const INPUT: &str = "Hello world.";
		let mut o1 = Vec::new();
		{
			let mut mp = UniplexWrite::single(&mut o1);
			assert_eq!(mp.write(INPUT.as_bytes()).await.expect("mp write failed"), INPUT.len());
			mp.flush().await.expect("mp flush");
			mp.shutdown().await.expect("mp shutdown");
		}
		assert_eq!(&o1[..], INPUT.as_bytes());
	}
}

@ -0,0 +1,257 @@
//! Sync utils
use super::*;
use std::{
ptr::NonNull,
sync::atomic::{AtomicBool, Ordering},
};
use std::ops::Drop;
use crossbeam_utils::atomic::AtomicCell;
/// The shared heap cell behind an `InitHalf`/`UninitHalf` pair.
struct InitData<T>
{
	// The (possibly not-yet-set) value, exchanged atomically between halves.
	data: AtomicCell<Option<T>>,
	// Flipped to `true` by the first half dropped; the second dropper then frees the box.
	drop: AtomicBool,
}
/// Shared-pointer wrapper over a heap-allocated `InitData`.
///
/// NOTE(review): there is no reference count — soundness relies on exactly
/// two clones ever existing (the pair produced by `shared_uninit`); see `Drop`.
#[derive(Debug)]
struct InitInner<T>
{
	boxed: NonNull<InitData<T>>,
}
impl<T> InitInner<T>
{
	/// Allocate a fresh, empty `InitData` cell on the heap.
	#[inline] pub fn new() -> Self
	{
		let from = Box::new(InitData {
			data: AtomicCell::new(None),
			drop: false.into(),
		});
		Self {
			// `Box::leak` yields a valid, non-null reference, so the
			// previous `unsafe { NonNull::new_unchecked(Box::into_raw(..)) }`
			// is not needed; `NonNull::from` is entirely safe here.
			boxed: NonNull::from(Box::leak(from)),
		}
	}
}
impl<T> Clone for InitInner<T>
{
fn clone(&self) -> Self {
Self {
boxed: unsafe{NonNull::new_unchecked(self.boxed.as_ptr())}
}
}
}
impl<T> InitInner<T>
{
	/// Shared reference to the heap cell.
	#[inline(always)] fn data_ref(&self) -> &InitData<T>
	{
		// SAFETY: `boxed` points at a live allocation until the second of
		// the two halves is dropped, and we hold one of the halves.
		unsafe { self.boxed.as_ref() }
	}
	/// The flag recording whether one half has already been dropped.
	#[inline(always)] fn should_drop(&self) -> &AtomicBool
	{
		&self.data_ref().drop
	}
}
// SAFETY: the halves only move `T` values in and out of the shared
// `AtomicCell` (whose operations are thread-safe), so sharing/sending the
// pointer across threads is sound *provided the payload itself may cross
// threads*. The previous unbounded impls would have let a `!Send` type
// (e.g. `Rc`) migrate between threads; `T: Send` closes that soundness hole.
// (`T: Sync` is not required: no `&T` to the stored value is ever exposed.)
unsafe impl<T: Send> Send for InitInner<T>{}
unsafe impl<T: Send> Sync for InitInner<T>{}
impl<T> Drop for InitInner<T>
{
	fn drop(&mut self) {
		// Atomically try to flip the shared `drop` flag from `false` to `true`:
		// - first half dropped: the exchange succeeds (`Ok(false)`) and the
		//   heap cell is left alive for the surviving half;
		// - second half dropped: the exchange fails because the flag is
		//   already `true` (`Err(true)`), and only then is the box freed.
		// NOTE(review): sound only while exactly two clones exist (the pair
		// from `shared_uninit`); a third clone would cause a double-free.
		if let Err(true) = self.should_drop().compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) {
			unsafe {
				// SAFETY: we are the last of the two halves, so no other
				// handle can observe `boxed` after this point.
				drop(Box::from_raw(self.boxed.as_ptr()))
			}
		}
	}
}
/// The initialising (writer) half of a `shared_uninit` pair.
#[derive(Debug)]
pub struct InitHalf<T>(InitInner<T>);
/// The receiving (reader) half of a `shared_uninit` pair.
#[derive(Debug)]
pub struct UninitHalf<T>(InitInner<T>);
impl<T> InitHalf<T>
{
	/// Set the value, discarding any value that was previously set.
	pub fn set(&mut self, value: T)
	{
		// Same observable effect as a raw `store`: the old value (if any)
		// is dropped here instead of inside the cell.
		let _ = self.swap(value);
	}
	/// Set the value, returning the previous one if it was set.
	pub fn swap(&mut self, value: T) -> Option<T>
	{
		let cell = &self.0.data_ref().data;
		cell.swap(Some(value))
	}
}
impl<T> UninitHalf<T>
{
	/// Take the initialised value
	///
	/// # Panics
	/// If the value hasn't yet been initialised.
	pub fn take(&mut self) -> T
	{
		// `expect` instead of a bare `unwrap` so the panic message states
		// the violated precondition; delegates to `try_take` for the access.
		self.try_take().expect("UninitHalf::take(): value has not been initialised yet")
	}
	/// Take the value if it is set, if not, returns `None`.
	pub fn try_take(&mut self) -> Option<T>
	{
		self.0.data_ref().data.take()
	}
}
/// Create a linked pair: an initialiser (`InitHalf`) and a receiver
/// (`UninitHalf`) sharing one atomic cell.
pub fn shared_uninit<T>() -> (InitHalf<T>, UninitHalf<T>)
{
	let writer = InitInner::<T>::new();
	let reader = writer.clone();
	(InitHalf(writer), UninitHalf(reader))
}
/*
/// Type to allow for a seperate thread or task to initialise a value.
#[derive(Debug)]
pub struct SharedUninit<T>(oneshot::Receiver<T>);
/// Type to initialise a value for a `SharedUninit`.
#[derive(Debug)]
pub struct SharedInitialiser<T>(oneshot::Sender<T>);
impl<'a, T> SharedUninit<T>
where T: 'a
{
/// Create an uninit/initialiser pair.
pub fn pair() -> (SharedInitialiser<T>, Self)
{
let (tx, rx) = oneshot::channel();
(SharedInitialiser(tx), Self(rx))
}
/// Try to receive the initialised value.
///
/// Returns `None` if the initialiser was dropped before setting a value.
#[inline] pub fn try_get(self) -> impl Future<Output = Option<T>> + 'a
{
self.0.map(|x| x.ok())
}
/// Receive the initialised value.
///
/// # Panics
/// If the initialiser was dropped without setting a value.
#[inline] pub fn get(self) -> impl Future<Output = T> + 'a
{
self.try_get().map(|x| x.unwrap())
}
}
impl<'a, T> SharedInitialiser<T>
where T: 'a
{
/// Set the value for the `SharedUninit`.
#[inline] pub fn init(self, value: T)
{
let _ = self.0.send(value);
}
}
*/
// This was a failure. Just use `tokio::sync::oneshot`...
/*
#[derive(Debug)]
struct SharedInitialiser<T>
{
data: Arc<(UnsafeCell<MaybeUninit<T>>, AtomicBool)>,
}
impl<T> Clone for SharedInitialiser<T>
{
#[inline] fn clone(&self) -> Self {
Self { data: Arc::clone(&self.data) }
}
}
#[derive(Debug)]
pub struct SharedInitRx<T>(SharedInitialiser<T>);
/// Permits initialising across a thread.
// Do we even need this? No.. We can just use `tokio::sync::oneshot`....
#[derive(Debug)]
pub struct SharedInitTx<T>(SharedInitialiser<T>);
impl<T> SharedInitTx<T>
{
/// Consume this instance and initialise it.
///
/// # Panics
/// If there is already a value set (this should never happen).
pub fn initialise(self, value: T)
{
todo!()
}
}
impl<T> SharedInitRx<T>
{
/// Create a sender and receiver pair
pub fn pair() -> (SharedInitTx<T>, Self)
{
let this = Self::new();
(this.create_tx(), this)
}
/// Create a new, uninitialised receiver.
#[inline] fn new() -> Self
{
Self(SharedInitialiser{data: Arc::new((UnsafeCell::new(MaybeUninit::uninit()), false.into()))})
}
/// Create an initialiser
///
/// # Panics (debug)
/// If an initialiser already exists
#[inline] fn create_tx(&self) -> SharedInitTx<T>
{
debug_assert_eq!(Arc::strong_count(&self.0.data), 1, "Sender already exists");
SharedInitTx(self.0.clone())
}
/// Checks if there is a value present, or if it is possible for a value to be present.
pub fn is_pending(&self) -> bool
{
todo!("Either self.0.data.1 == true, OR, strong_count() == 2")
}
/// Has a value already been set
pub fn has_value(&self) -> bool
{
todo!("self.0.data.1 == true")
}
/// Try to consume into the initialised value.
pub fn try_into_value(self) -> Result<T, Self>
{
todo!()
}
/// Consume into the initialised value
///
/// # Panics
/// If the value hasn't been initialised
#[inline] pub fn into_value(self) -> T
{
self.try_into_value().map_err(|_| "No initialised value present").unwrap()
}
/// Does this receiver have an initialser that hasn't yet produced a value?
pub fn has_initialiser(&self) -> bool
{
Arc::strong_count(&self.0.data) == 2
}
}
*/

@ -0,0 +1,14 @@
//! Misc. Utilites
use super::*;
/// Get a random value between the two bounds, inclusive of both ends.
pub fn jitter<T>(min: T, max: T) -> T
where T: rand::distributions::uniform::SampleUniform
{
	use rand::Rng;
	let dist = rand::distributions::Uniform::new_inclusive(min, max);
	rand::thread_rng().sample(dist)
}

@ -1,9 +1,14 @@
#![feature(never_type)]
#![feature(associated_type_defaults)]
#![feature(asm)]
#![allow(dead_code)]
#![cfg_attr(debug_assertions, allow(unused_imports))]
#[macro_use] extern crate ad_hoc_iter;
#[macro_use] extern crate bitflags;
#[macro_use] extern crate lazy_static;
#[macro_use] extern crate pin_project;
use serde::{Serialize, Deserialize};
@ -12,19 +17,20 @@ use jemallocator::Jemalloc;
// Use jemalloc as the process-wide allocator.
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
/// Construct a `std::io::Error`: `io_err!(Kind, msg)` with an explicit
/// `ErrorKind` variant, or `io_err!(msg)` which defaults to `ErrorKind::Other`.
macro_rules! io_err {
	($kind:ident, $msg:expr) => {
		::std::io::Error::new(::std::io::ErrorKind::$kind, $msg)
	};
	($msg:expr) => (io_err!(Other, $msg));
}
// Utils
mod ext;
use ext::*;
mod slice;
pub mod data;
// Real stuff
mod service;
// Serving
// mod ep;
// Placeholder entry point — the service itself is not wired up here yet.
fn main() {
	// Usage examples for the `debug_expr!` macro, kept for reference:
	// debug_expr!(println!("debug"));
	// debug_expr!(else println!("not debug"));
	// debug_expr!(if println!("debug 2"); else println!("not debug 2"));
	println!("Hello, world!");
}

@ -0,0 +1,11 @@
use super::*;
use config::*;
/// Builder for a service
///
/// Configures a service before it is started (see `create()` in the parent module).
#[derive(Debug, Clone)]
pub struct ServiceBuilder
{
	/// Settings for the service to use
	// Boxed because this is a large structure
	settings: Box<ServiceSettings>,
}

@ -0,0 +1,25 @@
//! Caching errors
use super::*;
use std::io;
use std::{
fmt,
error,
};
/// A partial cache entry initialisation error
///
/// Wraps the underlying `io::Error`, which is exposed via `Error::source`.
#[derive(Debug)]
pub struct PartialInitError(pub(super) io::Error);
impl error::Error for PartialInitError
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
Some(&self.0)
}
}
impl fmt::Display for PartialInitError
{
	/// Static message; the I/O detail is reachable through `Error::source`.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
	{
		f.write_str("Failed to initialise a partial cache entry")
	}
}

@ -0,0 +1,59 @@
//! Memory holding types
use super::*;
use ::bytes::{
Bytes,
BytesMut,
BufMut,
Buf,
};
use std::{
pin::Pin,
task::{Context, Poll},
io,
};
use tokio::io::AsyncWrite;
// TODO: For when `MemoryMut` is vectorised, this will be the max size for each allocation
/// Intended per-allocation granularity (bytes) for the future vectorised `MemoryMut`.
pub const PAGE_SIZE: usize = 4096;
/// The memory hold of a cache entry.
///
/// Immutable (frozen) bytes, produced by `MemoryMut::freeze`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Memory(pub(super) Bytes);
/// Unfrozen memory hold of a cache entry.
///
/// Write-only accumulation buffer; freeze into a `Memory` once complete.
//TODO: Allow for non-resizing writes by making this `LinkedList<BytesMut>` or something, then we can implement `BufMut` for this
#[derive(Debug, PartialEq, Eq, Hash, Default)]
pub struct MemoryMut(pub(super) BytesMut); //TODO: Type will be `SmallVec<[BytesMut; 1]>` or `LinkedList<BytesMut>` or maybe a `smolset` type (probably linkedlist is best actually...).
impl MemoryMut
{
/// Freeze this mutable memory into an immutable one
pub fn freeze(self) -> Memory
{
Memory(self.0.freeze())
}
/// Create a new, empty mutable memory pool
pub fn new() -> Self
{
Self(BytesMut::new())
}
}
// Writes append to the in-memory buffer; every operation completes immediately.
impl AsyncWrite for MemoryMut
{
	// No-op: the data already lives in memory, there is nothing to flush to.
	#[inline(always)] fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
		//TODO: When vectorised (see above TODO), this method will make it a single contiguous `BytesMut`
		Poll::Ready(Ok(()))
	}
	// No-op: there is no underlying resource to shut down.
	#[inline(always)] fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
		//TODO: When vectorised (see above TODO), this method will make it a single contiguous `BytesMut`
		Poll::Ready(Ok(()))
	}
	// Appends the whole of `buf` (growing the buffer) and reports it all written.
	// `get_mut` is fine here: `MemoryMut` contains no pinned state (it is `Unpin`).
	fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
		//TODO: When vectorised, this will either: fill the previous allocation with enough out of `buf`; then create a new allocation and write the rest there; repeat.
		// This is kinda paging
		self.get_mut().0.extend_from_slice(buf);
		Poll::Ready(Ok(buf.len()))
	}
}

@ -0,0 +1,162 @@
//! Service cacheing
use super::*;
use std::{
hash::Hash,
marker::{Send, Sync},
fmt,
};
use std::sync::{
Weak,
RwLock,
atomic::AtomicUsize,
};
use std::cell::UnsafeCell;
use std::path::PathBuf;
use std::num::NonZeroUsize;
use std::collections::HashMap;
use chrono::DateTime;
use uuid::Uuid;
use crossbeam_utils::atomic::AtomicCell;
use cryptohelpers::sha256;
pub type Timezone = chrono::Utc;
/// A type that can be used for the cache key
///
/// A marker trait; blanket-implemented below for every type meeting the bounds.
pub trait Key: Sized +
	Send +
	Sync +
	Hash +
	PartialEq +
	Eq +
	fmt::Display +
	Serialize +
	for<'a> Deserialize<'a> +
	'static
{}
// Blanket impl: anything meeting the bounds is automatically a `Key`.
impl<T> Key for T
where T:
	Send +
	Sync +
	Hash +
	PartialEq +
	Eq +
	fmt::Display +
	Serialize +
	for<'a> Deserialize<'a> +
	'static
{}
mod mem;
use mem::Memory;
// Declares the `PurgeOrder` enum (per-variant docs inline below) and makes
// `LeastUsed` its `Default`.
basic_enum!(pub PurgeOrder; "How should a cache determine what to purge": Oldest => "Purge the oldest entries first", LeastUsed => "Purge the least accessed entries first", OldestUsed => "Purge the oldest accessed entries first");
default!(PurgeOrder: Self::LeastUsed);
/// Config for a cache
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Config
{
	// General
	// Disk cache
	/// The directory to save to data to.
	/// If `None`, use the memory cache only.
	pub disk_location: Option<PathBuf>,
	// Memory cache
	/// Max size of an entry to cache in memory
	pub mem_max_ent_size: Option<NonZeroUsize>,
	/// Max entries in memory cache
	pub mem_max_ent: Option<NonZeroUsize>,
	/// Max total bytes to keep in memcache before purging older entries.
	pub mem_max_total_size: Option<NonZeroUsize>,
	/// When purging entries from memcache, how to select ones to purge
	pub mem_purge_order: PurgeOrder,
}
/// An entry in a `ByteCache`.
///
/// Frozen (read-only) once stored; access bookkeeping is interior-mutable.
#[derive(Debug)]
pub struct CacheEntry //<K: Key> // `K` is the key in `entries`.
{
	// Unique ID; the on-disk pathname is derived from it.
	id: Uuid,
	tm_accessed: RwLock<DateTime<Timezone>>, // Can be mutated when read, so must be mutable by the reader, hence the `RwLock`.
	tm_created: DateTime<Timezone>,
	accesses: AtomicUsize, // Can be mutated when read, hence the atomic.
	/// Hash of the memcache
	///
	/// Used to ensure integrity of written disk data on an explicit check
	/// (implicitly, integrity is checked by comparing the length of the disk stream with the length of the memory stream, since they are write-only in partial entries.)
	hash: sha256::Sha256Hash,
	// `None` when the entry's bytes are not held in memory.
	memory: Option<Memory>,
	// Pathname is computed from `id`.
}
/// A byte-stream cache of data. Caches both to memory and also writes entries to disk.
#[derive(Debug)]
pub struct ByteCache<K: Key>
{
	/// How the cache should operate.
	/// Shared via `Arc` so partial entries can hold their own handle to it
	/// (see `PartialCacheEntry::new_uninit`).
	cfg: Arc<Config>,
	/// Frozen (completed, read-only) entries.
	entries: RwLock<HashMap<K, CacheEntry>>,
	// NOTE(review): in-progress (partial) entries are deliberately NOT stored
	// here; they live as free-standing `PartialCacheEntry` values (see the
	// `partial` submodule). An earlier design tracked them in a field like
	//   working: tokio::sync::RwLock<Vec<Weak<AtomicCell<PartialCacheEntryInner<K>>>>>,
	// but that approach was abandoned.
}
impl<K: Key> ByteCache<K>
{
/// Create a new empty entry for this cache
pub async fn create_partial(&self, key: K) -> Result<PartialCacheEntry<K>, error::PartialInitError>
{
let mut this = PartialCacheEntry::new_uninit(self, key);
this.init().await.map_err(error::PartialInitError)?;
Ok(this)
}
}
mod partial;
pub use partial::*;
pub mod error;
/*
XXX: Move this to submodule, fuck the Cell BULLSHIT FUCK
#[derive(Debug)]
struct PartialCacheEntryInner<K: Key>
{
key: K,
disk: tokio::fs::File,
memory: ::bytes::BytesMut,
}
//#[derive(Debug)]
pub struct PartialCacheEntry<K: Key>(Arc<AtomicCell<PartialCacheEntryInner<K>>>);
impl<K: Key> PartialCacheEntry<K>
{
fn as_inner_mut(&mut self) -> &mut PartialCacheEntryInner<K>
{
//XXX: Is this safe???? Do we need an extra unsafecell? eh... fuck this
unsafe { &mut *(self.0.as_ptr() as *mut _)}
}
fn as_inner(&self) -> &PartialCacheEntryInner<K>
{
unsafe { &*self.0.as_ptr() }
}
}
*/

@ -0,0 +1,98 @@
//! Inserting into the persistant cache.
use super::*;
use ::bytes::BytesMut;
use std::path::Path;
use tokio::fs::{self, File};
use std::io;
use tokio::io::AsyncWrite;
use cryptohelpers::sha2::{Sha256, Digest};
use mem::MemoryMut;
/// The write rule used for cache insertion operations.
/// This ensures the number of bytes returned corresponds to the number written into memory.
///
/// This is because we care more about the integrity of memcached data, because we can dump that to disk later if the integrity of the disk copy is incorrect.
// Uninhabited: used purely as a type-level marker for `plex::WriteRule`.
#[derive(Debug)]
enum CacheWriteRule{}
impl plex::WriteRule for CacheWriteRule
{
	/// Report the larger of the two byte counts — the in-memory write wins,
	/// since the disk copy can be re-dumped from memory later. Never fails.
	#[inline(always)] fn compare_byte_sizes(a: usize, b: usize) -> Result<usize, Self::CompareFailedError> {
		Ok(a.max(b))
	}
}
/// A partially formed cache entry that can be mutated.
/// It has not yet been inserted into a persistant `ByteCache` cache, and is write-only.
pub struct PartialCacheEntry<K>
{
	// Handle to the owning cache's config (shared, not copied).
	cfg: Arc<Config>,
	// Fresh UUID; the temp/final pathnames are derived from it.
	id: Uuid,
	// The key this entry will be inserted under.
	key: K,
	/// Written to with each write to any instance created by `writer()`.
	/// Finalised only when freezing this to a completed entry.
	hasher: Sha256,
	// `None` when no disk location is configured (memory-only entry).
	file: Option<File>,
	// In-memory copy of everything written so far.
	memory: MemoryMut,
}
/// The writer type for writing to a `PartialCacheEntry`.
///
/// Multiplexes the data sink(s) (memory, and optionally the temp file) with a hashing sink.
pub type PartialCacheEntrySink<'a> = plex::MultiplexWrite<Box<dyn AsyncWrite + Send + Sync + Unpin + 'a>, Sha256Sink<&'a mut Sha256>>;
impl<K: Key> PartialCacheEntry<K>
{
	/// Create an entry in the uninitialised state.
	///
	/// `init()` must be awaited before `writer()` is used, otherwise no
	/// backing file is opened even when the cache is configured for disk.
	#[inline(always)] pub(super) fn new_uninit(owner: &ByteCache<K>, key: K) -> Self
	{
		Self {
			cfg: Arc::clone(&owner.cfg),
			id: Uuid::new_v4(),
			file: None,
			memory: MemoryMut::new(),
			hasher: Sha256::new(),
			key,
		}
	}
	/// Open the backing temp file, when a disk location is configured.
	///
	/// # Errors
	/// Propagates any I/O error from creating/opening the temp file.
	#[inline(always)] pub(super) async fn init(&mut self) -> io::Result<()>
	{
		//self.memory.reserve(PAGE_SIZE);
		if let Some(root) = &self.cfg.disk_location {
			self.file = Some(fs::OpenOptions::new()
					 .write(true)
					 // BUGFIX: the temp path contains a freshly generated
					 // UUID, so the file never exists yet; without
					 // `create(true)` this open always failed with `NotFound`.
					 .create(true)
					 .open(gen_temp_path(root, &self.id)).await?);
		}
		Ok(())
	}
	/// Create a writer for this entry.
	///
	/// Data is written to memory (and the temp file, if one was opened by
	/// `init()`), and every byte is also folded into `hasher`.
	pub fn writer(&mut self) -> PartialCacheEntrySink<'_>
	{
		let bx: Box<dyn AsyncWrite + Send + Sync + Unpin + '_> = if let Some(file) = self.file.as_mut()
		{
			Box::new((&mut self.memory).multiplex_ruled::<_, CacheWriteRule>(file))
		} else {
			Box::new(&mut self.memory)
		};
		bx.multiplex(Sha256Sink::new(&mut self.hasher))
	}
}
/// Create a path for a **non-completed** entry (`<root>/<uuid>.open`).
pub(super) fn gen_temp_path(root: impl AsRef<Path>, from: &Uuid) -> PathBuf
{
	let name = format!("{}.open", from);
	root.as_ref().join(name)
}
/// Create a path for a **completed** entry (`<root>/<uuid>.entry`).
pub(super) fn gen_path(root: impl AsRef<Path>, from: &Uuid) -> PathBuf
{
	let name = format!("{}.entry", from);
	root.as_ref().join(name)
}

@ -0,0 +1,114 @@
//! Commands for a service
use super::*;
use std::{any::Any, marker::{Send, Sync}};
use std::fmt;
use tokio::sync::{
oneshot,
};
id_type!(pub CommandID: "ID of a sent command");
bitflags! {
	/// The requirements this command has
	///
	/// Carried as `Request::metadata`, looked up by the sender so the
	/// service doesn't have to re-derive it during batch processing.
	#[derive(Default)]
	#[allow(non_snake_case)]
	pub(super) struct CommandFlags: u64
	{
		/// Requires nothing
		const NONE = 0;
		/// Requires global service lock
		const GLOBAL_LOCK = 1<<0;
	}
}
/// A response from the service for a sent command
///
/// Type-erased (`Any`); the shape depends on the command that was sent.
pub type Response = Box<dyn Any + Send + 'static>;
/// What kind of command to send to the service?
///
/// Currently uninhabited — no commands are defined yet.
#[derive(Debug, PartialEq, Eq)]
pub enum CommandKind
{
}
//TODO: Add metadata map entry of `CommandFlags` for each discriminant of `CommandKind` somehow. (maybe a proc-macro?)
/// A command to send to a running service.
///
/// Created when sending a `CommandKind` to a service. This is your handle for receiving the response.
#[derive(Debug)]
pub struct Command
{
	// Mirrors the ID of the `Request` the service received.
	id: CommandID,
	// Requirement flags looked up when the command was sent.
	flags: CommandFlags,
	//kind: CommandKind, // `CommandKind` -(sent to)> <running service> -(handle returned from send func)> `Command`
	// `None` from the service (or a dropped sender) means "no response".
	resp: oneshot::Receiver<Option<Response>>,
}
impl fmt::Display for Command
{
	/// Formats as `CommandID<...>` with the ID's bytes rendered as hex.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
	{
		let id_hex = self.id.0.as_bytes().hex();
		write!(f, "CommandID<{}>", id_hex)
	}
}
impl Command
{
	/// The unique ID of this command
	#[inline] pub fn id(&self) -> &CommandID
	{
		&self.id
	}
	/// Prevent any response from being sent to this `Command` handle from the service.
	///
	/// Indicates to the service we don't care about a response. This has the same effect as calling `ignore` but can be used in a builder-pattern way from a returned `Command` from a send.
	/// Example: `let comm = service.send_command(comm_kind).ignored();`
	#[inline(always)] pub fn ignored(mut self) -> Self
	{
		self.ignore();
		self
	}
	/// Close the response channel, indicating to the service that we don't care about the response.
	#[inline] pub fn ignore(&mut self)
	{
		self.resp.close();
	}
	/// Dispose of this `Command`, returning any `Response` that might've already been sent.
	#[inline] pub fn close(mut self) -> Option<Response>
	{
		self.resp.close();
		// A response sent before the close is still buffered in the channel.
		self.resp.try_recv().ok().flatten()
	}
	/// Poll for a response. If none has been sent yet, then return `self` so it can be polled again.
	#[inline] pub fn try_into_response(mut self) -> Result<Option<Response>, Self>
	{
		use oneshot::error::TryRecvError;
		match self.resp.try_recv() {
			Ok(v) => Ok(v),
			// Sender dropped/closed: no response will ever come.
			Err(TryRecvError::Closed) => Ok(None),
			Err(TryRecvError::Empty) => Err(self),
		}
	}
	/// Consume this instance into the response from the service.
	#[inline] pub async fn into_response(self) -> Option<Response>
	{
		self.resp.await.ok().flatten()
	}
	/// Misspelled alias of [`into_response`](Self::into_response), kept for backwards compatibility.
	#[deprecated(note = "use `into_response` instead")]
	#[inline(always)] pub async fn into_repsonse(self) -> Option<Response>
	{
		self.into_response().await
	}
}
impl PartialEq for Command
{
	/// Two command handles are equal iff their IDs match.
	#[inline] fn eq(&self, other: &Self) -> bool
	{
		self.id.eq(&other.id)
	}
}

@ -0,0 +1,56 @@
//! Configuration for services
use super::*;
use std::time::Duration;
use std::num::NonZeroUsize;
/// How long to wait before resetting the restart counter for `StopDirective::Restart`.
///
/// `None` would mean the counter is never reset.
pub const SUPERVISOR_RESTART_TIME_LIMIT: Option<Duration> = Some(Duration::from_secs(5));
/// What the supervisor task should do when its background service unexpectedly exits
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Copy, Serialize, Deserialize)]
pub enum StopDirective
{
	/// Ignore it, allow the service to exit
	Ignore,
	/// Restart the service, either infinitely, or up to this many times before exiting.
	///
	/// If the restart limit is exceeded, exit with error.
	/// The limit is reset every `SUPERVISOR_RESTART_TIME_LIMIT` seconds (or never, if it is `None`.)
	Restart(Option<NonZeroUsize>),
	/// Panic the supervisor
	Panic,
	/// Exit with error
	Error,
}
// The default directive is the strictest non-panicking one.
default!(StopDirective: Self::Error);
/// Settings for how a background service runs
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy, Serialize, Deserialize)]
pub struct ServiceSettings
{
	// Supervisor controls
	/// What to do when the supervisor's child task exits unexpectedly
	pub supervisor_stop_directive: StopDirective,
	// Request dispatching options
	/// How many requests to batch together
	pub req_dispatch_chunk: NonZeroUsize,
	/// How long to wait before forcefully processing an unfilled batch of requests
	pub req_dispatch_force_timeout: Option<Duration>,
	/// How long to wait before processing batches of requests
	pub req_dispatch_delay: Option<Duration>,
	/// Random **nanosecond** delay bounds between request batch processing
	///
	/// `None` (or a `(0, 0)` bound) disables jitter entirely.
	pub req_dispatch_jitter: Option<(u64, u64)>,
	/// Filter requests in a block based on their lock flags, and process non-locking and locking commands concurrently in 2 seperate tasks.
	/// This may result in commands being processed out-of-order sometimes.
	///
	/// If this is `false`, the lock will be acquired for each block that contains a locking command, then each command will be processed in order within the current task, instead of being filtered and dispatched to the lock-held and the non-lock-held task.
	// For filtering, we can create a type that implements `From<Vec<T>>` and use it as the chunk collection type, maybe, possibly. Idk.
	// Or we can just make a multiplexing mpsc wrapper i guess...
	pub req_dispatch_internal_filter: bool, // Default: true
}
//TODO: impl Default for ServiceSettings

@ -0,0 +1,66 @@
//! The actual running task
use super::*;
use futures::{
prelude::*,
future::OptionFuture,
};
use std::time::Duration;
/// Error returned by the supervisor task (placeholder until a real type exists).
pub type SupervisorError = (); //TODO
/// Error returned by the slave task (placeholder until a real type exists).
pub type Error = (); // TODO
// Spawn the supervisor task for a service.
// Currently a stub: it exits successfully without spawning the slave.
// The intended restart policy lives in `config::StopDirective`.
pub(super) fn spawn_supervisor(service: Service) -> JoinHandle<Result<(), SupervisorError>>
{
	tokio::spawn(async move {
		//TODO: Spawn slave and handle its exiting, restarting, etc according to config
		Ok(())
	})
}
/// Delay for a number of **nanoseconds** between the specified bounds
///
/// Returns `None.into()` (a future that resolves immediately with no delay)
/// when the sampled jitter is exactly 0.
fn jitter_for(bounds: (u64, u64)) -> OptionFuture<impl Future<Output = ()> + 'static>
{
	// NOTE(review): `bounds.jitter()` presumably goes through an extension
	// trait on `(u64, u64)` wrapping `util::jitter(min, max)` — confirm that
	// trait exists and is in scope here.
	match bounds.jitter() {
		0 => None.into(),
		x => Some(tokio::time::delay_for(Duration::from_nanos(x))).into()
	}
}
// Spawn the service's worker loop: batches incoming `Request`s into chunks
// (with optional delay/jitter between batches) until the channel closes.
fn spawn_slave(service: Service) -> JoinHandle<Result<(), Error>>
{
	let Service { inner: service, rx } = service;
	tokio::spawn(async move {
		let cfg = service.opt.as_ref();
		// Batch the raw receiver: collect up to `req_dispatch_chunk` requests,
		// spaced at least `req_dispatch_delay` apart.
		// NOTE(review): `chunk`/`lag`/`push_now` look like project-local stream
		// extensions — their exact semantics aren't visible from here.
		let mut rx = rx
			.chunk(cfg.req_dispatch_chunk.into())
			.lag(cfg.req_dispatch_delay.unwrap_or(Duration::ZERO));
		// Optional periodic tick used to force out an unfilled batch.
		let mut timeout = cfg.req_dispatch_force_timeout.map(tokio::time::interval);
		loop {
			tokio::select! {
				block = rx.next() => {
					match block {
						Some(block) => {
							// Jitter is enabled only when configured bounds are non-zero.
							// NOTE(review): `x.0 + x.1` can overflow (panicking in debug
							// builds) for extreme bounds — consider `checked_add`.
							if let Some(bounds) = cfg.req_dispatch_jitter.and_then(|x| (x.0 + x.1 > 0).then(|| x)) {
								// Jitter delay.
								jitter_for(bounds).await;
							}
							// TODO: Filter and then/or process this block
						},
						None => {
							// Reached the end of stream, exit gracefully.
							break;
						}
					}
				}
				_ = OptionFuture::from(timeout.as_mut().map(|x| x.tick())), if timeout.is_some() => {
					// Cause the `rx` to release a non-full chunk.
					rx.get_mut().push_now();
				}
			}
		}
		Ok(())
	})
}

@ -0,0 +1,101 @@
//! The actual running service
use super::*;
use std::sync::Arc;
use tokio::{
task::JoinHandle,
sync::{
RwLock,
Mutex,
mpsc,
oneshot,
}
};
pub mod command;
use command::{
CommandKind,
CommandFlags,
CommandID,
};
pub mod config;
mod builder;
pub use builder::*;
pub mod cache;
mod host;
/// Handle to a running service. Can be used to join it or create `Channel`s.
#[derive(Debug)]
pub struct Handle
{
	// The spawned service task; await to join the service.
	task: JoinHandle<()>,
	// A channel to the service; clone it to create more channels.
	channel: Channel,
}
/// Inner portion of a `Channel`. Also held through `Arc` by the background service and its supervisor.
#[derive(Debug)]
struct ChannelInner
{
	/// The settings for the service
	// Boxed because this is a large structure.
	opt: Box<config::ServiceSettings>,
}
/// The side of a `command::Command` that the running service sees.
#[derive(Debug)]
struct Request
{
	/// ID that corresponds to the returned `command::Command` from the `send()` function of `Channel`.
	id: CommandID,
	/// The actual command sent by the user.
	kind: CommandKind,
	/// The metadata flags of this `CommandKind`.
	///
	/// This is looked up by the sender (user), not the receiver (service) to save service batch processing time.
	/// Although the lookup should be extremely fast.
	metadata: CommandFlags,
	/// Optional response sender.
	///
	/// Just dropping this is the same as sending `None` as far as the user sees.
	resp: oneshot::Sender<Option<command::Response>>,
}
/// Communicates with a running service
///
/// Cheap to clone; all clones talk to the same service instance.
#[derive(Debug, Clone)]
pub struct Channel {
	// Shared service metadata (settings); also held by the service/supervisor.
	inner: Arc<ChannelInner>,
	// Sends `Request`s into the service's receive loop.
	tx: mpsc::Sender<Request>,
}
/// The service's counterpart to `Channel`. Contains the metadata `ChannelInner` and the receiver for `Channel`s.
#[derive(Debug)]
struct Service
{
	// Shared metadata, same allocation the `Channel`s hold.
	inner: Arc<ChannelInner>,
	// Receives `Request`s from every `Channel` clone.
	rx: mpsc::Receiver<Request>,
}
impl Eq for Channel{}
impl PartialEq for Channel {
	/// Channels compare equal when they share the same `ChannelInner` allocation
	/// (i.e. they talk to the same service).
	#[inline] fn eq(&self, other: &Self) -> bool
	{
		let (lhs, rhs) = (&self.inner, &other.inner);
		Arc::ptr_eq(lhs, rhs)
	}
}
/// Create a new service
///
/// Returns the builder used to configure it before starting.
/// TODO: unimplemented — currently panics via `todo!()`.
pub fn create() -> ServiceBuilder
{
	todo!()
}

@ -1,30 +0,0 @@
use super::*;
use std::cmp;
use std::ptr;
/// Copy bytes from one slice to another, returning how many were copied.
///
/// Copies `min(from.len(), to.len())` bytes from the start of `from` into the
/// start of `to`.
///
/// # Notes
/// The underlying copy is `memmove`-like, so it stays correct even for
/// overlapping regions (only reachable if a caller built aliasing slices
/// via `unsafe`).
#[inline] pub fn move_bytes(from: &[u8], to: &mut [u8]) -> usize
{
	let count = std::cmp::min(from.len(), to.len());
	// SAFETY: both pointers are valid for `count` bytes, since `count`
	// is at most either slice's length; `ptr::copy` permits overlap.
	unsafe {
		std::ptr::copy(from.as_ptr(), to.as_mut_ptr(), count);
	}
	count
}
/// Copy bytes from one slice to another, returning how many were copied.
///
/// Copies `min(from.len(), to.len())` bytes from the start of `from` into the
/// start of `to`.
///
/// # Notes
/// The `&`/`&mut` signature already guarantees the slices cannot overlap in
/// safe code, so this uses the safe `copy_from_slice` (which compiles down to
/// the same `memcpy`) instead of raw-pointer `copy_nonoverlapping`.
#[inline] pub fn copy_bytes(from: &[u8], to: &mut [u8]) -> usize
{
	let len = from.len().min(to.len());
	to[..len].copy_from_slice(&from[..len]);
	len
}
Loading…
Cancel
Save