restarted. added ext methods. start service. start command

new
Avril 3 years ago
parent 503aff9997
commit 0ec606cf39
Signed by: flanchan
GPG Key ID: 284488987C31F630

142
Cargo.lock generated

@ -1,10 +1,12 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "ad-hoc-iter"
version = "0.2.2"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5068a429476033d1940f21e21d317afae2fc3a82f412d5d8fe08c13f100a00e8"
checksum = "90a8dd76beceb5313687262230fcbaaf8d4e25c37541350cf0932e9adb8309c8"
[[package]]
name = "autocfg"
@ -18,6 +20,12 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff"
[[package]]
name = "base64"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bitflags"
version = "1.2.1"
@ -69,6 +77,22 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chacha20stream"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a91f983a237d46407e744f0b9c5d2866f018954de5879905d7af6bf06953aea"
dependencies = [
"base64 0.13.0",
"getrandom 0.2.2",
"openssl",
"pin-project",
"rustc_version",
"serde",
"smallvec",
"tokio",
]
[[package]]
name = "cpuid-bool"
version = "0.1.2"
@ -96,13 +120,13 @@ dependencies = [
[[package]]
name = "cryptohelpers"
version = "1.7.0"
version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1758ba574c79ae6db3ccf6623cacc5293c2c0a14de871a7b95d4286861cbd504"
checksum = "33c34ac8437a348d0c23e71327d6d8affe4509cc91e33dd22c1e38f7c9da8070"
dependencies = [
"crc",
"futures",
"getrandom",
"getrandom 0.1.15",
"hex-literal",
"hmac",
"libc",
@ -288,9 +312,26 @@ checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6"
dependencies = [
"cfg-if 0.1.10",
"libc",
"wasi",
"wasi 0.9.0+wasi-snapshot-preview1",
]
[[package]]
name = "getrandom"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8"
dependencies = [
"cfg-if 1.0.0",
"libc",
"wasi 0.10.2+wasi-snapshot-preview1",
]
[[package]]
name = "half"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62aca2aba2d62b4a7f5b33f3712cb1b0692779a56fb510499d5c0aa594daeaf3"
[[package]]
name = "hermit-abi"
version = "0.1.17"
@ -470,16 +511,21 @@ dependencies = [
"ad-hoc-iter",
"bitflags",
"bytes 0.6.0",
"chacha20stream",
"cryptohelpers",
"futures",
"generational-arena",
"jemallocator",
"lazy_static",
"memmap",
"pin-project",
"rustc_version",
"serde",
"serde_cbor",
"serde_json",
"smallvec",
"tokio",
"uuid",
]
[[package]]
@ -517,23 +563,23 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.31"
version = "0.10.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d008f51b1acffa0d3450a68606e6a51c123012edaacb0f4e1426bd978869187"
checksum = "6d7830286ad6a3973c0f1d9b73738f69c76b739301d0229c4b96501695cbe4c8"
dependencies = [
"bitflags",
"cfg-if 1.0.0",
"foreign-types",
"lazy_static",
"libc",
"once_cell",
"openssl-sys",
]
[[package]]
name = "openssl-sys"
version = "0.9.59"
version = "0.9.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de52d8eabd217311538a39bba130d7dea1f1e118010fee7a033d966845e7d5fe"
checksum = "b6b0d6fb7d80f877617dfcb014e605e2b5ab2fb0afdf27935219bb6bd984cb98"
dependencies = [
"autocfg",
"cc",
@ -548,7 +594,7 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7170d73bf11f39b4ce1809aabc95bf5c33564cdc16fc3200ddda17a5f6e5e48b"
dependencies = [
"base64",
"base64 0.12.3",
"crypto-mac",
"hmac",
"rand",
@ -559,18 +605,18 @@ dependencies = [
[[package]]
name = "pin-project"
version = "1.0.2"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ccc2237c2c489783abd8c4c80e5450fc0e98644555b1364da68cc29aa151ca7"
checksum = "c7509cc106041c40a4518d2af7a61530e1eed0e6285296a3d8c5472806ccc4a4"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.2"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8e8d2bf0b23038a4424865103a4df472855692821aab4e4f5c3312d461d9e5f"
checksum = "48c950132583b500556b1efd71d45b319029f2b71518d979fcc208e16b42426f"
dependencies = [
"proc-macro2",
"quote",
@ -615,9 +661,9 @@ checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a"
[[package]]
name = "proc-macro2"
version = "1.0.24"
version = "1.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71"
checksum = "a152013215dca273577e18d2bf00fa862b89b24169fb78c4c95aeb07992c9cec"
dependencies = [
"unicode-xid",
]
@ -658,7 +704,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom",
"getrandom 0.1.15",
]
[[package]]
@ -670,12 +716,36 @@ dependencies = [
"rand_core",
]
[[package]]
name = "rustc_version"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
dependencies = [
"semver",
]
[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
name = "semver"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
dependencies = [
"semver-parser",
]
[[package]]
name = "semver-parser"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.118"
@ -685,6 +755,16 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_cbor"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622"
dependencies = [
"half",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.118"
@ -737,9 +817,9 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]]
name = "smallvec"
version = "1.5.1"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae524f056d7d770e174287294f562e95044c68e88dec909a00d2094805db9d75"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "socket2"
@ -760,9 +840,9 @@ checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2"
[[package]]
name = "syn"
version = "1.0.54"
version = "1.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a2af957a63d6bd42255c359c93d9bfdb97076bd3b820897ce55ffbfbf107f44"
checksum = "a1e8cdbefb79a9a5a65e0db8b47b723ee907b7c7f8496c76a1770b5c310bab82"
dependencies = [
"proc-macro2",
"quote",
@ -816,6 +896,16 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "uuid"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
dependencies = [
"getrandom 0.2.2",
"serde",
]
[[package]]
name = "vcpkg"
version = "0.2.11"
@ -834,6 +924,12 @@ version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "winapi"
version = "0.2.8"

@ -11,13 +11,20 @@ edition = "2018"
ad-hoc-iter = "0.2.2"
bitflags = "1.2.1"
bytes = "0.6.0"
chacha20stream = {version = "1.0", features = ["async", "serde"]}
cryptohelpers = {version = "1.7", features = ["full", "async", "serde"]}
futures = "0.3.8"
generational-arena = "0.2.8"
jemallocator = "0.3.2"
lazy_static = "1.4.0"
memmap = "0.7.0"
pin-project = "1.0.2"
serde = {version = "1.0.118", features= ["derive"]}
serde_cbor = "0.11.1"
serde_json = "1.0.60"
smallvec = "1.5.1"
tokio = {version = "0.2", features = ["full"]}
uuid = {version = "0.8", features = ["v4", "serde"]}
[build-dependencies]
rustc_version = "0.2"

@ -1,554 +0,0 @@
use super::*;
use std::marker::Unpin;
use std::{
io::{
self,
Read,
},
fs,
fmt,
};
use bytes::{
BytesMut,
BufMut,
};
use tokio::io::AsyncRead;
use tokio::{
task,
sync::{
mpsc,
oneshot,
},
};
use futures::prelude::*;
/// An open fd that has been memory mapped.
///
/// Bundles the owning `File` with its `Mmap` so the mapping can never outlive the fd.
#[derive(Debug)]
pub struct OpenMMap
{
// NOTE(review): field declaration order controls drop order here (`file` is
// dropped before `map`) — kept deliberately per the author's note; confirm this
// is the intended ordering for fd-close vs. munmap.
file: File,//note: this layout matters for destruction ordering.
map: Mmap,
}
impl AsRef<[u8]> for OpenMMap
{
/// Expose the entire mapped file contents as an immutable byte slice.
#[inline(always)] fn as_ref(&self) -> &[u8]
{
&self.map[..]
}
}
impl OpenMMap
{
    /// Map an already-opened tokio `File`, converting it into a blocking std handle first.
    ///
    /// # Errors
    /// Propagates any I/O error from the `mmap` syscall.
    async fn new_file(file: tokio::fs::File) -> io::Result<Self>
    {
        let file = file.into_std().await;
        // `Mmap::map` is unsafe because the mapping's contents may change if the
        // file is modified externally; the store assumes exclusive access to its
        // files (see the `Level::Extreme` documentation).
        let map = unsafe { Mmap::map(&file)? };
        Ok(Self {
            file,
            map
        })
    }
    /// Map an already-opened std `File` synchronously.
    ///
    /// # Errors
    /// Propagates any I/O error from the `mmap` syscall.
    fn new_file_sync(file: File) -> io::Result<Self>
    {
        let map = unsafe { Mmap::map(&file)? };
        Ok(Self {
            file,
            map
        })
    }
    /// Open `file` read-only and map it, blocking the current thread.
    fn new_sync(file: impl AsRef<Path>) -> io::Result<Self>
    {
        // Delegate so the open+map sequence lives in exactly one place.
        Self::new_file_sync(fs::OpenOptions::new().read(true).open(file)?)
    }
    /// Open `file` read-only and map it asynchronously.
    async fn new(file: impl AsRef<Path>) -> io::Result<Self>
    {
        // Delegate so the open+map sequence lives in exactly one place.
        Self::new_file(tokio::fs::OpenOptions::new().read(true).open(file).await?).await
    }
}
/// How aggressively should we cache a specific item.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
#[repr(u32)]
pub enum Level
{
/// No cacheing
///
/// # Usage
/// Best used for cold files, or files that are not accessed much, or when running low on memory and/or fds.
///
/// Corresponds to `DataCacheState::None`
None,
/// Open the file and cache the FD as std `File`
/// This can avoid the syscall overhead of needing to open the file next time it is accessed.
///
/// # Usage
/// Best used for frequently accessed files.
///
/// Corresponds to `DataCacheState::Open`
Low,
/// Open the file, cache the FD *and* map the file in memory.
/// This can provide efficient random-access of the file without the need to preload it.
///
/// # Usage
/// Best used for frequently read large files.
///
/// Corresponds to `DataCacheState::Mapped`
High,
/// Load the whole contents of the file into memory, without keeping it open.
/// This provides the most efficient access of the file, as well as not contributing to the process' fd limit, but at the cost of:
/// * Loading the whole file into a memory buffer, which may be a slow operation and/or take up massive memory space
/// * Allowing a potential desync if the file is changed on disk while the cache buffer is loaded.
///
/// ## Desync
/// While `mtfse` already assumes it has exclusive access to all files in its db root, generally a file being modified by an external program will cause an error to be returned within a `mtfse` operation eventually as file hashes are updated and/or files are encrypted/decrypted or even read (in the case of a file being deleted.)
///
/// When an item is cached at this level however, any of these kinds of changes made will not be visible. The store can then be put into an invalid state without realising it.
///
/// ## Tradeoffs
/// The caching operation itself is expensive, the memory requirements are expensive, and in some cases this can even *slow* reads compared to the other levels when used on large files, as we cannot take advantage of the kernel's internal file data caching for mapped random-access reads and we are bound to causing CPU cache misses.
///
/// # Usage
/// Best used for frequently read small files.
///
/// Corresponds to `DataCacheState::Memory`
Extreme,
}
/// Provides immutable caching of a file in a data entry.
///
/// Each variant corresponds to one `Level` (see that enum's docs for the trade-offs).
#[derive(Debug)]
pub enum DataCacheState
{
/// There is no file cache for this item.
None,
/// The file is open, we have an fd.
Open(File),
/// The file is open and memory mapped.
Mapped(OpenMMap),
/// The file is not open, but its whole contents have been loaded into memory.
Memory(Bytes), // load from file via `BytesMut` buffer, then call `.freeze()`.
}
impl Default for DataCacheState
{
/// The default cache state is no caching at all (`Self::None`).
#[inline]
fn default() -> Self
{
Self::None
}
}
impl DataCacheState
{
/// Spawn a task to generate a cache for the file provided by `from`.
///
/// The returned future resolves once the background loading task completes.
///
/// # Notes
/// This is redundant for all cache levels except `Extreme`.
///
/// # Panics
/// The returned future panics if the spawned loader task itself panics.
pub fn background(from: impl Into<PathBuf>, level: Level) -> impl Future<Output = io::Result<Self>> + 'static//tokio::task::JoinHandle<io::Result<Self>>
{
let from = from.into();
async move {
// NOTE(review): `exists()`/`is_file()` are blocking std::fs stat calls executed
// on the async executor thread — presumably acceptable for cheap stats; confirm.
if from.exists() && from.is_file() {
tokio::spawn(Self::new(from, level)).await.expect("Background loader panicked")
} else {
Err(io_err!(NotFound, "Path either not existant or not a file."))
}
}
}
/// Spawn a task to generate a cache for the already opened file `from`.
///
/// # Notes
/// This is redundant for all cache levels except `Extreme`.
///
/// # Panics
/// The returned future panics if the spawned loader task itself panics.
pub fn background_from_file(from: tokio::fs::File, level: Level) -> impl Future<Output = io::Result<Self>> + 'static // tokio::task::JoinHandle<io::Result<Self>>
{
async move {
let file = from.into_std().await;
tokio::spawn(Self::new_fd(file, level)).await.expect("Background loader panicked")
}
}
}
impl DataCacheState
{
/// Attempt to upgrade the cache.
///
/// Upgrading moves one step up the cache ladder: `Open` -> `Mapped` -> `Memory`.
///
/// # Returns
/// The upgraded cache item, or itself if it is unable to be upgraded (`None` and `Memory`).
pub async fn into_upgrade(self) -> io::Result<Self>
{
Ok(match self {
Self::Open(file) => Self::Mapped(OpenMMap::new_file(file.into()).await?),
Self::Mapped(map) => Self::Memory({
use bytes::Buf;
// The full copy of the mapping may be large, so run it on the blocking pool.
tokio::task::spawn_blocking(move || {
map.as_ref().copy_to_bytes(map.as_ref().len())
}).await.expect("Copying map into memory failed")
}),
// `None` and `Memory` have nothing to upgrade to; hand back unchanged.
x => x,
})
}
/// Attempt to upgrade the cache, blocking the current thread.
///
/// # Returns
/// The upgraded cache item, or itself if it is unable to be upgraded (`None` and `Memory`).
pub fn into_upgrade_sync(self) -> io::Result<Self>
{
Ok(match self {
Self::Open(file) => Self::Mapped(OpenMMap::new_file_sync(file)?),
Self::Mapped(map) => Self::Memory({
use bytes::Buf;
map.as_ref().copy_to_bytes(map.as_ref().len())
}),
x => x,
})
}
/// Attempt to upgrade the cache in-place.
///
/// Temporarily swaps `Self::None` into `self` while the upgrade runs.
///
/// # Panics
/// If `self` is `None` or `Memory`.
pub async fn upgrade(&mut self) -> io::Result<()>
{
*self = match std::mem::replace(self, Self::None) {
Self::Open(file) => Self::Mapped(OpenMMap::new_file(file.into()).await?),
Self::Mapped(map) => Self::Memory({
use bytes::Buf;
tokio::task::spawn_blocking(move || {
map.as_ref().copy_to_bytes(map.as_ref().len())
}).await.expect("Copying map into memory failed")
}),
x => panic!("Cannot upgrade from {:?}", x),
};
Ok(())
}
/// Attempt to upgrade the cache in-place, blocking the current thread.
///
/// # Panics
/// If `self` is `None` or `Memory`.
pub fn upgrade_sync(&mut self) -> io::Result<()>
{
*self = match std::mem::replace(self, Self::None) {
Self::Open(file) => Self::Mapped(OpenMMap::new_file_sync(file)?),
Self::Mapped(map) => Self::Memory({
use bytes::Buf;
map.as_ref().copy_to_bytes(map.as_ref().len())
}),
x => panic!("Cannot upgrade from {:?}", x),
};
Ok(())
}
/// Read from the cache at `offset` into the provided buffer, and return the number of bytes read.
///
/// # Performance
///
/// When the cache has random access, this method completes without yielding. If not, it performs async seek & read operations to fill the buffer as much as possible from the offset.
///
/// # Returns
/// If `EOF` is encountered within the read, then it is terminated early and the number of bytes successfully read is returned (and will be less than the length of the buffer), otherwise, the full buffer was filled, and the full buffer's length will be returned.
///
/// ## Errors
///
/// If this cache is not active, it will return an `io::Error` with `io::ErrorKind::NotConnected`.
/// Any other error in I/O operations is propagated.
pub async fn read_at(&mut self, offset: usize, into: &mut [u8]) -> io::Result<usize> // this takes `&mut self` only to ensure it cannot be called on different threads at the same time, as any file operations need to be atomic.
{
// Fast path: `Mapped`/`Memory` caches can be sliced directly.
// NOTE(review): `&ar[offset..]` will panic if `offset` is past the end of the
// cached data — presumably callers guarantee in-range offsets; confirm.
if let Some(ar) = self.random_access()
{
return Ok(slice::copy_bytes(&ar[offset..], into));
}
// Slow path: duplicate the cached fd and seek+read from it asynchronously.
if let Some(file) = self.file()
{
use tokio::{
fs, prelude::*,
};
let mut file = fs::File::from_std(file.try_clone()?); // this is what requires we take `&mut(ex) self`.
file.seek(io::SeekFrom::Start(u64::try_from(offset).expect("Arch size integer was out of bounds of u64 (this should never happen)"))).await?;
// Loop until EOF (`cur == 0`) or the output buffer is full.
let mut read =0;
let mut cur;
while {cur =file.read(&mut into[read..]).await?; cur != 0 && read<into.len()} {
read+=cur;
}
return Ok(read);
}
// `Self::None`: no fd and no buffer to serve the read from.
Err(io_err!(NotConnected, "Operation not supported (no cache is available)"))
}
/// Attempt to get a reference to an fd, if this cache state holds one (`Open` or `Mapped`).
pub fn file(&self) -> Option<&File>
{
match self
{
Self::Mapped(map) => Some(&map.file),
Self::Open(file) => Some(file),
_ => None,
}
}
/// Attempt to get a random access buffer of this cache, if one exists (`Mapped` or `Memory`).
pub fn random_access(&self) -> Option<& [u8]>
{
match self {
Self::Mapped(map) => Some(map.as_ref()),
Self::Memory(mem) => Some(&mem[..]),
_ => None,
}
}
/// Drop the whole cache (if there is one).
#[inline(never)] pub fn clear(&mut self)
{
*self = Self::None;
}
/// Attempt to asynchronously create a cache state for file provided by already loaded `file` at this level.
///
/// TODO(review): the `Extreme` branch below is near-duplicated across `new_fd`,
/// `new`, `new_sync` and `new_fd_sync`; consider extracting a shared helper.
pub async fn new_fd(file: File, level: Level) -> io::Result<Self>
{
Ok(match level {
Level::None => Self::None,
Level::Low => Self::Open(file),
Level::High => Self::Mapped(OpenMMap::new_file(file.into()).await?),
Level::Extreme => {
let file = tokio::fs::File::from(file);
// Preallocate the buffer from file metadata when available, and remember
// the expected length so a short read can be detected afterwards.
let (mut bytes,expect) = {
if let Some(len) = file.metadata().await.ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
(BytesMut::with_capacity(len), Some(len))
} else {
(BytesMut::new(), None)
}
};
match (expect, read_whole_into_buffer(file, &mut bytes).await?) {
(Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
_ => Self::Memory(bytes.freeze()),
}
},
})
}
/// Attempt to asynchronously create a cache state for file provided by `file` at this level.
pub async fn new(file: impl AsRef<Path>, level: Level) -> io::Result<Self>
{
Ok(match level {
Level::None => Self::None,
Level::Low => Self::Open(tokio::fs::OpenOptions::new().read(true).open(file).await?.into_std().await),
Level::High => Self::Mapped(OpenMMap::new(file).await?),
Level::Extreme => {
let file = tokio::fs::OpenOptions::new().read(true).open(file).await?;
let (mut bytes,expect) = {
if let Some(len) = file.metadata().await.ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
(BytesMut::with_capacity(len), Some(len))
} else {
(BytesMut::new(), None)
}
};
match (expect, read_whole_into_buffer(file, &mut bytes).await?) {
(Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
_ => Self::Memory(bytes.freeze()),
}
},
})
}
/// Attempt to synchronously create a cache state for file provided by `file` at this level.
///
/// # Note
/// This will block until all the I/O operations and syscalls have completed. In an async context avoid using it.
pub fn new_sync(file: impl AsRef<Path>, level: Level) -> io::Result<Self>
{
Ok(match level {
Level::None => Self::None,
Level::Low => Self::Open(fs::OpenOptions::new().read(true).open(file)?),
Level::High => Self::Mapped(OpenMMap::new_sync(file)?),
Level::Extreme => {
let file = fs::OpenOptions::new().read(true).open(file)?;
let (mut bytes,expect) = {
if let Some(len) = file.metadata().ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
(BytesMut::with_capacity(len), Some(len))
} else {
(BytesMut::new(), None)
}
};
match (expect, read_whole_into_buffer_sync(file, &mut bytes)?) {
(Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
_ => Self::Memory(bytes.freeze()),
}
},
})
}
/// Attempt to synchronously create a cache state for file provided by already-loaded `file` at this level.
///
/// # Note
/// This will block until all the I/O operations and syscalls have completed. In an async context avoid using it.
pub fn new_fd_sync(file: fs::File, level: Level) -> io::Result<Self>
{
Ok(match level {
Level::None => Self::None,
Level::Low => Self::Open(file),
Level::High => Self::Mapped(OpenMMap::new_file_sync(file)?),
Level::Extreme => {
let (mut bytes,expect) = {
if let Some(len) = file.metadata().ok().map(|m| usize::try_from(m.len()).ok()).flatten() {
(BytesMut::with_capacity(len), Some(len))
} else {
(BytesMut::new(), None)
}
};
match (expect, read_whole_into_buffer_sync(file, &mut bytes)?) {
(Some(expect), len) if len != expect => return Err(io_err!(UnexpectedEof, "Size mismatch")),
_ => Self::Memory(bytes.freeze()),
}
},
})
}
}
/// Chunk size (in bytes) used when streaming a file into a memory buffer.
const BUFSIZE: usize = 4096;
/// Read as many bytes from `input` into the `BufMut` output as possible and then return the number of bytes read.
/// Does not block the current task.
///
/// # Errors
/// Propagates the first I/O error returned by `input`.
async fn read_whole_into_buffer<R,W>(mut input: R, mut output: &mut W) -> io::Result<usize>
where R: AsyncRead + Unpin,
      W: BufMut + ?Sized,
{
    use tokio::prelude::*;
    // Stream in fixed-size chunks; `output` grows as needed via `BufMut::put`.
    let mut buf = [0u8; BUFSIZE];
    let mut whole = 0;
    loop {
        let read = input.read(&mut buf[..]).await?;
        if read == 0 {
            // EOF: report the total number of bytes copied.
            break Ok(whole);
        }
        whole += read;
        // The `(&mut output)` reborrow is required: `BufMut::put` needs `Self: Sized`,
        // and `W` itself may be unsized — `&mut W` is.
        (&mut output).put(&buf[..read]);
    }
}
/// Read as many bytes from `input` into the `BufMut` output as possible and then return the number of bytes read.
/// Blocks the current thread.
///
/// # Errors
/// Propagates the first I/O error returned by `input`.
fn read_whole_into_buffer_sync<R,W>(mut input: R, mut output: &mut W) -> io::Result<usize>
where R: Read,
      W: BufMut + ?Sized,
{
    // Stream in fixed-size chunks; `output` grows as needed via `BufMut::put`.
    let mut buf = [0u8; BUFSIZE];
    let mut whole = 0;
    loop {
        let read = input.read(&mut buf[..])?;
        if read == 0 {
            // EOF: report the total number of bytes copied.
            break Ok(whole);
        }
        whole += read;
        // The `(&mut output)` reborrow is required: `BufMut::put` needs `Self: Sized`,
        // and `W` itself may be unsized — `&mut W` is.
        (&mut output).put(&buf[..read]);
    }
}
/// A request to send to a background worker spawned by a `service` function, for each instance of this there will be a corresponding `CacheResponse` you can `await` to receive the value.
///
/// Fields: the file to cache, the `Level` of cache to build, and the oneshot sender on
/// which the resulting `DataCacheState` (or error) is delivered back to the requester.
#[derive(Debug)]
pub struct CacheRequest(tokio::fs::File, Level, oneshot::Sender<io::Result<DataCacheState>>);
/// A handle to a pending `DataCacheState` being constructed from a `CacheRequest` sent to a background worker from a `service` function.
///
/// `await`ing this response will yield until the worker task has completed and can send the cache state back, upon which it yields a `io::Result<DataCacheState>`.
#[derive(Debug)]
pub struct CacheResponse(oneshot::Receiver<io::Result<DataCacheState>>);
//TODO: impl Future for CacheResponse ...
/// Start a background task service that takes `CacheRequest`s and produces `DataCacheState`s asynchronously.
///
/// The service shuts down when all producers of requests are dropped.
///
/// # Returns
/// A `Future` which, when `await`ed on, will yield until the service shuts down and a handle to send `CacheRequest`s to the service.
#[inline] pub fn service() -> (impl Future<Output = ()> + 'static, mpsc::Sender<CacheRequest>)
{
use std::{
task::{Poll, Context},
pin::Pin,
};
/// A future that never completes and never reschedules itself.
// Used as the shutdown future so this service can only terminate when every
// request sender has been dropped.
struct ShimNever;
impl Future for ShimNever
{
// `!` (never type, nightly) documents that this future cannot yield a value.
type Output = !;
#[inline(always)] fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output>
{
Poll::Pending
}
}
let (rt, tx) = service_with_shutdown(ShimNever);
// Discard the termination reason: with `ShimNever` as the cancel future it can
// only ever be `ServiceResult::NoProducers`.
(rt.map(|_| ()), tx)
}
/// Start a background task service that takes `CacheRequest`s and produces `DataCacheState`s asynchronously.
///
/// The service can take a `Future` which, when completes, will drop the service task and stop replying to responses.
/// The service also shuts down when all producers of requests are dropped.
///
/// # Returns
/// A `Future` which, when `await`ed on, will yield until the service shuts down and a handle to send `CacheRequest`s to the service.
/// The output of the future is how the service was terminated, by the `cancel` future completing, or by all producers being dropped.
pub fn service_with_shutdown(cancel: impl Future + 'static) -> (impl Future<Output = ServiceResult> + 'static, mpsc::Sender<CacheRequest>)
{
// Bounded channel: at most 32 requests queued before senders are back-pressured.
let (tx, mut rx) = mpsc::channel(32);
(async move {
// Request loop: each request is served on its own spawned task so a slow
// cache build does not block subsequent requests.
let ren = async {
while let Some(CacheRequest(file, level, send_to)) = rx.recv().await {
tokio::spawn(async move {
// Result deliberately ignored: the requester may have dropped its
// `CacheResponse`, which is not an error for the service.
let _ = send_to.send(DataCacheState::new_fd(file.into_std().await, level).await);
});
}
};
// Race the request loop against the cancellation future.
tokio::select! {
_ = ren => {
// all senders dropped
ServiceResult::NoProducers
}
_ = cancel => {
// explicit cancel
rx.close();
ServiceResult::Cancelled
}
}
}, tx)
}
/// The reason a `service` task has terminated.
#[derive(Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
#[non_exhaustive]
pub enum ServiceResult
{
/// All mpsc senders had been dropped, there were no more possible requests to respond to.
NoProducers,
/// The service was explicitly cancelled by a shutdown future.
Cancelled,
}
impl fmt::Display for ServiceResult
{
    /// Writes a human-readable description of why the service terminated.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        let reason = match self {
            Self::NoProducers => "All mpsc senders had been dropped, there were no more possible requests to respond to.",
            Self::Cancelled => "The service was explicitly cancelled by a shutdown future.",
        };
        f.write_str(reason)
    }
}

@ -1,50 +0,0 @@
use super::*;
bitflags!{
/// What to remove when using `Store::clean()`.
pub struct StoreCleanFlags: u32
{
/// Dead entry hash (key) mappings.
const MAP = 1 << 0;
/// Dead `tag -> entry hash` mappings.
const TAG_MAP = 1 << 1;
/// Dead tags (i.e. tags with no entries).
const TAGS = 1 << 2;
/// Dead entries (i.e. Inaccessible, corrupt, or missing file entries).
const ENTRY = 1 << 3;
/// Clean all (every flag above: `0b1111`).
const ALL = (1 << 4)-1;
}
}
impl Default for StoreCleanFlags
{
/// Default cleans only the *mappings* (`MAP | TAG_MAP | TAGS`); dead entries
/// (`ENTRY`) are excluded since auditing them is a heavier operation (see `Store::clean`).
#[inline]
fn default() -> Self
{
Self::MAP | Self::TAG_MAP | Self::TAGS
}
}
// Refresh and cleaning
impl Store
{
/// Remove any and all dead mappings / entries specified by the flags here.
///
/// Pass `Default::default()` to only clean *mappings*, and not dead entries. Otherwise, `StoreCleanFlags::ALL` will perform a full audit.
/// See [`StoreCleanFlags`] for other options.
pub fn clean(&mut self, what: StoreCleanFlags)
{
// Unimplemented: the ordering of the individual clean passes is still undecided.
todo!("What order do we do `what`'s things in?")
}
/// Force a full rebuild of all mappings.
///
/// The clears the entire store except entries (and entry specific caches), and then rebuilds all the internal mappings from scratch. Any cached mappings (e.g empty tag reserves) are removed.
/// This does not remove invalid entries themselves, for that use `clean(StoreCleanFlags::ENTRY)`.
pub fn rebuild(&mut self)
{
// Unimplemented stub.
todo!()
}
}

@ -1,139 +0,0 @@
use super::*;
use std::hash::{Hash, Hasher};
use std::borrow::Borrow;
pub mod builder;
#[derive(Debug, Serialize, Deserialize)] // Clone, PartialEq, Eq, Hash
pub struct Entry
{
/// Display name of this entry.
name: String,
/// Free-form description of this entry.
description: String,
/// The key used to encrypt the file, if any.
key: Option<aes::AesKey>,
/// The hash of the *unencrypted* data.
hash: sha256::Sha256Hash,
/// The tags for this entry.
///
/// # Note
/// This should be updated the same as the `Store`'s `tags` and `tag_mappings`. The duplication here is for lookup convenience / caching.
pub(super) tags: Vec<String>,
/// The *original* filename of the item, *not* the path to the real file for this item (see `location`).
filename: OsString,
/// Filename *relative* to the root of the store
location: PathBuf,
/// The state of caching this entry is currently in. This should not be saved as it has no meaning regarding the actual data itself.
#[serde(skip)]
cache: DataCacheState,
}
// Accessors
impl Entry
{
/// The name of this entry
pub fn name(&self) -> &str
{
&self.name
}
/// The free-form description of this entry
pub fn description(&self) -> &str
{
&self.description
}
/// The *original* filename of this entry
pub fn filename(&self) -> &Path
{
Path::new(&self.filename)
}
/// The path of this entry's file relative to the root of its store
pub fn location(&self) -> &Path
{
self.location.as_path()
}
/// The tags for this entry
pub fn tags(&self) -> &[String]
{
&self.tags[..]
}
/// Is the file of this entry encrypted?
pub fn is_encrypted(&self) -> bool
{
self.key.is_some()
}
/// The sha256 hash of the data in this entry
pub fn hash(&self) -> &sha256::Sha256Hash
{
&self.hash
}
}
impl Entry
{
/// Consume and drop the cache. Only really useful as a shim for mapping.
#[inline(always)] pub(super) fn with_no_cache(self) -> Self
{
Self {
cache: Default::default(),
..self
}
}
/// Clone the data needed for a refresh of this entry in a store before it is mutated.
///
/// Returns the entry's content hash and a boxed copy of its tag list.
#[inline(always)] pub(super) fn prepare_for_refresh(&self) -> (EntryKey, Box<[String]>)
{
(self.hash().clone(), self.tags.clone().into_boxed_slice())
}
}
impl Borrow<sha256::Sha256Hash> for Entry
{
/// Borrow the entry's content hash, allowing hash-keyed collections to look
/// entries up by `Sha256Hash` directly.
#[inline] fn borrow(&self) -> &sha256::Sha256Hash
{
&self.hash
}
}
impl Hash for Entry {
// Hashing delegates to the content hash alone.
// NOTE(review): `PartialEq` below compares the metadata fields but *not* `hash`,
// so two entries could compare equal yet hash differently — confirm the store
// upholds "equal metadata implies equal content hash" by construction.
fn hash<H: Hasher>(&self, state: &mut H) {
self.hash.hash(state)
}
}
impl PartialEq for Entry
{
    /// Entries compare equal when all persistent metadata fields match.
    ///
    /// The `hash` and transient `cache` fields are deliberately excluded from the
    /// comparison, matching the original semantics.
    fn eq(&self, other: &Self) -> bool
    {
        self.name == other.name
            && self.description == other.description
            && self.key == other.key
            && self.tags == other.tags
            && self.filename == other.filename
            && self.location == other.location
    }
}
impl Eq for Entry{}
impl Clone for Entry
{
    /// Clones every persistent field explicitly; the transient `cache` field is
    /// reset to its default (no cache), since cache state is per-instance.
    fn clone(&self) -> Self {
        Self {
            name: self.name.clone(),
            description: self.description.clone(),
            key: self.key.clone(),
            tags: self.tags.clone(),
            filename: self.filename.clone(),
            location: self.location.clone(),
            hash: self.hash.clone(),
            cache: Default::default(),
        }
    }
}

@ -1,107 +0,0 @@
use super::*;
use std::{
error,
fmt,
};
use std::ffi::OsStr;
/// Builder for creating [`Entry`] instances.
///
/// All fields start unset; `name`, `hash` and `location` are required by `build()`,
/// the rest fall back to empty defaults.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Default)]
pub struct EntryBuilder<'a>
{
name: Option<&'a str>,
desc: Option<&'a str>,
tags: Option<&'a[&'a str]>,
key: Option<aes::AesKey>,
hash: Option<sha256::Sha256Hash>,
location: Option<&'a Path>,
filename: Option<&'a OsStr>,
}
/// Generates a chainable `const` builder setter: stores `Some(value)` in the named
/// field and returns the updated builder by value. The optional literal becomes the
/// generated method's doc comment.
macro_rules! build_fn {
($name:ident: $ty:ty $(, $com:literal)?) => {
$(#[doc=$com])?
#[inline] pub const fn $name(self, $name: $ty) -> Self
{
Self {
$name: Some($name),
..self
}
}
};
}
impl<'a> EntryBuilder<'a>
{
    /// Create a new, empty entry builder with every field unset.
    #[inline] pub const fn new() -> Self
    {
        Self {
            name: None,
            desc: None,
            tags: None,
            key: None,
            hash: None,
            location: None,
            filename: None,
        }
    }
    build_fn!(name: &'a str, "Insert the name of this entry");
    build_fn!(desc: &'a str, "Insert a description for this entry");
    build_fn!(tags: &'a [&'a str], "Insert the tags for this entry");
    build_fn!(key: aes::AesKey, "Insert the encryption key for this entry");
    build_fn!(hash: sha256::Sha256Hash, "Insert the hash for this entry");
    build_fn!(location: &'a Path, "Insert the location for this entry");
    build_fn!(filename: &'a OsStr, "Insert the original filename for this entry");
    /// Try to build an `Entry` from this builder.
    ///
    /// # Errors
    /// Returns `EntryBuildError` when any required field (`name`, `hash`, `location`) is unset.
    #[inline] pub fn build(self) -> Result<Entry, EntryBuildError>
    {
        Ok(
            Entry {
                // Required fields: absence is a build error.
                name: self.name.ok_or(EntryBuildError)?.into(),
                hash: self.hash.ok_or(EntryBuildError)?,
                location: self.location.ok_or(EntryBuildError)?.into(),
                // Optional fields fall back to empty values.
                description: self.desc.unwrap_or("").into(),
                tags: self.tags.unwrap_or(&[]).iter().map(|&x| x.to_owned()).collect(),
                filename: self.filename.unwrap_or(OsStr::new("")).into(),
                key: self.key,
                // A freshly built entry carries no cache state.
                cache: Default::default(),
            }
        )
    }
}
/// Error for when an improperly configured [`EntryBuilder`] called `.build()`.
/// Usually this is because of missing required fields (`name`, `hash`, or `location`).
#[derive(Debug)]
pub struct EntryBuildError;
impl error::Error for EntryBuildError{}
impl fmt::Display for EntryBuildError
{
#[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "improporly configured builder")
}
}

@ -1,98 +0,0 @@
use super::*;
/// A frozen, serialisable snapshot of a [`Store`]: its metadata plus a flat
/// list of entries.
///
/// The index structures (tag btree, tag/hash arenas) are *not* stored; they
/// are rebuilt from the entry list when thawing via `create_new`/`into_new`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Freeze
{
    metadata: StoreMetadata,
    data: Vec<Entry>,
}
impl Freeze
{
    /// Snapshot a borrowed store by cloning every entry.
    pub(super) fn new_ref(from: &Store) -> Self
    {
	Self {
	    metadata: from.metadata.clone(),
	    data: from.data.iter().map(Entry::clone).collect(),
	}
    }
    /// Snapshot a store by consuming it; entries are moved (caches stripped),
    /// so nothing is cloned.
    pub(super) fn new_moved(from: Store) -> Self
    {
	Self {
	    metadata: from.metadata,
	    data: from.data.into_iter().map(Entry::with_no_cache).collect(),
	}
    }
    /// Rebuild a full store (hash arena, tag arena, tag btree) by cloning this
    /// snapshot's entries.
    ///
    /// NOTE(review): this body mirrors `into_new` below (clone vs. move); keep
    /// the two in sync when editing either.
    pub(super) fn create_new(&self) -> Store
    {
	let mut new = Store::with_capacity(self.metadata.clone(), self.data.len());
	for entry in self.data.iter()
	{
	    // Register the entry's hash, then map each of its tags to that slot.
	    let hash = entry.hash().clone();
	    let hash_idx = new.data_hashes.insert(hash);
	    // insert the tags
	    for tag in entry.tags.iter()
	    {
		if let Some(&ti) = new.tags.get(tag) {
		    // This tag has an entry already, append to it
		    new.tag_mappings.get_mut(ti).unwrap().insert(hash_idx);
		} else {
		    // This tag has no entry, create it
		    // (`iter!` is this crate's ad-hoc single-shot iterator macro.)
		    let ti = new.tag_mappings.insert(iter![hash_idx].collect());
		    new.tags.insert(tag.clone(), ti);
		}
	    }
	    new.data.insert(entry.clone());
	}
	new
    }
    /// Rebuild a full store by consuming this snapshot (no entry clones).
    pub(super) fn into_new(self) -> Store
    {
	let Freeze { data, metadata } = self;
	let mut new = Store::with_capacity(metadata, data.len());
	for entry in data.into_iter()
	{
	    // Register the entry's hash, then map each of its tags to that slot.
	    let hash = entry.hash().clone();
	    let hash_idx = new.data_hashes.insert(hash);
	    // insert the tags
	    for tag in entry.tags.iter()
	    {
		if let Some(&ti) = new.tags.get(tag) {
		    // This tag has an entry already, append to it
		    new.tag_mappings.get_mut(ti).unwrap().insert(hash_idx);
		} else {
		    // This tag has no entry, create it
		    let ti = new.tag_mappings.insert(iter![hash_idx].collect());
		    new.tags.insert(tag.clone(), ti);
		}
	    }
	    new.data.insert(entry);
	}
	new
    }
}
impl From<Store> for Freeze
{
#[inline] fn from(from: Store) -> Self
{
Self::new_moved(from)
}
}
impl From<Freeze> for Store
{
#[inline] fn from(from: Freeze) -> Self
{
from.into_new()
}
}

@ -1,15 +0,0 @@
use super::*;
/// Identifying metadata for a [`Store`]: a display name plus the root
/// directory the store's data lives under.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct StoreMetadata
{
    /// Display name of the store.
    pub name: String,
    /// Root directory; `Store::new`/`with_capacity` assert it exists and is a directory.
    pub root: PathBuf,
}
impl StoreMetadata
{
    // No inherent methods yet.
}

@ -1,179 +0,0 @@
use super::*;
use generational_arena::{
Arena, Index as ArenaIndex,
};
use std::convert::{TryFrom, TryInto};
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::fs::File;
use std::ffi::OsString;
use memmap::Mmap;
use bytes::Bytes;
use cryptohelpers::{aes, sha256};
mod entry;
pub use entry::Entry;
pub use entry::builder::EntryBuilder;
mod cache;
pub use cache::DataCacheState;
mod metadata;
pub use metadata::StoreMetadata;
mod freeze;
pub use freeze::Freeze;
mod search;
pub use search::*;
mod mutation;
pub use mutation::*;
mod clean;
pub use clean::*;
#[cfg(test)] mod test;
/// The key used to look up a single entry in `O(1)` time.
///
/// # Notes
/// If you change this, make sure to change the `BuildHasher` back to `RandomState`.
pub type EntryKey = sha256::Sha256Hash;
/// The hasher used for the entry data set.
///
/// # Notes
/// Change this back to `RandomState` if you change the type of `EntryKey`.
pub type BuildHasher = std::collections::hash_map::RandomState; // NOTE(review): was `Sha256TopBuildHasher`, reverted to `RandomState` because of a bug in the custom hasher — confirm before re-enabling it.
/// Batches clean-up of the `data_hashes` arena: counts removals and signals a
/// purge once the configured threshold is crossed (see `Store::purge_if_needed`).
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Default)]
struct PurgeTrack
{
    /// Number of removals since last purge
    num_removals: usize,
    /// Max number of removals before an arena index purge.
    ///
    /// If set to 0, purge will be performed on every removal of a tag.
    num_removals_max: usize,
}
impl PurgeTrack
{
    /// Construct a tracker that signals a purge after `max` removals.
    #[inline] pub fn new(max: usize) -> Self
    {
	Self {
	    num_removals_max: max,
	    num_removals: 0,
	}
    }
    /// Has the removal count reached the configured threshold?
    #[inline] pub fn should_purge(&self) -> bool
    {
	self.num_removals_max <= self.num_removals
    }
}
/// The in-memory entry store: the entry set itself plus the index structures
/// used for tag-based search.
#[derive(Debug)]
pub struct Store
{
    /// Name and root directory of this store.
    metadata: StoreMetadata,
    /// Tracks removals so dead `data_hashes` slots are purged in batches.
    purge_track: PurgeTrack,
    data: HashSet<Entry, BuildHasher>, // The entry sha256 hash is used as the `key` here, as `Entry` both hashes to, and `Borrow`s to `Sha256Hash`.
    data_hashes: Arena<EntryKey>, // used to look up in `data`.
    tag_mappings: Arena<HashSet<ArenaIndex>>, // each slot: set of `data_hashes` indices carrying one tag
    tags: BTreeMap<String, ArenaIndex>, // string (tags) -> index (tag_mappings) -> index (data_hashes) -> hash used for lookup (data)
}
// Creating
impl Store
{
    /// Create a new empty store with this metadata.
    ///
    /// # Panics
    /// If the root directory specified in `metadata` does not exist or is not a directory.
    pub fn new(metadata: StoreMetadata) -> Self
    {
	// Fixed misspelling in the panic message ("not existant" -> "is nonexistent").
	assert!(metadata.root.exists() && metadata.root.is_dir(), "Metadata root {:?} passed to `new` is nonexistent or not a directory", metadata.root);
	Self {
	    metadata,
	    // 16 removals between deferred purges of `data_hashes`.
	    purge_track: PurgeTrack::new(16),
	    data: HashSet::with_hasher(Default::default()),
	    data_hashes: Arena::new(),
	    tag_mappings: Arena::new(),
	    tags: BTreeMap::new(),
	}
    }

    /// Create a new empty store with this metadata and initial storage capacity.
    ///
    /// # Panics
    /// If the root directory specified in `metadata` does not exist or is not a directory.
    pub fn with_capacity(metadata: StoreMetadata, cap: usize) -> Self
    {
	assert!(metadata.root.exists() && metadata.root.is_dir(), "Metadata root {:?} passed to `with_capacity` is nonexistent or not a directory", metadata.root);
	Self {
	    metadata,
	    purge_track: PurgeTrack::new(16),
	    data: HashSet::with_capacity_and_hasher(cap, Default::default()),
	    data_hashes: Arena::with_capacity(cap),
	    tag_mappings: Arena::with_capacity(cap),
	    tags: BTreeMap::new(),
	}
    }
}
// Freezing
impl Store
{
    /// Create a snapshot of this store, cloning all data into a frozen and serialisable version of it.
    /// # Notes
    /// This method clones the entire store into the new `Freeze`. To avoid this, use `into_freeze` if the store is no longer used after the freeze.
    #[inline] pub fn freeze(&self) -> Freeze
    {
	Freeze::new_ref(self)
    }
    /// Consume into a snapshot of this store, moving all data into a frozen and serialisable version of it.
    #[inline] pub fn into_freeze(self) -> Freeze
    {
	Freeze::new_moved(self)
    }
    /// Create a new store instance by cloning from a frozen snapshot of it.
    /// # Notes
    /// This method clones the entire `Freeze` into the new store. To avoid this, use `from_freeze` if the snapshot is no longer used after the unfreeze.
    #[inline] pub fn unfreeze(freeze: &Freeze) -> Self
    {
	freeze.create_new()
    }
    /// Consume a store snapshot and move its entries into a new store.
    #[inline] pub fn from_freeze(freeze: Freeze) -> Self
    {
	freeze.into_new()
    }
}
// Primitive access
impl Store
{
    /// Look up a single entry in `O(1)` time with its key (the entry's SHA-256
    /// hash; `Entry` borrows as `Sha256Hash` for this lookup).
    #[inline] pub fn get(&self, key: &EntryKey) -> Option<&Entry>
    {
	self.data.get(key)
    }
}

@ -1,184 +0,0 @@
//! Handling store mutation
use super::*;
use std::borrow::Borrow;
impl Store
{
    /// Insert this entry into the data table, overwriting any identically hashed one and returning it.
    pub fn insert_overwrite(&mut self, ent: Entry) -> Option<Entry>
    {
	// Evict (and clean up after) any entry with the same hash first.
	let old = self.remove(ent.hash());

	let hash_idx = self.data_hashes.insert(*ent.hash());
	for tag in ent.tags.iter() {
	    self.insert_tag_for_idx(tag, hash_idx);
	}

	self.data.insert(ent);
	old
    }

    /// Insert this entry then return a reference to it.
    pub fn insert<'a, 'b>(&'a mut self, ent: Entry) -> &'b Entry
    where 'a: 'b
    {
	let ffd = *ent.hash();
	self.insert_overwrite(ent);
	// Infallible: the entry keyed by `ffd` was inserted just above.
	self.data.get(&ffd).unwrap()
    }

    /// Mutate this entry in place if it exists, returning the closure's result.
    ///
    /// See [`map_entry`].
    pub fn mutate_entry_in_place<T, F>(&mut self, ent_id: &EntryKey, f: F) -> Option<T>
    where F: FnOnce(&mut Entry) -> T
    {
	if let Some(mut ent) = self.data.take(ent_id) {
	    // Capture the pre-mutation hash + tags so the indices can be re-synced.
	    let update = ent.prepare_for_refresh();
	    let out = f(&mut ent);
	    let new = ent;
	    self.refresh_for_entry(update, &new);
	    self.data.insert(new);
	    Some(out)
	} else {
	    None
	}
    }

    /// Update the index structures for an entry that may have been modified.
    ///
    /// The entry's old hash and tags are passed; the store's hash arena and tag
    /// mappings are updated to reflect the mutation.
    /// You must still insert the new entry after this.
    fn refresh_for_entry(&mut self, (ohash, otags): (impl Borrow<EntryKey>, impl AsRef<[String]>), new: &Entry)
    {
	let ohash = ohash.borrow();
	let iidx = if new.hash() != ohash {
	    // The hash changed: rewrite its slot in `data_hashes` in place.
	    for (_, hash) in self.data_hashes.iter_mut()
	    {
		if hash == ohash {
		    *hash = *new.hash();
		    break;
		}
	    }
	    self.reverse_index_lookup(new.hash()).unwrap()
	} else {
	    self.reverse_index_lookup(ohash).unwrap()
	};

	let otags = otags.as_ref();
	if &new.tags[..] != &otags[..] {
	    // The tag set changed: re-sync the tag mappings.
	    let ntags: HashSet<_> = new.tags.iter().collect();
	    let otags: HashSet<_> = otags.iter().collect();

	    // BUGFIX: the old code `zip`ped the two sets (stopping at the
	    // shorter one, so additions/removals could be missed entirely) and
	    // passed the wrong variable (`t` instead of `u`) when removing.
	    // Proper set differences handle both directions completely.
	    for added in ntags.difference(&otags)
	    {
		// Present on the new entry but not the old: tag was added.
		self.insert_tag_for_idx(added, iidx);
	    }
	    for removed in otags.difference(&ntags)
	    {
		// Present on the old entry but not the new: tag was removed.
		self.remove_tag_for_idx(removed, iidx);
	    }
	}
    }

    /// Map the entry with this function, updating references to it if needed.
    ///
    /// If the hash of the entry is modified by this map, then the hash indices are updated to the new hash.
    pub fn map_entry<F>(&mut self, ent_id: &EntryKey, f: F)
    where F: FnOnce(Entry) -> Entry
    {
	if let Some(ent) = self.data.take(ent_id) {
	    let update = ent.prepare_for_refresh();
	    let new = f(ent);
	    self.refresh_for_entry(update, &new);
	    self.data.insert(new);
	}
    }

    /// Remove this entry, and return it, if it was set.
    pub fn remove(&mut self, key: &EntryKey) -> Option<Entry>
    {
	if let Some(entry) = self.data.take(key) {
	    Some(self.cleanup_remove_entry(entry.with_no_cache()))
	} else {
	    None
	}
    }

    /// Perform cleanup on an entry *already removed* from `data`:
    /// drop its tag mappings and maybe run a deferred hash-arena purge.
    fn cleanup_remove_entry(&mut self, ent: Entry) -> Entry
    {
	// Remove any now-unused tag mappings.
	if let Some(hash_idx) = self.reverse_index_lookup(ent.hash()) {
	    for tag in ent.tags.iter()
	    {
		self.remove_tag_for_idx(tag, hash_idx);
	    }
	}
	// Removal from `data_hashes` itself is deferred (batched purge).
	self.purge_if_needed();
	ent
    }

    /// Remove dead mappings from `data_hashes` to `data`.
    #[inline] fn purge_data_hash_mappings(&mut self)
    {
	let data = &self.data;
	self.data_hashes.retain(move |_, hash| data.get(hash).is_some());
    }

    /// Purge the arena mapping if the threshold of dead entries is reached, otherwise defer it.
    #[inline] fn purge_if_needed(&mut self)
    {
	if self.purge_track.should_purge() {
	    self.purge_data_hash_mappings();
	    self.purge_track.num_removals = 0;
	} else {
	    self.purge_track.num_removals += 1;
	}
    }
}
// Tag specific stuff
impl Store
{
    /// Remove a mapping for this tag string to this specific hash index, cleaning up the tag mappings if needed.
    #[inline] fn remove_tag_for_idx(&mut self, tag: impl AsRef<str>, hash_idx: ArenaIndex)
    {
	let tag = tag.as_ref();
	if let Some(&ti) = self.tags.get(tag) {
	    // Drop `hash_idx` from the tag's mapping set and inspect what's left.
	    // (`no_op!` discards the removed slot's value — project macro.)
	    match self.tag_mappings.get_mut(ti).map(|x| {x.remove(&hash_idx); x.len()}) {
		Some(0) => no_op!(self.tag_mappings.remove(ti)), // mapping is now empty: remove it, then drop the tag below (TODO: Should we keep the tag in the btree as cache? TODO: Add this to `PurgeTrack`)
		None => (), // dangling mapping index: nothing to remove, just drop the tag below
		_ => return, // don't remove the tag, there's other references in the mapping
	    }
	    self.tags.remove(tag);
	}
    }
    /// Insert a mapping for this tag string to this single hash index, creating it if needed.
    #[inline] fn insert_tag_for_idx(&mut self, tag: impl AsRef<str>, hash_idx: ArenaIndex)
    {
	let tag = tag.as_ref();
	if let Some(&ti) = self.tags.get(tag) {
	    // This tag has an entry already, append to it
	    self.tag_mappings.get_mut(ti).unwrap().insert(hash_idx);
	} else {
	    // This tag has no entry, create it
	    // (`iter!` is this crate's ad-hoc single-shot iterator macro.)
	    let ti = self.tag_mappings.insert(iter![hash_idx].collect());
	    self.tags.insert(tag.to_owned(), ti);
	}
    }
}

@ -1,216 +0,0 @@
use super::*;
use std::borrow::Borrow;
use std::marker::PhantomData;
use std::iter;
use std::collections::{
btree_map::Range,
VecDeque,
};
use smallvec::SmallVec;
use std::sync::Arc;
use tokio::{
sync::{
mpsc,
},
};
use futures::prelude::*;
/// Inline capacity used for on-stack tag scratch buffers during searches.
const STACK_CACHE_SIZE: usize = 8;
/// An iterator over entries in a store matching *any* tags.
/// Fields: store, remaining tag range (`None` = born empty), buffered hash refs, tag-type marker.
#[derive(Debug)]
pub struct StoreSearchAnyIter<'a, T: ?Sized>(&'a Store, Option<Range<'a, String, ArenaIndex>>, VecDeque<&'a sha256::Sha256Hash>, PhantomData<T>);
/// An iterator over entries in a store matching *all* tags.
/// Fields: store, remaining tag range (`None` = born empty), buffered entry refs, sorted requested tags.
#[derive(Debug)]
pub struct StoreSearchAllIter<'a, T: ?Sized>(&'a Store, Option<Range<'a, String, ArenaIndex>>, VecDeque<&'a Entry>, SmallVec<[&'a T; STACK_CACHE_SIZE]>);
// Searching by tags
impl Store
{
    /// Lookup tag indices for this iterator of tags.
    ///
    /// Scans the tag btree over the `min..=max` range of the requested tags.
    /// NOTE(review): that range also visits tags lexicographically *between*
    /// the requested ones — callers must filter if exact membership matters.
    pub(super) fn tag_index_lookup<'a, T: ?Sized + Ord + 'a>(&self, tags: impl IntoIterator<Item= &'a T>) -> impl Iterator<Item = (& str, ArenaIndex)> + '_
    where String: Borrow<T>
    {
	// `.map(|x| x.into())` is a reflexive conversion here (identity on `&T`).
	let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().map(|x| x.into()).collect();
	sorted.sort();
	match (sorted.first(), sorted.last()) {
	    (Some(&low), Some(&high)) => Some(self.tags.range::<T, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))).map(|(s, &i)| (s.as_str(), i))),
	    _ => None
	}.map_into_iter()
    }
    /// Find the `data_hashes` index for this entry in `data` (linear scan).
    pub(super) fn reverse_index_lookup(&self, hs: &EntryKey) -> Option<ArenaIndex>
    {
	self.data_hashes.iter().filter_map(|(i, h)| if h == hs {
	    Some(i)
	} else {
	    None
	}).next()
    }
    /// Create a `Stream` that searches for all entries with *all* of these provided tags on background tasks.
    ///
    /// The stream outputs `EntryKey`s, which can be used to look up the `Entry` in the store in `O(1)` time.
    ///
    /// # Notes
    /// This is allowed to produce duplicate entries, if either:
    /// * An entry has multiple of the same tag set
    /// * An entry has multiple of the tags provided to this function set
    ///
    /// # Panics
    /// It is not guaranteed that the search will complete before the stream ends, if one of the background tasks panics; although this is unlikely.
    /// If one of the tasks panics, it is ignored.
    ///
    /// This is only useful in a multithreaded environment where tasks can be scheduled on different threads, otherwise use `tag_search_all`
    pub fn tag_search_all_detached<U: Into<String>>(self: Arc<Self>, tags: impl IntoIterator<Item=U>) -> impl Stream<Item = EntryKey> + 'static
    {
	// Bounded channel: producers block (async) when the consumer lags.
	let (tx, rx) = mpsc::channel(16);
	let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().map(|x| x.into()).collect();
	sorted.sort();
	tokio::spawn(async move {
	    let range = match (sorted.first().map(|x| x.as_ref()), sorted.last().map(|x| x.as_ref())) {
		(Some(low), Some(high)) => self.tags.range::<str, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))),
		_ => return,
	    };
	    // One task per tag in range; each forwards its mapped hashes to `tx`.
	    future::join_all(range.map(|(_, &ti)| {
		let store = Arc::clone(&self);
		let mut tx = tx.clone();
		tokio::spawn(async move {
		    let data_hashes = &store.data_hashes;
		    for x in store.tag_mappings.get(ti)
			.map_into_iter()
			.filter_map(move |&idx| data_hashes.get(idx))
		    {
			// Receiver dropped: stop producing.
			if tx.send(*x).await.is_err() {
			    return;
			}
		    }
		})
	    })).await;
	});
	rx
    }
    /// Search for all entries with *all* of these provided tags.
    ///
    /// # Notes
    /// This is allowed to produce duplicate entries, if either:
    /// * An entry has multiple of the same tag set
    /// * An entry has multiple of the tags provided to this function set
    pub fn tag_search_all<'a, T: ?Sized + Ord + 'a>(&'a self, tags: impl IntoIterator<Item= &'a T>) -> StoreSearchAllIter<'_, T>
    where String: Borrow<T>
    {
	let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().collect();
	sorted.sort();
	StoreSearchAllIter(self, Some(match (sorted.first(), sorted.last()) {
	    (Some(&low), Some(&high)) => self.tags.range::<T, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))),
	    _ => return StoreSearchAllIter(self, None, Default::default(), sorted), // no tags requested: empty iterator
	}), VecDeque::new(), sorted)
    }
    /// Search for all entries with *any* of these provided tags.
    ///
    /// # Notes
    /// This is allowed to produce duplicate entries, if either:
    /// * An entry has multiple of the same tag set
    /// * An entry has multiple of the tags provided to this function set
    ///
    /// NOTE(review): the underlying scan covers the `min..=max` range of the
    /// requested tags and applies no membership filter, so entries whose tags
    /// merely sort between the requested ones are also yielded — confirm
    /// whether that is intended.
    pub fn tag_search_any<'a, T: ?Sized + Ord + 'a>(&self, tags: impl IntoIterator<Item= &'a T>) -> StoreSearchAnyIter<'_, T>
    where String: Borrow<T>
    {
	let mut sorted: SmallVec<[_; STACK_CACHE_SIZE]> = tags.into_iter().collect();
	sorted.sort();
	StoreSearchAnyIter(self, Some(match (sorted.first(), sorted.last()) {
	    (Some(&low), Some(&high)) => self.tags.range::<T, _>((std::ops::Bound::Included(low), std::ops::Bound::Included(high))),
	    _ => return StoreSearchAnyIter(self, None, Default::default(), PhantomData), // no tags requested: empty iterator
	}), VecDeque::new(), PhantomData)
    }
    /// Search for all items with this provided tag.
    ///
    /// # Notes
    /// This is allowed to produce duplicate entries, if an entry has two of the same tags set.
    pub fn tag_search<'a, T: ?Sized + Ord>(&'a self, tag: &T) -> StoreSearchAnyIter<'a, T>
    where String: Borrow<T>
    {
	// Degenerate single-tag range: `tag..=tag`.
	let r= (std::ops::Bound::Included(tag), std::ops::Bound::Included(tag));
	StoreSearchAnyIter(self, Some(self.tags.range::<T, _>(r)), VecDeque::new(), PhantomData)
    }
    // Compile-time usage checks for the search APIs (never called at runtime).
    fn _assert_test_search(&self)
    {
	let _x: Vec<_> = self.tag_search("hello").dedup_ref().collect();
	let _x: Vec<_> = self.tag_search_any(vec!["hello", "one", "two"]).dedup_ref().collect();
	let _x: Vec<_> = self.tag_search_all(vec!["hello", "one", "two"]).dedup_ref().collect();
    }
    async fn _assert_test_search_par(self: Arc<Self>)
    {
	let _x: Vec<_> = self.tag_search_all_detached(vec!["hi"]).collect().await;
    }
}
impl<'a, T: ?Sized> Iterator for StoreSearchAllIter<'a, T>
where T: Ord,
String: Borrow<T>
{
type Item = &'a Entry;
fn next(&mut self) -> Option<Self::Item>
{
if let Some(range) = &mut self.1 {
if let Some((_, &ti)) = range.next() {
// tag index get
let data_hashes = &self.0.data_hashes;
let data = &self.0.data;
let tags = &self.3;
let iter = self.0.tag_mappings.get(ti)
.map_into_iter()
.filter_map(move |&idx| data_hashes.get(idx))
.filter_map(move |x| data.get(x))
// Ensure all our `tags` are present in the entry's tags
.filter(move |entry| tags.iter()
.filter(move |&x| entry.tags
.binary_search_by_key(x, |t: &String| -> &T { t.borrow() })
.is_err())
.count() == 0);
self.2.extend(iter);
}
} else {
return None;
}
self.2.pop_front()
}
fn size_hint(&self) -> (usize, Option<usize>) {
match self.1 {
None => (0, Some(0)),
Some(_) => (0, None),
}
}
}
impl<'a, T: ?Sized> Iterator for StoreSearchAnyIter<'a, T>
{
    type Item = &'a Entry;
    /// Yield the next entry reachable from the buffered tag range.
    fn next(&mut self) -> Option<Self::Item>
    {
	loop {
	    // BUGFIX: refill until at least one hash is buffered (or the tag
	    // range is exhausted); previously one empty refill ended the
	    // iteration early even with tags left in the range.
	    while self.2.is_empty() {
		let range = self.1.as_mut()?;
		let (_, &ti) = range.next()?;
		let data_hashes = &self.0.data_hashes;
		let fresh = self.0.tag_mappings.get(ti)
		    .map_into_iter()
		    .filter_map(move |&idx| data_hashes.get(idx));
		self.2.extend(fresh);
	    }
	    // BUGFIX: a buffered hash whose entry is gone from `data` (a stale,
	    // not-yet-purged index) is now skipped instead of terminating the
	    // whole iteration.
	    if let Some(found) = self.2.pop_front().and_then(|key| self.0.data.get(key)) {
		return Some(found);
	    }
	}
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
	match self.1 {
	    None => (0, Some(0)),
	    Some(_) => (0, None),
	}
    }
}
// Once `next` returns `None` the range and buffer stay exhausted, so the
// iterator keeps returning `None`.
impl<'a, T: ?Sized> iter::FusedIterator for StoreSearchAnyIter<'a, T>{}

@ -1,61 +0,0 @@
use super::*;
#[inline] fn gen_meta() -> StoreMetadata
{
StoreMetadata{
name: "test".to_string(),
root: "./test/db".to_owned().into(),
}
}
#[test]
fn create()
{
    // Round-trip: store -> frozen snapshot -> store.
    let store = Store::new(gen_meta());
    let freeze = store.into_freeze();
    let _ = freeze.into_new();
}
#[test]
fn entries()
{
    let mut store = Store::new(gen_meta());
    // Template builder for test entries (no key; cache left default).
    macro_rules! build {
	() => {
	    EntryBuilder::new()
		.name("Hello")
		.desc("Desc")
		.tags(&["Tag1", "Tag2", "Tag3"])
		.hash(sha256::compute_slice(b"hello world"))
		.location(Path::new("test_file"))
	};
    }
    let entry = build!()
	.build().expect("Failed to build entry");
    println!("Entry: {:#?}", entry);
    let entry = store.insert(entry).clone();
    // Second entry: same tags, different name and hash.
    store.insert(build!()
		 .hash(sha256::compute_slice("something else"))
		 .name("Entry 2")
		 .build().expect("Failed to build entry 2"));
    println!("Entry ref: {:#?}", entry);
    println!("Store: {:#?}", store);
    let freeze = store.freeze();
    println!("Frozen: {:#?}", freeze);
    let store2 = Store::unfreeze(&freeze);
    // Tag searches must find the first entry both in the original store and
    // in the store rebuilt from the snapshot.
    assert_eq!(store2.tag_search("Tag2").filter(|x| x.name() == entry.name()).next().unwrap().hash(), entry.hash());
    assert_eq!(store.tag_search_any(vec!["Tag2", "Tag3"]).filter(|x| x.name() == entry.name()).next().unwrap().hash(), entry.hash());
    assert_eq!(store2.tag_search_all(vec!["Tag1", "Tag3"]).filter(|x| x.name() == entry.name()).next().unwrap().hash(), entry.hash());
    // A never-set tag must make the *all* search come up empty.
    assert!(store.tag_search_all(vec!["Tag1", "Nope"]).next().is_none());
    // Serde round-trip of the snapshot must be lossless.
    let json = serde_json::to_string_pretty(&freeze).unwrap();
    println!("Freeze serialised ({} bytes): '{}'", json.len(), json);
    let ufreeze: Freeze = serde_json::from_str(json.as_str()).unwrap();
    assert_eq!(ufreeze, freeze);
    println!("Freeze unseralised as store: {:#?}", ufreeze.into_new());
}

@ -0,0 +1,65 @@
use super::*;
use std::cmp;
use std::ptr;
use std::ffi::c_void;
/// Explicitly zero out a byte buffer.
///
/// Essentially `explicit_bzero()`: `#[inline(never)]` plus the inline asm
/// below are there to stop the optimiser from eliding the wipe.
#[inline(never)]
pub fn explicit_clear(buffer : &mut[u8]) {
    // NOTE(review): this `use` is redundant (already imported at module level).
    use std::ffi::c_void;
    unsafe {
	// Zero the whole buffer (`c_void` here is byte-sized, so the count is in bytes).
	std::ptr::write_bytes(buffer.as_mut_ptr() as * mut c_void, 0, buffer.len());
	if cfg!(target_arch = "x86_64") || cfg!(target_arch = "x86") {
	    // NOTE(review): `clflush` only flushes the single cache line holding
	    // the buffer's first byte — the rest of the buffer may stay cached;
	    // the asm mainly acts as an optimisation barrier. TODO confirm intent.
	    asm!("clflush [{}]", in(reg)buffer.as_mut_ptr());
	} else {
	    // Empty asm: pure compiler barrier on other architectures.
	    asm!("")
	}
    }
}
/// Set all bytes of this buffer to a single value.
///
/// Essentially `memset()`.
///
/// # Parameters
/// * `buffer` — destination, filled in place.
/// * `to` — byte value written to every element.
#[inline] pub fn set(buffer: &mut [u8], to: u8)
{
    // Safe replacement for the old `ptr::write_bytes` call (which also cast
    // the pointer to `*mut c_void` for no reason); `fill` lowers to a memset.
    buffer.fill(to);
}
/// Zero out this buffer
///
/// Essentially `bzero()`.
#[inline(always)] pub fn clear(buffer: &mut [u8])
{
    // Write a zero byte over every element in place.
    for b in buffer.iter_mut() {
	*b = 0;
    }
}
/// Copy bytes from one slice to another, returning the number of bytes copied
/// (the length of the shorter slice).
///
/// # Notes
/// Despite the `memmove` name, the two slices can never actually overlap when
/// reached through safe references (`&mut` is exclusive), so this is a plain
/// bounds-checked slice copy — no `unsafe` needed.
#[inline] pub fn memmove(from: &[u8], to: &mut [u8]) -> usize
{
    let len = cmp::min(from.len(), to.len());
    to[..len].copy_from_slice(&from[..len]);
    len
}
/// Copy bytes from one slice to another, returning the number of bytes copied
/// (the length of the shorter slice).
///
/// # Notes
/// The slices are distinct borrows and cannot overlap; the previous
/// `ptr::copy_nonoverlapping` is replaced by an equivalent safe copy
/// (`copy_from_slice` lowers to a `memcpy`).
#[inline] pub fn memcpy(from: &[u8], to: &mut [u8]) -> usize
{
    let len = cmp::min(from.len(), to.len());
    to[..len].copy_from_slice(&from[..len]);
    len
}

@ -0,0 +1,282 @@
//! Stream related things
use super::*;
use std::{
task::{
Poll,
Context,
},
pin::Pin,
marker::PhantomData,
};
use tokio::{
io::{
AsyncBufRead,
AsyncRead,
},
prelude::*,
};
use futures::{
stream::{
Stream,
StreamExt,
Fuse,
},
};
use pin_project::pin_project;
/// Converts a stream of byte-containing objects into an `AsyncRead` and `AsyncBufRead`er.
///
/// Chunks pulled from `source` are appended to `buffer` and served to readers
/// from its front. The stream is fused so polling past its end is safe.
#[pin_project]
pub struct StreamReader<I, T>
where I: Stream<Item=T>
{
    #[pin]
    source: Fuse<I>,
    /// Bytes received but not yet consumed by a reader.
    buffer: Vec<u8>,
}
impl<T, I> StreamReader<I, T>
where I: Stream<Item=T>,
      T: AsRef<[u8]>
{
    /// The current buffer of received-but-unread bytes.
    pub fn buffer(&self) -> &[u8]
    {
	&self.buffer[..]
    }
    /// Consume into the original stream (any buffered bytes are discarded).
    pub fn into_inner(self) -> I
    {
	self.source.into_inner()
    }
    /// Create a new instance with a buffer capacity
    pub fn with_capacity(source: I, cap: usize) -> Self
    {
	Self {
	    source: source.fuse(),
	    buffer: Vec::with_capacity(cap)
	}
    }
    /// Create a new instance from this stream
    pub fn new(source: I) -> Self
    {
	Self {
	    source: source.fuse(),
	    buffer: Vec::new(),
	}
    }
    /// Attempt to pull one chunk from the source into the buffer, returning the
    /// number of bytes appended (`0` = end of stream).
    ///
    /// NOTE(review): the trait impls below inline this same match instead of
    /// calling it — this helper appears unused; consider delegating to it.
    #[cold] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize>
    {
	let this = self.project();
	match this.source.poll_next(cx) {
	    Poll::Ready(None) => Poll::Ready(0),
	    Poll::Ready(Some(buf)) if buf.as_ref().len() > 0 => {
		let buf = buf.as_ref();
		this.buffer.extend_from_slice(buf);
		Poll::Ready(buf.len())
	    },
	    // Pending, or a zero-length chunk (treated as not-ready).
	    _ => Poll::Pending,
	}
    }
}
impl<T: AsRef<[u8]>, I: Stream<Item=T>> AsyncRead for StreamReader<I,T>
{
    /// Serve bytes from the internal buffer, refilling it from the source
    /// stream when empty. `Ok(0)` signals end-of-stream.
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
	let mut this = self.project();
	if !this.buffer.is_empty() {
	    // BUGFIX: drain at most what fits in `buf`; the old unconditional
	    // `drain(..buf.len())` panicked whenever `buf` was larger than the
	    // internal buffer.
	    let n = this.buffer.len().min(buf.len());
	    return Poll::Ready(Ok(bytes::memcpy(this.buffer.drain(..n).as_slice(), buf)));
	}
	// Buffer is empty: refill from the source stream.
	loop {
	    match this.source.as_mut().poll_next(cx) {
		Poll::Ready(None) => return Poll::Ready(Ok(0)), // end of stream
		Poll::Ready(Some(chunk)) => {
		    let chunk = chunk.as_ref();
		    if chunk.is_empty() {
			// BUGFIX: an empty chunk used to become `Poll::Pending`
			// with no scheduled wake-up (potential hang); poll the
			// source again instead.
			continue;
		    }
		    this.buffer.extend_from_slice(chunk);
		    // BUGFIX: only drain what is actually copied into `buf`;
		    // the old code drained the whole chunk and silently dropped
		    // the surplus bytes.
		    let n = this.buffer.len().min(buf.len());
		    return Poll::Ready(Ok(bytes::memcpy(this.buffer.drain(..n).as_slice(), buf)));
		},
		Poll::Pending => return Poll::Pending,
	    }
	}
    }
}
impl<T: AsRef<[u8]>, I: Stream<Item=T>> AsyncBufRead for StreamReader<I,T>
{
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let this = self.project();
if this.buffer.len() < 1 {
// Fetch more into buffer
match match this.source.poll_next(cx) {
Poll::Ready(None) => Poll::Ready(0),
Poll::Ready(Some(buf)) if buf.as_ref().len() > 0 => {
let buf = buf.as_ref();
this.buffer.extend_from_slice(buf);
Poll::Ready(buf.len())
},
_ => Poll::Pending,
} {
Poll::Ready(0) => Poll::Ready(Ok(&[])), // should we return EOF error here?
Poll::Ready(x) => Poll::Ready(Ok(&this.buffer[..x])),
_ => Poll::Pending
}
} else {
Poll::Ready(Ok(&this.buffer[..]))
}
}
fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().buffer.drain(..amt);
}
}
#[cfg(test)]
mod tests
{
    use super::*;
    use tokio::{
	sync::{
	    mpsc,
	},
    };
    /// End-to-end: a channel of string chunks read back line-by-line through
    /// `StreamReader`'s `AsyncBufRead` implementation.
    #[tokio::test]
    async fn stream_of_vec()
    {
	let (mut tx, rx) = mpsc::channel(16);
	let sender = tokio::spawn(async move {
	    tx.send("Hello ").await.unwrap();
	    tx.send("world").await.unwrap();
	    tx.send("\n").await.unwrap();
	    tx.send("How ").await.unwrap();
	    tx.send("are ").await.unwrap();
	    tx.send("you").await.unwrap();
	});
	let mut reader = StreamReader::new(rx);
	let mut output = String::new();
	let mut read;
	// `read_line` returns 0 only at end-of-stream (sender dropped).
	while {read = reader.read_line(&mut output).await.expect("Failed to read"); read!=0} {
	    println!("Read: {}", read);
	}
	println!("Done: {:?}", output);
	sender.await.expect("Child panic");
	// Chunks must be reassembled in order, split across the newline.
	assert_eq!(&output[..], "Hello world\nHow are you");
    }
}
/// A stream that chunks its input.
///
/// Collects up to `cap` items from the inner stream into `buf` and yields them
/// as one `Into` (any `From<Vec<T>>`) value. `push_now` forces the next poll
/// to flush a partially filled, non-empty buffer.
#[pin_project]
pub struct ChunkingStream<S, T, Into=Vec<T>>
{
    #[pin] stream: Fuse<S>,
    /// Items collected for the chunk currently being built.
    buf: Vec<T>,
    /// Target chunk size.
    cap: usize,
    _output: PhantomData<Into>,
    /// One-shot flag: flush the buffer on the next poll even if not full.
    push_now: bool,
}
impl<S, T, Into> ChunkingStream<S,T, Into>
where S: Stream<Item=T>,
      Into: From<Vec<T>>
{
    /// Create a new chunking stream yielding chunks of (up to) `sz` items.
    pub fn new(stream: S, sz: usize) -> Self
    {
	Self {
	    stream: stream.fuse(),
	    buf: Vec::with_capacity(sz),
	    cap: sz,
	    _output: PhantomData,
	    push_now: false,
	}
    }
    /// Consume into the inner stream (any buffered items are discarded).
    pub fn into_inner(self) -> S
    {
	self.stream.into_inner()
    }
    /// The configured chunk size.
    pub fn cap(&self) -> usize
    {
	self.cap
    }
    /// The items buffered for the in-progress chunk.
    pub fn buffer(&self) -> &[T]
    {
	&self.buf[..]
    }
    /// Shared reference to the inner stream.
    pub fn get_ref(&self) -> &S
    {
	self.stream.get_ref()
    }
    /// Mutable reference to the inner stream.
    pub fn get_mut(&mut self)-> &mut S
    {
	self.stream.get_mut()
    }
    /// Force the next read to send the buffer even if it's not full.
    ///
    /// # Note
    /// The buffer still won't send if it's empty.
    pub fn push_now(&mut self)
    {
	self.push_now= true;
    }
    /// Consume into the current held buffer
    pub fn into_buffer(self) -> Vec<T>
    {
	self.buf
    }
    /// Take the buffer now, leaving an empty buffer with the same capacity.
    pub fn take_now(&mut self) -> Into
    {
	std::mem::replace(&mut self.buf, Vec::with_capacity(self.cap)).into()
    }
}
impl<S, T, Into> Stream for ChunkingStream<S,T, Into>
where S: Stream<Item=T>,
      Into: From<Vec<T>>
{
    type Item = Into;
    /// Fill the buffer up to `cap` (or until the inner stream ends / a
    /// `push_now` flush is requested), then yield it as one chunk.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
	// Keep filling unless (a) a flush was forced and we have something to
	// flush, or (b) the buffer already holds a full chunk.
	while !(self.push_now && !self.buf.is_empty()) && self.buf.len() < self.cap {
	    // Buffer isn't full, keep filling
	    let this = self.as_mut().project();
	    match this.stream.poll_next(cx) {
		Poll::Ready(None) => {
		    // Stream is over
		    break;
		},
		Poll::Ready(Some(item)) => {
		    this.buf.push(item);
		},
		// Inner stream not ready: a partial chunk stays buffered for later.
		_ => return Poll::Pending,
	    }
	}
	//debug!("Sending buffer of {} (cap {})", self.buf.len(), self.cap);
	// Buffer is full, a flush was forced, or we reached end of stream.
	Poll::Ready(if self.buf.len() == 0 {
	    // Nothing buffered and the inner stream ended: this stream ends too.
	    None
	} else {
	    let this = self.project();
	    // Clear the one-shot flush flag and hand off the buffer, keeping capacity.
	    *this.push_now = false;
	    let output = std::mem::replace(this.buf, Vec::with_capacity(*this.cap));
	    Some(output.into())
	})
    }
}

@ -0,0 +1,176 @@
//! Defer dropping an object to a background thread.
//!
//! This can help when dropping large structures in a performance-critial context.
use super::*;
use std::{
thread,
time::Duration,
sync::mpsc,
any::Any,
marker::{Send, Sync},
};
/// A type-erased payload handed to the background dropping thread.
pub type Defer = Box<dyn Any + Send + 'static>;
/// A deferred dropping handle.
///
/// Cloneable sender into the background thread created by `defer_drop_sub`.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct DeferredDropper(mpsc::Sender<Defer>);
impl DeferredDropper
{
    /// Drop a Boxed item on the background thread
    ///
    /// # Panics
    /// If the background thread has panicked.
    #[inline] pub fn drop_boxed<T: Any + Send + 'static>(&self, item: Box<T>)
    {
	// `Box<T>` coerces to `Box<dyn Any + Send>` (`Defer`) at the call site.
	self.0.send(item).unwrap();
    }
    /// Drop an item on the background thread
    ///
    /// # Panics
    /// If the background thread has panicked.
    #[inline(always)] pub fn drop<T: Any +Send+ 'static>(&self, item: T)
    {
	self.drop_boxed(Box::new(item))
    }
    /// Send this deferring drop
    ///
    /// # Panics
    /// If the background thread has panicked.
    #[inline(always)] pub fn send(&self, item: impl Into<Defer>)
    {
	self.0.send(item.into()).unwrap()
    }
}
/// Subscribe to the deferred dropper. Usually avoid doing this, and just use the `drop!` macro, or use the thread-local static dropper reference `HANDLE` directly.
///
/// The first call lazily spawns the single global dropping thread; every call
/// clones a fresh sender out of the mutex-guarded global.
fn defer_drop_sub() -> DeferredDropper
{
    use std::sync::Mutex; // Can we get rid of this somehow? I don't think we can. Either we mutex lock here, or we mutex lock on every (I think every, should look into that..) send. I think this is preferrable.
    // Newtype so the sender can live in a `lazy_static` behind a `Mutex`
    // (`mpsc::Sender` is `!Sync`).
    #[repr(transparent)]
    struct Shim(Mutex<mpsc::Sender<Defer>>);

    lazy_static! {
	static ref TX: Shim = {
	    let (tx, rx) = mpsc::channel();
	    thread::spawn(move || {
		// NOTE(review): `.lag(..)` is an iterator extension from this
		// crate — presumably throttles draining; TODO confirm semantics.
		for val in rx.into_iter().lag(Duration::from_millis(10))
		{
		    //let _ = thread::spawn(move || drop(val)).join(); // To catch panic?
		    drop(val); // What if this panics?
		}
	    });
	    Shim(Mutex::new(tx))
	};
    }
    DeferredDropper(TX.0.lock().unwrap().clone())
}
thread_local! {
    /// Per-thread cached handle to the global background dropper.
    pub static HANDLE: DeferredDropper = defer_drop_sub();
}
/// Drop this item on another thread.
///
/// Forms:
/// * `defer_drop!(value)` / `defer_drop!(box boxed_value)` — send via the
///   thread-local `HANDLE`.
/// * `defer_drop!(in subscriber; value)` / `defer_drop!(in subscriber; box boxed_value)`
///   — send via an explicit `DeferredDropper`.
#[macro_export] macro_rules! defer_drop {
    (in $sub:ident; box $val:expr) => {
	$sub.drop_boxed($val)
    };
    (in $sub:ident; $val:expr) => {
	$sub.drop($val)
    };
    (box $val:expr) => {
	{
	    $crate::ext::defer_drop::HANDLE.with(move |sub| {
		sub.drop_boxed($val)
	    });
	}
    };
    ($val:expr) => {
	{
	    $crate::ext::defer_drop::HANDLE.with(move |sub| {
		sub.drop($val)
	    });
	}
    };
}
#[cfg(test)]
mod tests
{
    /// All four macro forms must compile and run.
    #[test]
    fn mac()
    {
	use crate::*;
	let sub = super::defer_drop_sub();
	let large_vec = vec![String::from("hello world"); 1000];
	defer_drop!(in sub; large_vec.clone());
	defer_drop!(large_vec);
	defer_drop!(box Box::new("hello world?"));
	defer_drop!(in sub; box Box::new("hello world?"));
    }
    /// Stress: many threads deferring drops of large vectors concurrently.
    #[test]
    fn dropping_larges()
    {
	for joiner in std::iter::repeat_with(|| {
	    let large_vec = vec![String::from("hello world"); 1000];
	    let h = {
		let mut large_vec = large_vec.clone();
		std::thread::spawn(move || {
		    large_vec.sort();
		    defer_drop!(large_vec);
		})
	    };
	    defer_drop!(large_vec);
	    h
	}).take(1000)
	{
	    joiner.join().unwrap();
	}
	// Give the background dropper time to drain before the process exits.
	std::thread::sleep(std::time::Duration::from_millis(500));
    }
    /// Sending owned and boxed payloads through an explicit subscriber.
    #[test]
    fn dropping_vec()
    {
	let sub = super::defer_drop_sub();
	let large_vec = vec![String::from("hello world"); 1000];
	sub.drop(large_vec.clone());
	// `drop_boxed` needs a `Sized` payload, so the whole `Vec` is boxed
	// rather than converted to a boxed slice.
	sub.drop_boxed(Box::new(large_vec))
    }
    /// Subscribing repeatedly from many threads must always yield a working clone.
    #[test]
    fn clone_shim()
    {
	for joiner in std::iter::repeat_with(|| {
	    std::thread::spawn(move || {
		let mut subs = Vec::new();
		for _ in 0..100 {
		    subs.push(super::defer_drop_sub());
		}
	    })
	}).take(1000)
	{
	    joiner.join().unwrap();
	}
    }
}

@ -0,0 +1,116 @@
use std::{
mem,
iter::{
self,
ExactSizeIterator,
FusedIterator,
},
slice,
fmt,
};
/// Adapts a byte iterator into an iterator of lowercase hex characters.
///
/// The two-byte scratch (`self.1`) holds the pending hex digits of the byte
/// most recently pulled from `self.0`; a `0` slot means "no pending char"
/// (safe as a sentinel because hex digits are never NUL).
#[derive(Debug, Clone)]
pub struct HexStringIter<I>(I, [u8; 2]);
impl<I: Iterator<Item = u8>> HexStringIter<I>
{
    /// Write this hex string iterator to a formattable buffer
    pub fn consume<F>(self, f: &mut F) -> fmt::Result
    where F: std::fmt::Write
    {
	// Flush any chars still pending in the scratch from earlier `next()` calls...
	if self.1[0] != 0 {
	    write!(f, "{}", self.1[0] as char)?;
	}
	if self.1[1] != 0 {
	    write!(f, "{}", self.1[1] as char)?;
	}
	// ...then format the remaining bytes two hex digits at a time.
	for x in self.0 {
	    write!(f, "{:02x}", x)?;
	}
	Ok(())
    }
    /// Consume into a string
    pub fn into_string(self) -> String
    {
	// Preallocate from the iterator's size hint where possible.
	let mut output = match self.size_hint() {
	    (0, None) => String::new(),
	    (_, Some(x)) |
	    (x, None) => String::with_capacity(x),
	};
	self.consume(&mut output).unwrap();
	output
    }
}
/// Extension trait: convert any byte source into a [`HexStringIter`].
pub trait HexStringIterExt<I>: Sized
{
    /// Consume `self` into a lazy hex-string iterator.
    fn into_hex(self) -> HexStringIter<I>;
}
/// Hex iterator that borrows its bytes from a slice.
pub type HexStringSliceIter<'a> = HexStringIter<iter::Copied<slice::Iter<'a, u8>>>;

/// Extension trait: lazily hex-encode anything viewable as a byte slice.
pub trait HexStringSliceIterExt
{
    /// Hex-encode the underlying bytes, borrowing them for the iterator's lifetime.
    fn hex(&self) -> HexStringSliceIter<'_>;
}
impl<S> HexStringSliceIterExt for S
where S: AsRef<[u8]>
{
    /// Borrow `self` as bytes and wrap them in a hex-encoding iterator.
    fn hex(&self) -> HexStringSliceIter<'_>
    {
	let bytes: &[u8] = self.as_ref();
	bytes.iter().copied().into_hex()
    }
}
// Blanket impl: any byte-yielding `IntoIterator` gains `into_hex`.
impl<I: IntoIterator<Item=u8>> HexStringIterExt<I::IntoIter> for I
{
    #[inline] fn into_hex(self) -> HexStringIter<I::IntoIter> {
	// Start with an empty scratch buffer (0 marks an empty slot).
	HexStringIter(self.into_iter(), [0u8; 2])
    }
}
impl<I: Iterator<Item = u8>> Iterator for HexStringIter<I>
{
    type Item = char;

    /// Yield the next hex character.
    ///
    /// Each input byte is rendered as two hex chars into the scratch buffer
    /// `self.1`; a `0` slot means "empty" (valid, since hex digits are ASCII
    /// and never NUL).
    fn next(&mut self) -> Option<Self::Item>
    {
	match self.1 {
	    // Buffer empty: pull a byte, render both digits, emit the first.
	    [_, 0] => {
		use std::io::Write;
		write!(&mut self.1[..], "{:02x}", self.0.next()?).unwrap();
		Some(mem::replace(&mut self.1[0], 0) as char)
	    },
	    // Second digit still buffered from the previous call.
	    [0, _] => Some(mem::replace(&mut self.1[1], 0) as char),
	    _ => unreachable!(),
	}
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
	// BUG FIX: count chars still buffered in `self.1`. Without this, the
	// upper bound (and `ExactSizeIterator::len`) under-reported by one
	// after an odd number of `next()` calls, because the buffered second
	// hex digit was not accounted for.
	let buffered = self.1.iter().filter(|&&b| b != 0).count();
	let (l, h) = self.0.size_hint();
	(l * 2 + buffered, h.map(|x| x * 2 + buffered))
    }
}
// Forward the marker traits from the inner byte iterator.
impl<I: Iterator<Item = u8> + ExactSizeIterator> ExactSizeIterator for HexStringIter<I>{}
impl<I: Iterator<Item = u8> + FusedIterator> FusedIterator for HexStringIter<I>{}
impl<I: Iterator<Item = u8>> From<HexStringIter<I>> for String
{
    /// Render all remaining hex chars into an owned `String`.
    fn from(from: HexStringIter<I>) -> Self
    {
	from.into_string()
    }
}
impl<I: Iterator<Item = u8> + Clone> fmt::Display for HexStringIter<I>
{
    /// Non-destructive display: clones the iterator so `self` remains usable.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
	self.clone().consume(f)
    }
}

@ -1,4 +1,6 @@
use super::*;
use std::iter::Fuse;
use std::marker::PhantomData;
/// An iterator that may be empty.
#[derive(Debug, Clone)]
@ -155,3 +157,108 @@ where S: AsRef<[T]>,
true
}
}
// Chunking iterator

/// A stream that chunks its input.
pub struct ChunkingIter<I, T, Into=Vec<T>>
{
    // Fused source, so exhaustion is permanent and safe to re-poll.
    stream: Fuse<I>,
    // Items collected for the chunk currently being filled.
    buf: Vec<T>,
    // Target chunk size.
    cap: usize,
    // Output chunk type; each yielded chunk is converted from `Vec<T>`.
    _output: PhantomData<Into>,
    // When set, the next `next()` flushes a non-empty buffer early.
    push_now: bool,
}
impl<S, T, Into> ChunkingIter<S,T, Into>
where S: Iterator<Item=T>,
      Into: From<Vec<T>>
{
    /// Create a new chunking iterator with this chunk size.
    ///
    /// NOTE(review): a chunk size of 0 makes `next()` yield nothing —
    /// confirm callers never pass 0.
    pub fn new(stream: S, sz: usize) -> Self
    {
	Self {
	    stream: stream.fuse(),
	    buf: Vec::with_capacity(sz),
	    cap: sz,
	    _output: PhantomData,
	    push_now: false,
	}
    }
    /// Consume into the inner fused iterator, discarding any buffered items.
    pub fn into_inner(self) -> Fuse<S>
    {
	self.stream
    }
    /// The configured chunk size.
    pub fn cap(&self) -> usize
    {
	self.cap
    }
    /// The items buffered for the chunk currently being filled.
    pub fn buffer(&self) -> &[T]
    {
	&self.buf[..]
    }
    /// Shared reference to the inner fused iterator.
    pub fn get_ref(&self) -> &Fuse<S>
    {
	&self.stream
    }
    /// Mutable reference to the inner fused iterator.
    pub fn get_mut(&mut self)-> &mut Fuse<S>
    {
	&mut self.stream
    }
    /// Force the next read to send the buffer even if it's not full.
    ///
    /// # Note
    /// The buffer still won't send if it's empty.
    pub fn push_now(&mut self)
    {
	self.push_now= true;
    }
    /// Consume into the current held buffer
    pub fn into_buffer(self) -> Vec<T>
    {
	self.buf
    }
    /// Take the buffer now
    ///
    /// Swaps in a fresh buffer of capacity `cap` and converts the old
    /// contents into the output type (even if empty).
    pub fn take_now(&mut self) -> Into
    {
	std::mem::replace(&mut self.buf, Vec::with_capacity(self.cap)).into()
    }
}
impl<S, T, Into> Iterator for ChunkingIter<S,T, Into>
where S: Iterator<Item=T>,
      Into: From<Vec<T>>
{
    type Item = Into;

    /// Yield the next chunk, converted from `Vec<T>`.
    ///
    /// Fills the buffer until it holds `cap` items, the (fused) source runs
    /// dry, or an early flush was requested via `push_now()` (which only
    /// takes effect once the buffer is non-empty).
    fn next(&mut self) -> Option<Self::Item> {
	while !(self.push_now && !self.buf.is_empty()) && self.buf.len() < self.cap {
	    match self.stream.next() {
		Some(item) => self.buf.push(item),
		// Source exhausted; emit whatever is buffered (if anything).
		None => break,
	    }
	}
	// Buffer is full, flushed early, or the source ended.
	if self.buf.is_empty() {
	    // Nothing buffered and nothing left upstream.
	    None
	} else {
	    // Any pending `push_now()` request is satisfied by this yield.
	    self.push_now = false;
	    let output = std::mem::replace(&mut self.buf, Vec::with_capacity(self.cap));
	    Some(output.into())
	}
    }
}

@ -0,0 +1,74 @@
//! Lagging iterators / (todo) streams.
use super::*;
use std::time::Duration;
/// A lagged iterator
///
/// Wraps an iterator (field 1) and sleeps the current thread for a fixed
/// `Duration` (field 0) before yielding each item.
#[derive(Debug, Clone)]
pub struct Lag<I: ?Sized, T>(Duration, I)
where I: Iterator<Item=T>;
/// Accessors for the delay applied before each item (usable even when the
/// inner iterator is unsized).
impl<I: ?Sized, T> Lag<I, T>
where I: Iterator<Item = T>
{
    /// Replace the per-item lag duration.
    #[inline]
    pub fn set_duration(&mut self, dur: Duration) {
	self.0 = dur;
    }

    /// The current per-item lag duration.
    #[inline]
    pub fn duration(&self) -> Duration {
	self.0
    }
}
impl<I, T> Lag<I, T>
where I: Iterator<Item=T>
{
    /// Consume into the inner iterator
    ///
    /// (Unlike the duration accessors, this requires `I: Sized`.)
    #[inline] pub fn into_inner(self) -> I
    {
	self.1
    }
}
/// Extension trait adding [`Lag`] construction to any iterator.
pub trait LagIterExt: Iterator
{
    /// Wrap `self` so each item is preceded by a sleep of `dur`.
    fn lag(self, dur: Duration) -> Lag<Self, Self::Item>;
}
// Blanket impl: every iterator gains `.lag(dur)`.
impl<I> LagIterExt for I
where I: Iterator
{
    #[inline] fn lag(self, dur: Duration) -> Lag<Self, Self::Item>
    {
	Lag(dur, self)
    }
}
impl<I: ?Sized, T> Iterator for Lag<I, T>
where I: Iterator<Item=T>
{
    type Item = T;

    /// Sleep for the configured duration, then poll the inner iterator.
    ///
    /// Note: the sleep is unconditional, so even the terminal `None` is
    /// delayed. This blocks the current thread — not for async contexts.
    fn next(&mut self) -> Option<Self::Item>
    {
	std::thread::sleep(self.0);
	self.1.next()
    }

    // Lag changes timing only, never the number of items.
    fn size_hint(&self) -> (usize, Option<usize>) {
	self.1.size_hint()
    }
}
// Marker/size forwarding from the inner iterator (lag affects timing only).
impl<I: ?Sized, T> FusedIterator for Lag<I, T>
where I: Iterator<Item=T> + FusedIterator{}
impl<I: ?Sized, T> ExactSizeIterator for Lag<I, T>
where I: Iterator<Item=T> + ExactSizeIterator{}
impl<I: ?Sized, T> DoubleEndedIterator for Lag<I, T>
where I: Iterator<Item=T> + DoubleEndedIterator
{
    /// Sleep for the lag duration, then pull the next item from the *back*.
    fn next_back(&mut self) -> Option<Self::Item>
    {
	std::thread::sleep(self.0);
	// BUG FIX: previously delegated to `next()`, which consumed from the
	// front and violated the `DoubleEndedIterator` contract.
	self.1.next_back()
    }
}

@ -12,6 +12,20 @@ pub use streams::*;
mod hashers;
pub use hashers::*;
mod hex;
pub use hex::*;
/// Functions for manipulating byte slices
pub mod bytes;
mod lag;
pub use lag::*;
mod defer_drop;
pub use defer_drop::*;
pub mod chunking;
pub const PRECOLLECT_STACK_SIZE: usize = 64;
/// Collect an iterator's output and then drop it to detach the iterator from any references or resources it might have.
@ -42,35 +56,262 @@ pub const PRECOLLECT_STACK_SIZE: usize = 64;
};
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)]
pub enum Either<T,U>
{
T(T),
U(U),
/// The ID type used for backing ID types;
pub type GenericID = uuid::Uuid;

/// Create a type that contains a (globally) unique ID.
///
/// Expands to a newtype over [`GenericID`] (a v4 UUID) with inherent helpers
/// (`id_new`, `id_generic`, `id_into_generic`, `id_from_generic`) and a
/// `Display` impl rendering as `TypeName<uuid>`.
///
/// NOTE(review): the generated helpers have no `pub`, so they are callable
/// only from the invoking module — confirm that is intentional. The
/// expansion also derives `Serialize`/`Deserialize`, so serde's derive
/// macros must be in scope at the call site.
#[macro_export] macro_rules! id_type {
    // No visibility given: default to private (`pub(self)`).
    ($name:ident $(: $doc:literal)?) => ($crate::id_type!{pub(self) $name $(: $doc)?});
    ($vis:vis $name:ident $(: $doc:literal)?) => {
	$(#[doc=$doc])?
	#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
	$vis struct $name($crate::ext::GenericID);
	impl $name
	{
	    /// Create a new unique ID.
	    #[inline(always)] fn id_new() -> Self
	    {
		Self($crate::ext::GenericID::new_v4())
	    }
	    /// The generic ID type backing this one
	    #[inline(always)] fn id_generic(&self) -> &$crate::ext::GenericID
	    {
		&self.0
	    }
	    /// Consume into the generic ID
	    #[inline(always)] fn id_into_generic(self) -> $crate::ext::GenericID
	    {
		self.0
	    }
	    /// Create from a generic ID
	    #[inline(always)] fn id_from_generic(gen: $crate::ext::GenericID) -> Self
	    {
		Self(gen)
	    }
	}
	impl ::std::fmt::Display for $name
	{
	    // Renders as `TypeName<uuid>`.
	    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result
	    {
		f.write_str(concat!(stringify!($name),"<"))?;
		self.0.fmt(f)?;
		f.write_str(">")
	    }
	}
    }
}
impl<T,U> Either<T,U>
{
/// Convert this sum type into
pub fn cast<V>(self) -> V
where V: From<T> + From<U>// eh, this doesn't work. forget about it.
{
match self {
Self::T(t) => t.into(),
Self::U(u) => u.into(),
/// Expands to `unreachable_unchecked` in non-debug builds.
///
/// In debug builds it panics via `unreachable!()` instead, catching logic
/// errors during development. Must be invoked inside an `unsafe` context,
/// since the release arm calls `::std::hint::unreachable_unchecked()`.
///
/// # Safety
/// You must make 100% sure this code path will never be entered, or it will cause undefined behaviour in release builds.
#[macro_export] macro_rules! debug_unreachable {
    () => {
	if cfg!(debug_assertions) {
	    // Debug: loud panic.
	    #[cold] unreachable!()
	} else {
	    // Release: UB if ever reached — the caller's `unsafe` vouches.
	    ::std::hint::unreachable_unchecked()
	}
    };
}
/// Dirty debugging macro to get the compiler to print an error message telling you the size of a type.
/// ```compile_fail
/// check_size!((u8, u8)); // Expected ... found one with *2* elements
/// ```
/// # Size assertions
/// Can also be used to statically assert the size of a type
/// ```
/// # use datse::ext::check_size;
/// check_size!(u16 where == 2; "u16 should be 2 bytes");
/// check_size!(u16 where < 3; "u16 should be lower than 3 bytes");
/// check_size!(u16 where > 1; "u16 should be larger that 1 byte");
/// check_size!(u16 where != 0; "u16 should be larger that 0 bytes");
/// ```
///
/// You can also combine multiple
/// ```
/// # use datse::ext::check_size;
/// check_size!([u8; 512] where {
///	!= 10;
///	> 511;
///	< 513;
///	== 512
/// });
/// ```
///
/// This can be used to give you prompts as to when you might want to consider boxing a type.
///
/// NOTE(review): the optional `; "msg"` is accepted by every arm but is not
/// currently surfaced in the compile error.
#[macro_export] macro_rules! check_size {
    // Debugging arm: only type-checks for zero-sized types, so the error
    // message reveals the actual size.
    ($t:ty) => {
	const _: [(); 0] = [(); ::std::mem::size_of::<$t>()];
    };
    // Exact-size assertion via array-length unification.
    ($t:ty where == $n:literal $(; $msg:literal)?) => {
	const _: [(); $n] = [(); ::std::mem::size_of::<$t>()];
    };
    // Several comparisons at once; each delegates to `static_assert!`.
    ($t:ty where {$($op:tt $n:literal);+} $(; $msg:literal)?) => {
	$(
	    $crate::static_assert!(::std::mem::size_of::<$t>() $op $n);
	)+
    };
    // Single comparison, any operator token.
    ($t:ty where $op:tt $n:literal $(; $msg:literal)?) => {
	$crate::static_assert!(::std::mem::size_of::<$t>() $op $n $(; $msg)?);
    };
}
// Compile-time self-tests of the `check_size!` assertion arms.
check_size!(u8 where == 1);
check_size!(u16 where > 1);
check_size!([u8; 512] where <= 512);
check_size!([u8; 512] where {
    != 10;
    > 511;
    < 513;
    == 512
});
/// Assert the output of a constant boolean expression is `true` at compile time.
///
/// Expands to a constant whose array length only type-checks when `$val`
/// evaluates to `true` (`[(); 1]` vs a length of 0).
/// NOTE(review): the optional `; "msg"` is accepted but does not appear in
/// the compile error.
#[macro_export] macro_rules! static_assert {
    ($val:expr $(; $msg:literal)?) => {
	const _: [(); 1] = [(); ($val as bool) as usize];
    }
}
/// Create an ad-hoc sum type.
macro_rules! either {
(@ type $first:ty) => {
$first
/// Assert a trait is object safe. This will produce a compiler error if the trait is not object safe
///
/// Works by declaring a never-called function that names `&dyn Trait`;
/// merely forming the `dyn` type triggers the compiler's object-safety
/// check. NOTE(review): the optional message is accepted but unused.
#[macro_export] macro_rules! assert_object_safe {
    ($trait:path $(; $msg:literal)?) => {
	const _:() = {
	    // Never invoked; exists only so `&dyn $trait` must type-check.
	    #[cold] fn __assert_object_safe() -> !
	    {
		let _: &dyn $trait;
		// SAFETY: this function is never called.
		unsafe {
		    debug_unreachable!()
		}
	    }
	};
    }
}

// Compile-time self-tests.
assert_object_safe!(AsRef<str>; "object safety assertion test");
static_assert!(1+1==2; "static assertion test");
/// Execute this expression if running in debug mode.
#[macro_export] macro_rules! debug_expr {
(if $expr:expr; else $or:expr) => {
if cfg!(debug_assertions) { $expr }
else { $or }
};
(@ type $first:ty, $($rest:ty),*) => {
Either<$first, either!(@ type $($rest),*)>
(else $expr:expr) => {
if !cfg!(debug_assertions) { $expr }
};
(type $($type:ty)|*) => {
either!(@ type $($type),*)
($expr:expr) => {
if cfg!(debug_assertions) { $expr }
};
}
/// Evaluate every expression in order, yielding the value of the *first* one
/// (like Lisp's `prog1`); the rest run only for their side effects.
///
/// All expressions are evaluated exactly once, left to right, by packing them
/// into a temporary tuple and projecting field `.0`; the remaining values are
/// dropped with that temporary. Requires at least one trailing expression.
#[macro_export] macro_rules! prog1 {
    ($first:expr, $($rest:expr);+ $(;)?) => {
	($first, $( $rest ),+).0
    }
}
/// Infallible unwrapping of `Result`s whose error type is uninhabited.
pub trait UnwrapInfallible<T>
{
    /// Extract the `Ok` value; the `Err` variant is statically impossible.
    fn unwrap_infallible(self) -> T;
}

impl<T> UnwrapInfallible<T> for Result<T, std::convert::Infallible>
{
    /// Unwrap with 0 overhead for values that cannot possibly be `Err`.
    ///
    /// `Infallible` has no variants, so an empty match proves the `Err` arm
    /// unreachable to the compiler — same codegen as the previous
    /// `unsafe { debug_unreachable!() }`, but without any `unsafe`.
    #[inline(always)] fn unwrap_infallible(self) -> T {
	match self {
	    Ok(v) => v,
	    Err(never) => match never {},
	}
    }
}
impl<T> UnwrapInfallible<T> for Result<T, !>
{
    /// Unwrap with 0 overhead for values that cannot possibly be `Err`.
    ///
    /// The never type coerces to any type, so the `Err` arm can simply
    /// return its (uninhabitable) value — no `unsafe` required.
    #[inline(always)] fn unwrap_infallible(self) -> T {
	match self {
	    Ok(v) => v,
	    Err(never) => never,
	}
    }
}
/// Infallible error-unwrapping of `Result`s whose `Ok` type is uninhabited.
pub trait UnwrapErrInfallible<T>
{
    /// Extract the `Err` value; the `Ok` variant is statically impossible.
    fn unwrap_err_infallible(self) -> T;
}

impl<T> UnwrapErrInfallible<T> for Result<std::convert::Infallible, T>
{
    /// Unwrap with 0 overhead for values that cannot possibly be `Ok`.
    ///
    /// `Infallible` has no variants, so an empty match proves the `Ok` arm
    /// unreachable to the compiler — same codegen as the previous
    /// `unsafe { debug_unreachable!() }`, but without any `unsafe`.
    #[inline(always)] fn unwrap_err_infallible(self) -> T {
	match self {
	    Err(v) => v,
	    Ok(never) => match never {},
	}
    }
}
impl<T> UnwrapErrInfallible<T> for Result<!, T>
{
    /// Unwrap with 0 overhead for values that cannot possibly be `Ok`.
    ///
    /// The never type coerces to any type, so the `Ok` arm can simply
    /// return its (uninhabitable) value — no `unsafe` required.
    #[inline(always)] fn unwrap_err_infallible(self) -> T {
	match self {
	    Err(v) => v,
	    Ok(never) => never,
	}
    }
}
/// Extension: chunk a `Stream`'s items into fixed-size collections.
pub trait ChunkStreamExt<T>: Sized
{
    /// Chunk into any collection convertible from `Vec<T>`.
    fn chunk_into<I: From<Vec<T>>>(self, sz: usize) -> chunking::ChunkingStream<Self,T,I>;
    /// Chunk into the default `Vec<T>` chunks.
    fn chunk(self, sz: usize) -> chunking::ChunkingStream<Self, T>
    {
	self.chunk_into(sz)
    }
}
// Blanket impl: every `Stream` gains `.chunk(sz)` / `.chunk_into(sz)`.
impl<S, T> ChunkStreamExt<T> for S
where S: Stream<Item=T>
{
    fn chunk_into<I: From<Vec<T>>>(self, sz: usize) -> chunking::ChunkingStream<Self,T,I>
    {
	chunking::ChunkingStream::new(self, sz)
    }
}
/// Extension: chunk an `Iterator`'s items into fixed-size collections.
pub trait ChunkIterExt<T>: Sized
{
    /// Chunk into any collection convertible from `Vec<T>`.
    fn chunk_into<I: From<Vec<T>>>(self, sz: usize) -> iters::ChunkingIter<Self,T,I>;
    /// Chunk into the default `Vec<T>` chunks.
    fn chunk(self, sz: usize) -> iters::ChunkingIter<Self, T>
    {
	self.chunk_into(sz)
    }
}
// Blanket impl: every iterator gains `.chunk(sz)` / `.chunk_into(sz)`.
impl<S, T> ChunkIterExt<T> for S
where S: Iterator<Item=T>
{
    fn chunk_into<I: From<Vec<T>>>(self, sz: usize) -> iters::ChunkingIter<Self,T,I>
    {
	iters::ChunkingIter::new(self, sz)
    }
}

@ -1,9 +1,12 @@
#![feature(never_type)]
#![feature(asm)]
#![allow(dead_code)]
#![cfg_attr(debug_assertions, allow(unused_imports))]
#[macro_use] extern crate ad_hoc_iter;
#[macro_use] extern crate bitflags;
#[macro_use] extern crate lazy_static;
use serde::{Serialize, Deserialize};
@ -19,12 +22,22 @@ macro_rules! io_err {
($msg:expr) => (io_err!(Other, $msg));
}
// Utils
mod ext;
use ext::*;
mod slice;
pub mod data;
// Real stuff
mod service;
// Serving
// mod ep;
fn main() {
    // Placeholder entry point; the real service wiring is not hooked up yet.
    // Commented-out lines exercise the `debug_expr!` macro manually.
    // debug_expr!(println!("debug"));
    // debug_expr!(else println!("not debug"));
    // debug_expr!(if println!("debug 2"); else println!("not debug 2"));
    println!("Hello, world!");
}

@ -0,0 +1,100 @@
//! Commands for a service
use super::*;
use std::{any::Any, marker::{Send, Sync}};
use std::fmt;
use tokio::sync::{
oneshot,
};
// Unique identifier newtype for dispatched commands (see `id_type!` in `ext`).
id_type!(pub CommandID: "ID of a sent command");

/// A response from the service for a sent command
///
/// Type-erased as `Box<dyn Any>` so each command kind can carry its own
/// payload; receivers presumably downcast to the concrete type they expect —
/// confirm per command kind once `CommandKind` has variants.
pub type Response = Box<dyn Any + Send + 'static>;
/// What kind of command to send to the service?
///
/// Currently uninhabited: no command kinds are defined yet, so no value of
/// this type (and hence no `Command`) can actually be constructed.
#[derive(Debug, PartialEq, Eq)]
pub enum CommandKind
{
}
/// A command to send to a running service.
///
/// Created when sending a `CommandKind` to a service. This is your handle for receiving the response.
#[derive(Debug)]
pub struct Command
{
    // Unique ID assigned when the command was dispatched.
    id: CommandID,
    //kind: CommandKind, // `CommandKind` -(sent to)> <running service> -(handle returned from send func)> `Command`
    // Oneshot carrying the service's (optional) response.
    resp: oneshot::Receiver<Option<Response>>,
}
impl fmt::Display for Command
{
    /// Render as `CommandID<hex-of-id-bytes>`.
    ///
    /// NOTE(review): the label reads `CommandID` although this is `Command`'s
    /// `Display` — confirm the intended format.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
	write!(f, "CommandID<{}>", self.id.0.as_bytes().hex())
    }
}
impl Command
{
    /// The unique ID of this command
    #[inline] pub fn id(&self) -> &CommandID
    {
	&self.id
    }

    /// Prevent any response from being sent to this `Command` handle from the service.
    ///
    /// Indicates to the service we don't care about a response. This has the same effect as calling `ignore` but can be used in a builder-pattern way from a returned `Command` from a send.
    /// Example: `let comm = service.send_command(comm_kind).ignored();`
    #[inline(always)] pub fn ignored(mut self) -> Self
    {
	self.ignore();
	self
    }

    /// Close the response channel, indicating to the service that we don't care about the response.
    #[inline] pub fn ignore(&mut self)
    {
	self.resp.close();
    }

    /// Dispose of this `Command`, returning any `Response` that might've already been sent.
    #[inline] pub fn close(mut self) -> Option<Response>
    {
	// (The unused `use oneshot::error::TryRecvError;` was removed here;
	// `try_recv` alone suffices.)
	self.resp.close();
	self.resp.try_recv().ok().flatten()
    }

    /// Poll for a response. If none has been sent yet, then return `self` so it can be polled again.
    #[inline] pub fn try_into_response(mut self) -> Result<Option<Response>, Self>
    {
	use oneshot::error::TryRecvError;
	match self.resp.try_recv() {
	    // A response (or explicit lack of one) arrived.
	    Ok(v) => Ok(v),
	    // Sender dropped: no response will ever come.
	    Err(TryRecvError::Closed) => Ok(None),
	    // Nothing yet; hand the handle back for another poll.
	    Err(TryRecvError::Empty) => Err(self),
	}
    }

    /// Consume this instance into the response from the service.
    ///
    /// Resolves to `None` if the channel closes without a response.
    #[inline] pub async fn into_response(self) -> Option<Response>
    {
	self.resp.await.ok().flatten()
    }

    /// Misspelled alias of [`Command::into_response`], kept so existing
    /// callers continue to compile.
    #[deprecated(note = "use `into_response` instead")]
    #[inline(always)] pub async fn into_repsonse(self) -> Option<Response>
    {
	self.into_response().await
    }
}
impl PartialEq for Command
{
    /// Commands are equal iff they carry the same dispatch ID; the response
    /// channel is not compared.
    #[inline] fn eq(&self, other: &Self) -> bool
    {
	self.id == other.id
    }
}

@ -0,0 +1,40 @@
//! The actual running service
use super::*;
use std::sync::Arc;
use tokio::sync::{
RwLock,
Mutex,
mpsc,
};
pub mod command;
/// Handle to a running service
///
/// Currently an empty placeholder; fields land with the service implementation.
#[derive(Debug)]
pub struct Handle
{
}
// Shared state behind `Comm` handles; currently an empty placeholder whose
// allocation also serves as the identity for `Comm` equality.
#[derive(Debug)]
struct CommInner
{
}
/// Communicate with a running service
#[derive(Debug, Clone)]
pub struct Comm{
    // Shared state; `Arc` identity doubles as the handle's identity.
    inner: Arc<CommInner>,
    // Channel into the service; the message type is still a `()` placeholder.
    tx: mpsc::Sender<()>,
}
// `Comm` equality is *identity*: two handles are equal iff they share the
// same `CommInner` allocation (`Arc::ptr_eq`), not structural comparison.
impl Eq for Comm{}
impl PartialEq for Comm {
    #[inline] fn eq(&self, other: &Self) -> bool
    {
	Arc::ptr_eq(&self.inner, &other.inner)
    }
}

@ -1,30 +1,6 @@
use super::*;
use std::cmp;
use std::ptr;
/// Copy bytes from one slice to another.
///
/// Copies `min(from.len(), to.len())` bytes and returns the number copied.
///
/// # Notes
/// Uses `ptr::copy` (memmove semantics), so regions that overlap (e.g. when
/// the slices were derived from raw pointers) are still copied correctly.
#[inline] pub fn move_bytes(from: &[u8], to: &mut [u8]) -> usize
{
    let len = from.len().min(to.len());
    // SAFETY: `len` is no larger than either slice, so both pointers are
    // valid for `len` bytes; `ptr::copy` permits overlap.
    unsafe {
	ptr::copy(from.as_ptr(), to.as_mut_ptr(), len);
    }
    len
}
/// Copy bytes from one slice to another.
///
/// Copies `min(from.len(), to.len())` bytes and returns the number copied.
///
/// # Notes
/// The slices must *not* overlap — which the `&`/`&mut` borrows already
/// guarantee for safe callers, so the safe `copy_from_slice` (compiling to
/// `memcpy`) replaces the previous `unsafe ptr::copy_nonoverlapping` call
/// with identical behaviour.
#[inline] pub fn copy_bytes(from: &[u8], to: &mut [u8]) -> usize
{
    let len = cmp::min(from.len(), to.len());
    to[..len].copy_from_slice(&from[..len]);
    len
}
pub use crate::ext::bytes::{
memcpy as copy_bytes,
memmove as move_bytes,
};

Loading…
Cancel
Save