Compare commits

...

28 Commits

Author SHA1 Message Date
Avril 3267151615
Removed un-checked #[cold] from match arm branches.
2 years ago
Avril bfaff6067a
added connection acceptance filters
4 years ago
Avril 5c784482f6
ref lt
4 years ago
Avril bdf61340db
Packaging version 0.9.0
4 years ago
Avril 7635ff9d52
update Makefile
4 years ago
Avril acf2ac605e
Merge branch 'feed' into master
4 years ago
Avril d1251a592c
Merge branch 'master' of git.flanchan.moe:flanchan/genmarkov into master
4 years ago
Avril 75730cbe0f
working implementation of handler
4 years ago
Avril 5dc10547d5
handle okay
4 years ago
Avril 6453392758
remove direct dependancy on libc
4 years ago
Avril a6fc26a053
update ebuild
4 years ago
Avril 3b4dc663fa
Packaging version 0.8.1
4 years ago
Avril d8404f65ed
Packaging version 0.8.1
4 years ago
Avril 5280c622c9
todo
4 years ago
Avril 1c509031d6
range is configurable
4 years ago
Avril 107b34bcbd
document feed() pipeline
4 years ago
Avril 684a6f6aa0
repurpose split-sentance to make sense
4 years ago
Avril 8996b0bb7b
repurpose split-sentance to make sense; add feed-sentance for old functionality
4 years ago
Avril 633b4351c2
initialiser docs
4 years ago
Avril bcaac2b2e3
remove sender clone
4 years ago
Avril 59dcecded3
added instant-init; signal handler also waits for server
4 years ago
Avril 4e1e38a0fd
fix init bug
4 years ago
Avril 5ba673e64f
safe main-thread panics; save cannot happen until server initialised
4 years ago
Avril cb163a14e9
Packaging version 0.7.1
4 years ago
Avril 34a62da8ba
empty build.rs
4 years ago
Avril 460d2b0081
gentoo ebuild
4 years ago
Avril 5f2e3a5b5b
ld
4 years ago
Avril 7c67a4decc
remove unused dep
4 years ago

815
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -1,6 +1,6 @@
[package]
name = "markov"
version = "0.7.1"
version = "0.9.1"
description = "Generate string of text from Markov chain fed by stdin"
authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018"
@ -12,14 +12,20 @@ license = "gpl-3.0-or-later"
default = ["compress-chain", "split-newlines", "api"]
# Compress the chain data file when saved to disk
compress-chain = ["async-compression"]
compress-chain = ["async-compression", "bzip2-sys"]
# Treat each new line as a new set to feed instead of feeding the whole data at once
split-newlines = []
# Feed each sentance separately with default /get api, instead of just each line / whole body
# Maybe better without `split-newlines`?
# Kinda experimental
#
# Note that this happens after `split-newlines`.
feed-sentance = ["split-sentance"]
# Split input buffer's to feed by sentance as well as word boundaries.
#
# Note that this happens after `split-newlines`.
# This feature does nothing if `feed-sentance` is enabled.
split-sentance = []
# Always aggregate incoming buffer instead of streaming them
@ -30,23 +36,20 @@ split-sentance = []
# NOTE: This does nothing if `split-newlines` is not enabled
always-aggregate = []
# Feeds will hog the buffer lock until the whole body has been fed, instead of acquiring lock every time
# This will make feeds of many lines faster but can potentially cause DoS
#
# With: ~169ms
# Without: ~195ms
#
# NOTE:
# This does nothing if `always-aggregate` is enabled and/or `split-newlines` is not enabled
# Does nothing on versions 9.0+
hog-buffer = []
# Enable the /api/ route
api = []
# Do not wait 2 seconds before starting worker tasks after server
instant-init = []
[profile.release]
opt-level = 3
lto = "fat"
codegen-units = 1
strip=true
[dependencies]
chain = {package = "markov", version = "1.1.0"}
@ -62,7 +65,11 @@ serde = {version ="1.0", features=["derive"]}
toml = "0.5.6"
async-compression = {version = "0.3.5", features=["tokio-02", "bzip2"], optional=true}
pin-project = "0.4"
libc = "0.2.79"
smallmap = "1.1.5"
lazy_static = "1.4.0"
once_cell = "1.4.1"
bzip2-sys = {version = "0.1.9", optional = true}
cidr = {version = "0.1.1", features = ["serde"]}
[build-dependencies]
rustc_version = "0.2"

@ -1,4 +1,5 @@
FEATURES:="api,always-aggregate"
FEATURES:="api,split-sentance"
VERSION:=`cargo read-manifest | rematch - 'version":"([0-9\.]+)"' 1`
markov:
cargo build --release --features $(FEATURES)
@ -22,5 +23,7 @@ uninstall:
rm -f /usr/local/bin/markov
package:
git add .
-git commit -S -m "Packaging version $(VERSION)"
cargo package
mv ./target/package/markov-$(VERSION).crate{,.gz}

@ -0,0 +1 @@
Disallow exact same map input buffers by keeping hashes of input buffers.

@ -0,0 +1,26 @@
extern crate rustc_version;
use rustc_version::{version, version_meta, Channel};
fn main() {
    // Sanity check: the reported compiler major version must be at least 1.
    assert!(version().unwrap().major >= 1);

    // Expose the compiler's release channel to the crate as a cfg flag
    // (`stable`, `beta`, `nightly`, or `dev`) for conditional compilation.
    let channel = match version_meta().unwrap().channel {
        Channel::Stable => "stable",
        Channel::Beta => "beta",
        Channel::Nightly => "nightly",
        Channel::Dev => "dev",
    };
    println!("cargo:rustc-cfg={}", channel);

    //println!("cargo:rustc-link-lib=static=bz2"); // TODO: Make this conditional for `compress-chain`
}

@ -0,0 +1,217 @@
# Copyright 2017-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
# Auto-Generated by cargo-ebuild 0.3.1
EAPI=7
CRATES="
aho-corasick-0.7.13
arc-swap-0.4.7
async-compression-0.3.5
atty-0.2.14
autocfg-0.1.7
autocfg-1.0.1
base64-0.12.3
bitflags-1.2.1
block-buffer-0.7.3
block-buffer-0.9.0
block-padding-0.1.5
buf_redux-0.8.4
byte-tools-0.3.1
byteorder-1.3.4
bytes-0.5.6
bzip2-0.3.3
bzip2-sys-0.1.9+1.0.8
cc-1.0.60
cfg-if-0.1.10
cfg-if-1.0.0
cloudabi-0.0.3
cpuid-bool-0.1.2
digest-0.8.1
digest-0.9.0
dtoa-0.4.6
either-1.6.1
env_logger-0.7.1
fake-simd-0.1.2
fixedbitset-0.2.0
fnv-1.0.7
fuchsia-cprng-0.1.1
fuchsia-zircon-0.3.3
fuchsia-zircon-sys-0.3.3
futures-0.3.6
futures-channel-0.3.6
futures-core-0.3.6
futures-executor-0.3.6
futures-io-0.3.6
futures-macro-0.3.6
futures-sink-0.3.6
futures-task-0.3.6
futures-util-0.3.6
generic-array-0.12.3
generic-array-0.14.4
getopts-0.2.21
getrandom-0.1.15
h2-0.2.6
half-1.6.0
hashbrown-0.9.1
headers-0.3.2
headers-core-0.2.0
hermit-abi-0.1.17
http-0.2.1
http-body-0.3.1
httparse-1.3.4
httpdate-0.3.2
humantime-1.3.0
hyper-0.13.8
idna-0.2.0
indexmap-1.6.0
input_buffer-0.3.1
iovec-0.1.4
itertools-0.9.0
itoa-0.4.6
kernel32-sys-0.2.2
lazy_static-1.4.0
libc-0.2.79
linked-hash-map-0.5.3
log-0.4.11
markov-1.1.0
matches-0.1.8
memchr-2.3.3
mime-0.3.16
mime_guess-2.0.3
mio-0.6.22
mio-named-pipes-0.1.7
mio-uds-0.6.8
miow-0.2.1
miow-0.3.5
multipart-0.17.0
net2-0.2.35
num_cpus-1.13.0
once_cell-1.4.1
opaque-debug-0.2.3
opaque-debug-0.3.0
percent-encoding-2.1.0
petgraph-0.5.1
pin-project-0.4.26
pin-project-internal-0.4.26
pin-project-lite-0.1.10
pin-utils-0.1.0
pkg-config-0.3.18
ppv-lite86-0.2.9
pretty_env_logger-0.4.0
proc-macro-hack-0.5.18
proc-macro-nested-0.1.6
proc-macro2-1.0.24
quick-error-1.2.3
quote-1.0.7
rand-0.6.5
rand-0.7.3
rand_chacha-0.1.1
rand_chacha-0.2.2
rand_core-0.3.1
rand_core-0.4.2
rand_core-0.5.1
rand_hc-0.1.0
rand_hc-0.2.0
rand_isaac-0.1.1
rand_jitter-0.1.4
rand_os-0.1.3
rand_pcg-0.1.2
rand_xorshift-0.1.1
rdrand-0.4.0
redox_syscall-0.1.57
regex-1.3.9
regex-syntax-0.6.18
remove_dir_all-0.5.3
rustc_version-0.2.3
ryu-1.0.5
safemem-0.3.3
scoped-tls-1.0.0
semver-0.9.0
semver-parser-0.7.0
serde-1.0.116
serde_cbor-0.11.1
serde_derive-1.0.116
serde_json-1.0.58
serde_urlencoded-0.6.1
serde_yaml-0.8.13
sha-1-0.8.2
sha-1-0.9.1
signal-hook-registry-1.2.1
slab-0.4.2
smallmap-1.1.5
socket2-0.3.15
syn-1.0.42
tempfile-3.1.0
termcolor-1.1.0
thread_local-1.0.1
time-0.1.44
tinyvec-0.3.4
tokio-0.2.22
tokio-macros-0.2.5
tokio-tungstenite-0.11.0
tokio-util-0.3.1
toml-0.5.6
tower-service-0.3.0
tracing-0.1.21
tracing-core-0.1.17
tracing-futures-0.2.4
try-lock-0.2.3
tungstenite-0.11.1
twoway-0.1.8
typenum-1.12.0
unicase-2.6.0
unicode-bidi-0.3.4
unicode-normalization-0.1.13
unicode-width-0.1.8
unicode-xid-0.2.1
url-2.1.1
urlencoding-1.1.1
utf-8-0.7.5
version_check-0.9.2
want-0.3.0
warp-0.2.5
wasi-0.10.0+wasi-snapshot-preview1
wasi-0.9.0+wasi-snapshot-preview1
winapi-0.2.8
winapi-0.3.9
winapi-build-0.1.1
winapi-i686-pc-windows-gnu-0.4.0
winapi-util-0.1.5
winapi-x86_64-pc-windows-gnu-0.4.0
ws2_32-sys-0.2.1
yaml-rust-0.4.4
"
inherit cargo
DESCRIPTION="Generate string of text from Markov chain fed by stdin"
# Double check the homepage as the cargo_metadata crate
# does not provide this value so instead repository is used
HOMEPAGE="https://flanchan.moe/markov/"
SRC_URI="$(cargo_crate_uris ${CRATES}) https://git.flanchan.moe/attachments/cf0b9095-2403-465b-b3aa-61b121134c84 -> markov-0.7.1.crate"
RESTRICT="mirror"
# License set may be more restrictive as OR is not respected
# use cargo-license for a more accurate license picture
LICENSE="GPL-3+"
SLOT="0"
KEYWORDS="~amd64"
IUSE="+compress-chain +split-newlines +api split-sentance always-aggregate hog-buffer"
DEPEND="compress-chain? ( app-arch/bzip2 )"
RDEPEND=""
src_configure() {
local myfeatures=(
$(usev compress-chain)
$(usev split-newlines)
$(usev api)
$(usev split-sentance)
$(usev always-aggregate)
$(usev hog-buffer)
)
#TODO: This hack slows compilation down I think, but without it ld fails so... We should add cargo buildscript to do this instead
use compress-chain && export RUSTFLAGS="${RUSTFLAGS} -ldylib=bz2"
cargo_src_configure --no-default-features
}

@ -0,0 +1,217 @@
# Copyright 2017-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
# Auto-Generated by cargo-ebuild 0.3.1
EAPI=7
CRATES="
aho-corasick-0.7.13
arc-swap-0.4.7
async-compression-0.3.5
atty-0.2.14
autocfg-0.1.7
autocfg-1.0.1
base64-0.12.3
bitflags-1.2.1
block-buffer-0.7.3
block-buffer-0.9.0
block-padding-0.1.5
buf_redux-0.8.4
byte-tools-0.3.1
byteorder-1.3.4
bytes-0.5.6
bzip2-0.3.3
bzip2-sys-0.1.9+1.0.8
cc-1.0.60
cfg-if-0.1.10
cfg-if-1.0.0
cloudabi-0.0.3
cpuid-bool-0.1.2
digest-0.8.1
digest-0.9.0
dtoa-0.4.6
either-1.6.1
env_logger-0.7.1
fake-simd-0.1.2
fixedbitset-0.2.0
fnv-1.0.7
fuchsia-cprng-0.1.1
fuchsia-zircon-0.3.3
fuchsia-zircon-sys-0.3.3
futures-0.3.6
futures-channel-0.3.6
futures-core-0.3.6
futures-executor-0.3.6
futures-io-0.3.6
futures-macro-0.3.6
futures-sink-0.3.6
futures-task-0.3.6
futures-util-0.3.6
generic-array-0.12.3
generic-array-0.14.4
getopts-0.2.21
getrandom-0.1.15
h2-0.2.6
half-1.6.0
hashbrown-0.9.1
headers-0.3.2
headers-core-0.2.0
hermit-abi-0.1.17
http-0.2.1
http-body-0.3.1
httparse-1.3.4
httpdate-0.3.2
humantime-1.3.0
hyper-0.13.8
idna-0.2.0
indexmap-1.6.0
input_buffer-0.3.1
iovec-0.1.4
itertools-0.9.0
itoa-0.4.6
kernel32-sys-0.2.2
lazy_static-1.4.0
libc-0.2.79
linked-hash-map-0.5.3
log-0.4.11
markov-1.1.0
matches-0.1.8
memchr-2.3.3
mime-0.3.16
mime_guess-2.0.3
mio-0.6.22
mio-named-pipes-0.1.7
mio-uds-0.6.8
miow-0.2.1
miow-0.3.5
multipart-0.17.0
net2-0.2.35
num_cpus-1.13.0
once_cell-1.4.1
opaque-debug-0.2.3
opaque-debug-0.3.0
percent-encoding-2.1.0
petgraph-0.5.1
pin-project-0.4.26
pin-project-internal-0.4.26
pin-project-lite-0.1.10
pin-utils-0.1.0
pkg-config-0.3.18
ppv-lite86-0.2.9
pretty_env_logger-0.4.0
proc-macro-hack-0.5.18
proc-macro-nested-0.1.6
proc-macro2-1.0.24
quick-error-1.2.3
quote-1.0.7
rand-0.6.5
rand-0.7.3
rand_chacha-0.1.1
rand_chacha-0.2.2
rand_core-0.3.1
rand_core-0.4.2
rand_core-0.5.1
rand_hc-0.1.0
rand_hc-0.2.0
rand_isaac-0.1.1
rand_jitter-0.1.4
rand_os-0.1.3
rand_pcg-0.1.2
rand_xorshift-0.1.1
rdrand-0.4.0
redox_syscall-0.1.57
regex-1.3.9
regex-syntax-0.6.18
remove_dir_all-0.5.3
rustc_version-0.2.3
ryu-1.0.5
safemem-0.3.3
scoped-tls-1.0.0
semver-0.9.0
semver-parser-0.7.0
serde-1.0.116
serde_cbor-0.11.1
serde_derive-1.0.116
serde_json-1.0.58
serde_urlencoded-0.6.1
serde_yaml-0.8.13
sha-1-0.8.2
sha-1-0.9.1
signal-hook-registry-1.2.1
slab-0.4.2
smallmap-1.1.5
socket2-0.3.15
syn-1.0.42
tempfile-3.1.0
termcolor-1.1.0
thread_local-1.0.1
time-0.1.44
tinyvec-0.3.4
tokio-0.2.22
tokio-macros-0.2.5
tokio-tungstenite-0.11.0
tokio-util-0.3.1
toml-0.5.6
tower-service-0.3.0
tracing-0.1.21
tracing-core-0.1.17
tracing-futures-0.2.4
try-lock-0.2.3
tungstenite-0.11.1
twoway-0.1.8
typenum-1.12.0
unicase-2.6.0
unicode-bidi-0.3.4
unicode-normalization-0.1.13
unicode-width-0.1.8
unicode-xid-0.2.1
url-2.1.1
urlencoding-1.1.1
utf-8-0.7.5
version_check-0.9.2
want-0.3.0
warp-0.2.5
wasi-0.10.0+wasi-snapshot-preview1
wasi-0.9.0+wasi-snapshot-preview1
winapi-0.2.8
winapi-0.3.9
winapi-build-0.1.1
winapi-i686-pc-windows-gnu-0.4.0
winapi-util-0.1.5
winapi-x86_64-pc-windows-gnu-0.4.0
ws2_32-sys-0.2.1
yaml-rust-0.4.4
"
inherit cargo
DESCRIPTION="Generate string of text from Markov chain fed by stdin"
# Double check the homepage as the cargo_metadata crate
# does not provide this value so instead repository is used
HOMEPAGE="https://flanchan.moe/markov/"
SRC_URI="$(cargo_crate_uris ${CRATES}) https://git.flanchan.moe/attachments/c6f37bfc-afd8-462f-807f-ab9f95197680 -> markov-0.8.1.crate"
RESTRICT="mirror"
# License set may be more restrictive as OR is not respected
# use cargo-license for a more accurate license picture
LICENSE="GPL-3+"
SLOT="0"
KEYWORDS="~amd64"
IUSE="+compress-chain +split-newlines +api split-sentance feed-sentance always-aggregate hog-buffer"
DEPEND="compress-chain? ( app-arch/bzip2 )"
RDEPEND=""
src_configure() {
local myfeatures=(
$(usev compress-chain)
$(usev split-newlines)
$(usev api)
$(usev split-sentance)
$(usev feed-sentance)
$(usev always-aggregate)
$(usev hog-buffer)
)
#TODO: This hack slows compilation down I think, but without it ld fails so... We should add cargo buildscript to do this instead
use compress-chain && export RUSTFLAGS="${RUSTFLAGS} -ldylib=bz2"
cargo_src_configure --no-default-features
}

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE pkgmetadata SYSTEM "http://www.gentoo.org/dtd/metadata.dtd">
<pkgmetadata>
<maintainer type="person">
<email>flanchan@cumallover.me</email>
</maintainer>
<use>
<flag name="compress-chain">Compress chain when saving/loading</flag>
<flag name="split-newlines">Treat each new line as a new set to feed</flag>
<flag name="api">Enable /api route</flag>
<flag name="feed-sentance">Further split buffers by sentance, feeding a new one for each.</flag>
<flag name="split-sentance">Split by sentance as well as word boundaries</flag>
<flag name="always-aggregate">Always operate on aggregated request body (can speed up writes at the cost of memory)</flag>
<flag name="hog-buffer">Acquire chain mutex write lock while streaming body (can speed up writes, but can also allow for DoS)</flag></use>
</pkgmetadata>

@ -189,12 +189,12 @@ inherit cargo
DESCRIPTION="Generate string of text from Markov chain fed by stdin"
# Double check the homepage as the cargo_metadata crate
# does not provide this value so instead repository is used
HOMEPAGE="https://flanchan.moe/markov/"
SRC_URI="$(cargo_crate_uris ${CRATES}) https://git.flanchan.moe/attachments/c868a695-4aa2-4a9b-a538-a20833263fad -> markov-0.7.0.crate"
HOMEPAGE="homepage field in Cargo.toml inaccessible to cargo metadata"
SRC_URI="$(cargo_crate_uris ${CRATES})"
RESTRICT="mirror"
# License set may be more restrictive as OR is not respected
# use cargo-license for a more accurate license picture
LICENSE="GPL-3+"
LICENSE="Apache-2.0 Apache-2.0 WITH LLVM-exception BSD-2-Clause BSD-3-Clause BSL-1.0 CC0-1.0 ISC MIT Unlicense Zlib gpl-3.0-or-later"
SLOT="0"
KEYWORDS="~amd64"
IUSE=""

@ -4,7 +4,16 @@ max_content_length = 4194304
max_gen_size = 256
save_interval_secs = 2
trust_x_forwarded_for = false
feed_bounds = '2..'
[filter]
inbound = ''
outbound = ''
[writer]
backlog = 32
internal_backlog = 8
capacity = 4
[mask]
default = 'Accept'

@ -165,7 +165,7 @@ impl BindError<std::convert::Infallible>
match self {
Self::Warp(w) => BindError::Warp(w),
Self::IO(w) => BindError::IO(w),
#[cold] _ => unreachable!(),
/*#[cold]*/ _ => unreachable!(),
}
}
}

@ -1,6 +1,5 @@
use libc::{
c_void,
};
use std::ptr;
/// Copy slice of bytes only
///
/// # Notes
@ -9,7 +8,8 @@ pub fn copy_slice(dst: &mut [u8], src: &[u8]) -> usize
{
let sz = std::cmp::min(dst.len(),src.len());
unsafe {
libc::memcpy(&mut dst[0] as *mut u8 as *mut c_void, &src[0] as *const u8 as *const c_void, sz);
//libc::memcpy(&mut dst[0] as *mut u8 as *mut c_void, &src[0] as *const u8 as *const c_void, sz);
ptr::copy_nonoverlapping(&src[0] as *const u8, &mut dst[0] as *mut u8, sz);
}
sz
}
@ -22,7 +22,8 @@ pub fn move_slice(dst: &mut [u8], src: &[u8]) -> usize
{
let sz = std::cmp::min(dst.len(),src.len());
unsafe {
libc::memmove(&mut dst[0] as *mut u8 as *mut c_void, &src[0] as *const u8 as *const c_void, sz);
//libc::memmove(&mut dst[0] as *mut u8 as *mut c_void, &src[0] as *const u8 as *const c_void, sz);
ptr::copy(&src[0] as *const u8, &mut dst[0] as *mut u8, sz);
}
sz
}

@ -6,6 +6,7 @@ use std::{
Context,
},
pin::Pin,
marker::PhantomData,
};
use tokio::{
io::{
@ -173,3 +174,109 @@ mod tests
assert_eq!(&output[..], "Hello world\nHow are you");
}
}
/// A stream that chunks its input.
///
/// Buffers up to `cap` items from the wrapped stream and yields them as a
/// single collection (`Vec<T>` by default, or any `Into: From<Vec<T>>`).
#[pin_project]
pub struct ChunkingStream<S, T, Into=Vec<T>>
{
    /// The inner stream, fused so polling after it ends is safe.
    #[pin] stream: Fuse<S>,
    /// Items accumulated for the chunk currently being built.
    buf: Vec<T>,
    /// Maximum number of items per emitted chunk.
    cap: usize,
    /// Marks the output collection type without storing one.
    _output: PhantomData<Into>,
    /// When set, the next poll flushes the buffer even if it is not full.
    push_now: bool,
}
impl<S, T, Into> ChunkingStream<S,T, Into>
where S: Stream<Item=T>,
      Into: From<Vec<T>>
{
    /// Create a new chunking stream over `stream`, emitting chunks of up to `sz` items.
    pub fn new(stream: S, sz: usize) -> Self
    {
        Self {
            stream: stream.fuse(),
            buf: Vec::with_capacity(sz),
            cap: sz,
            _output: PhantomData,
            push_now: false,
        }
    }
    /// Consume, returning the inner stream. Any buffered items are dropped.
    pub fn into_inner(self) -> S
    {
        self.stream.into_inner()
    }
    /// The maximum chunk size.
    pub fn cap(&self) -> usize
    {
        self.cap
    }
    /// The items buffered for the current, not-yet-emitted chunk.
    pub fn buffer(&self) -> &[T]
    {
        &self.buf[..]
    }
    /// A shared reference to the inner stream.
    pub fn get_ref(&self) -> &S
    {
        self.stream.get_ref()
    }
    /// A mutable reference to the inner stream.
    pub fn get_mut(&mut self)-> &mut S
    {
        self.stream.get_mut()
    }
    /// Force the next read to send the buffer even if it's not full.
    ///
    /// # Note
    /// The buffer still won't send if it's empty.
    pub fn push_now(&mut self)
    {
        self.push_now= true;
    }
    /// Consume into the current held buffer
    pub fn into_buffer(self) -> Vec<T>
    {
        self.buf
    }
    /// Take the buffer now
    ///
    /// Swaps in a fresh buffer of the same capacity and returns the old
    /// contents converted into the output type.
    pub fn take_now(&mut self) -> Into
    {
        std::mem::replace(&mut self.buf, Vec::with_capacity(self.cap)).into()
    }
}
impl<S, T, Into> Stream for ChunkingStream<S,T, Into>
where S: Stream<Item=T>,
      Into: From<Vec<T>>
{
    type Item = Into;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Keep pulling from the inner stream until the buffer reaches `cap`,
        // or until `push_now` is set and at least one item is buffered.
        while !(self.push_now && !self.buf.is_empty()) && self.buf.len() < self.cap {
            // Buffer isn't full, keep filling
            let this = self.as_mut().project();
            match this.stream.poll_next(cx) {
                Poll::Ready(None) => {
                    // Stream is over
                    break;
                },
                Poll::Ready(Some(item)) => {
                    this.buf.push(item);
                },
                // Inner stream not ready: propagate Pending (the inner poll
                // has already registered the waker for us).
                _ => return Poll::Pending,
            }
        }
        debug!("Sending buffer of {} (cap {})", self.buf.len(), self.cap);
        // Buffer is full or we reach end of stream
        Poll::Ready(if self.buf.len() == 0 {
            // Nothing buffered and the inner stream ended: end this one too.
            None
        } else {
            let this = self.project();
            // A forced flush applies only once; clear the flag.
            *this.push_now = false;
            // Hand off the chunk, leaving a fresh buffer of the same capacity.
            let output = std::mem::replace(this.buf, Vec::with_capacity(*this.cap));
            Some(output.into())
        })
    }
}

@ -6,6 +6,8 @@ use std::{
io,
borrow::Cow,
num::NonZeroU64,
error,
fmt,
};
use tokio::{
fs::OpenOptions,
@ -13,6 +15,7 @@ use tokio::{
time::Duration,
io::BufReader,
};
use ipfilt::IpFilter;
pub const DEFAULT_FILE_LOCATION: &'static str = "markov.toml";
@ -26,20 +29,70 @@ pub struct Config
pub save_interval_secs: Option<NonZeroU64>,
pub trust_x_forwarded_for: bool,
#[serde(default)]
pub feed_bounds: String,
#[serde(default)]
pub filter: FilterConfig,
#[serde(default)]
pub writer: WriterConfig,
#[serde(default)]
pub mask: IpFilter,
}
/// Character filter strings parsed into `sanitise::filter::Filter`s
/// (see `get_inbound_filter`/`get_outbound_filter`).
#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Hash, Serialize, Deserialize)]
pub struct FilterConfig
{
    /// Filter spec applied to text fed into the chain; defaults to empty.
    #[serde(default)]
    inbound: String,
    /// Filter spec applied to generated text leaving the chain; defaults to empty.
    #[serde(default)]
    outbound: String,
}
/// Plain-data tunables for the chain writer task; converted into
/// `handle::Settings` by `create_settings`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash, Serialize, Deserialize)]
pub struct WriterConfig
{
    /// Maps to `handle::Settings::backlog`.
    pub backlog: usize,
    /// Maps to `handle::Settings::internal_backlog`.
    pub internal_backlog: usize,
    /// Maps to `handle::Settings::capacity`.
    pub capacity: usize,
    /// Write timeout in milliseconds; `None` uses `handle::DEFAULT_TIMEOUT`.
    pub timeout_ms: Option<u64>,
    /// Optional throttle between writes, in milliseconds; `None` disables it.
    pub throttle_ms: Option<u64>,
}
impl Default for WriterConfig
{
    /// Defaults mirror the sample `markov.toml`: backlog 32, internal
    /// backlog 8, capacity 4, and no timeout or throttle.
    #[inline]
    fn default() -> Self
    {
        let (backlog, internal_backlog, capacity) = (32, 8, 4);
        Self {
            backlog,
            internal_backlog,
            capacity,
            timeout_ms: None,
            throttle_ms: None,
        }
    }
}
impl WriterConfig
{
    /// Convert this plain-data config into the handler's `Settings`,
    /// using `bounds` as the accepted feed-size range.
    fn create_settings(self, bounds: range::DynRange<usize>) -> handle::Settings
    {
        handle::Settings{
            backlog: self.backlog,
            internal_backlog: self.internal_backlog,
            capacity: self.capacity,
            // Millisecond fields become `Duration`s; a missing timeout falls
            // back to the handler's compile-time default.
            timeout: self.timeout_ms.map(tokio::time::Duration::from_millis).unwrap_or(handle::DEFAULT_TIMEOUT),
            throttle: self.throttle_ms.map(tokio::time::Duration::from_millis),
            bounds,
        }
    }
}
impl FilterConfig
{
pub fn get_inbound_filter(&self) -> sanitise::filter::Filter
fn get_inbound_filter(&self) -> sanitise::filter::Filter
{
let filt: sanitise::filter::Filter = self.inbound.parse().unwrap();
if !filt.is_empty()
@ -48,7 +101,7 @@ impl FilterConfig
}
filt
}
pub fn get_outbound_filter(&self) -> sanitise::filter::Filter
fn get_outbound_filter(&self) -> sanitise::filter::Filter
{
let filt: sanitise::filter::Filter = self.outbound.parse().unwrap();
if !filt.is_empty()
@ -72,12 +125,49 @@ impl Default for Config
save_interval_secs: Some(unsafe{NonZeroU64::new_unchecked(2)}),
trust_x_forwarded_for: false,
filter: Default::default(),
feed_bounds: "2..".to_owned(),
writer: Default::default(),
mask: Default::default(),
}
}
}
impl Config
{
/// Try to generate a config cache for this instance.
pub fn try_gen_cache(&self) -> Result<Cache, InvalidConfigError>
{
macro_rules! section {
($name:literal, $expr:expr) => {
match $expr {
Ok(v) => Ok(v),
Err(e) => Err(InvalidConfigError($name, Box::new(e))),
}
}
}
use std::ops::RangeBounds;
let feed_bounds = section!("feed_bounds", self.parse_feed_bounds()).and_then(|bounds| if bounds.contains(&0) {
Err(InvalidConfigError("feed_bounds", Box::new(opaque_error!("Bounds not allowed to contains 0 (they were `{}`)", bounds))))
} else {
Ok(bounds)
})?;
Ok(Cache {
handler_settings: self.writer.create_settings(feed_bounds.clone()),
feed_bounds,
inbound_filter: self.filter.get_inbound_filter(),
outbound_filter: self.filter.get_outbound_filter(),
})
}
/// Try to parse the `feed_bounds`
fn parse_feed_bounds(&self) -> Result<range::DynRange<usize>, range::ParseError>
{
if self.feed_bounds.len() == 0 {
Ok(feed::DEFAULT_FEED_BOUNDS.into())
} else {
self.feed_bounds.parse()
}
}
pub fn save_interval(&self) -> Option<Duration>
{
self.save_interval_secs.map(|x| Duration::from_secs(x.into()))
@ -139,3 +229,54 @@ async fn load_args<I: Iterator<Item=String>>(mut from: I) -> Option<Config>
},
}
}
/// Error produced when a config field fails to parse or validate; carries
/// the field name and the underlying error (exposed via `Error::source`).
#[derive(Debug)]
pub struct InvalidConfigError(&'static str, Box<dyn error::Error+ 'static>);
impl InvalidConfigError
{
    /// The name of the config field that failed to parse.
    pub fn field(&self) -> &str
    {
        // `self.0` is already a `&'static str`; the previous `&self.0[..]`
        // re-slice was redundant.
        self.0
    }
}
impl error::Error for InvalidConfigError
{
    /// Expose the wrapped parse error as this error's source.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        Some(&*self.1)
    }
}
impl fmt::Display for InvalidConfigError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f,"failed to parse field `{}`: {}", self.0, self.1)
}
}
/// Caches some parsed config arguments
///
/// Built by `Config::try_gen_cache` so parsing and validation happen once.
#[derive(Clone, PartialEq)]
pub struct Cache
{
    /// Validated feed-size bounds (guaranteed by `try_gen_cache` not to contain 0).
    pub feed_bounds: range::DynRange<usize>,
    /// Parsed filter for text fed into the chain.
    pub inbound_filter: sanitise::filter::Filter,
    /// Parsed filter for generated text leaving the chain.
    pub outbound_filter: sanitise::filter::Filter,
    /// Handler settings derived from the `[writer]` section plus `feed_bounds`.
    pub handler_settings: handle::Settings,
}
impl fmt::Debug for Cache
{
    // Manual impl: the filters are rendered as the `String` of their
    // characters rather than relying on their own Debug output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        f.debug_struct("Cache")
            .field("feed_bounds", &self.feed_bounds)
            .field("inbound_filter", &self.inbound_filter.iter().collect::<String>())
            .field("outbound_filter", &self.outbound_filter.iter().collect::<String>())
            .field("handler_settings", &self.handler_settings)
            .finish()
    }
}

@ -1,4 +1,5 @@
//! Extensions
use super::*;
use std::{
iter,
ops::{
@ -137,6 +138,18 @@ impl<T> AssertNotSend<T>
t
}
/// Require a value implements a specific trait
///
/// Expands to an identity call through a generic function bounded on the
/// trait, so compilation fails if `$val` does not implement `$t`; the value
/// itself is returned unchanged.
#[macro_export] macro_rules! require_impl {
    ($t:path: $val:expr) => {
        {
            // Identity function; the `T: $t` bound performs the check.
            #[inline(always)] fn require_impl<T: $t >(val: T) -> T {
                val
            }
            require_impl($val)
        }
    }
}
impl<T> Deref for AssertNotSend<T>
{
type Target = T;
@ -150,3 +163,21 @@ impl<T> DerefMut for AssertNotSend<T>
&mut self.0
}
}
/// Extension trait adding chunking adaptors to streams.
pub trait ChunkStreamExt<T>: Sized
{
    /// Chunk the stream into collections of up to `sz` items, converted into `I`.
    fn chunk_into<I: From<Vec<T>>>(self, sz: usize) -> chunking::ChunkingStream<Self,T,I>;
    /// Chunk the stream into `Vec`s of up to `sz` items.
    fn chunk(self, sz: usize) -> chunking::ChunkingStream<Self, T>
    {
        self.chunk_into(sz)
    }
}
// Blanket impl: every `Stream` gains `chunk`/`chunk_into`.
impl<S, T> ChunkStreamExt<T> for S
where S: Stream<Item=T>
{
    fn chunk_into<I: From<Vec<T>>>(self, sz: usize) -> chunking::ChunkingStream<Self,T,I>
    {
        chunking::ChunkingStream::new(self, sz)
    }
}

@ -1,37 +1,69 @@
//! Feeding the chain
use super::*;
#[cfg(any(feature="feed-sentance", feature="split-sentance"))]
use sanitise::Sentance;
#[allow(unused_imports)]
use futures::stream;
const FEED_BOUNDS: std::ops::RangeFrom<usize> = 2..; //TODO: Add to config somehow
pub const DEFAULT_FEED_BOUNDS: std::ops::RangeFrom<usize> = 2..;
/// Feed `what` into `chain`, at least `bounds` tokens.
///
/// # Tokenising
/// How the tokens are split within this function that operates on single buffers is determined largely by the features `split-sentance` and `feed-sentance` determining the use of the sentance API.
///
/// ## Pipeline
/// Since this is called on single buffers, it happens after the `split-newlines` tokenising if it's enabled, and thus the sentance API is only able to operate on each separate line if that feature is enabled, regardless of `always-aggregate`, `feed-sentance` or `split-sentance`.
/// This is the pipeline for just within this function, after splitting through newlines if enabled.
///
/// * `feed-sentance`
/// ** Feed the buffer through the sentance split tokeniser
/// ** Feed the sentances through the word split tokeniser
/// ** Feed each collection of words into the chain separately
/// * `split-sentance`
/// ** Feed the buffer through the sentance split tokeniser
/// ** Feed the sentances through the word split tokeniser
/// ** Feed the flattened collection into the chain once, concatenated.
/// * Neither
/// ** Feed the buffer through the word split tokeniser
/// ** Feed the collection into the chain
pub fn feed(chain: &mut Chain<String>, what: impl AsRef<str>, bounds: impl std::ops::RangeBounds<usize>)
{
cfg_if! {
if #[cfg(feature="split-sentance")] {
if #[cfg(feature="feed-sentance")] {
let map = Sentance::new_iter(&what) //get each sentance in string
.map(|what| what.words()
.map(|s| s.to_owned()).collect::<Vec<_>>());
debug_assert!(!bounds.contains(&0), "Cannot allow 0 size feeds");
for map in map {// feed each sentance seperately
if bounds.contains(&map.len()) {
debug!("Feeding chain {} items", map.len());
chain.feed(map);
}
else {
debug!("Ignoring feed of invalid length {}", map.len());
debug!("Ignoring feed of invalid length {}: {:?}", map.len(), map);
}
}
} else {
let map = Sentance::new_iter(&what) //get each sentance in string
.map(|what| what.words())
.flatten() // add all into one buffer
.map(|s| s.to_owned()).collect::<Vec<_>>();
cfg_if!{
if #[cfg(feature="split-sentance")] {
let map = Sentance::new_iter(&what) //get each sentance in string
.map(|what| what.words())
.flatten() // add all into one buffer
.map(|s| s.to_owned()).collect::<Vec<_>>();
} else {
let map: Vec<_> = sanitise::words(what.as_ref()).map(ToOwned::to_owned)
.collect();
}
}
debug_assert!(!bounds.contains(&0), "Cannot allow 0 size feeds");
if bounds.contains(&map.len()) {
//debug!("Feeding chain {} items", map.len());
chain.feed(map);
}
else {
debug!("Ignoring feed of invalid length {}", map.len());
debug!("Ignoring feed of invalid length {}: {:?}", map.len(), map);
}
}
@ -44,11 +76,12 @@ pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream<Item = R
if_debug! {
let timer = std::time::Instant::now();
}
//let bounds = &state.config_cache().feed_bounds;
macro_rules! feed {
($chain:expr, $buffer:ident, $bounds:expr) => {
($buffer:expr) => {
{
let buffer = $buffer;
feed($chain, &buffer, $bounds)
state.chain_write(buffer).await.map_err(|_| FillBodyError)?;
}
}
}
@ -71,45 +104,42 @@ pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream<Item = R
let buffer = std::str::from_utf8(&buffer[..]).map_err(|_| FillBodyError)?;
let buffer = state.inbound_filter().filter_cow(buffer);
info!("{} -> {:?}", who, buffer);
let mut chain = state.chain().write().await;
cfg_if! {
if #[cfg(feature="split-newlines")] {
for buffer in buffer.split('\n').filter(|line| !line.trim().is_empty()) {
feed!(&mut chain, buffer, FEED_BOUNDS);
}
feed!(stream::iter(buffer.split('\n').filter(|line| !line.trim().is_empty())
.map(|x| x.to_owned())))
} else {
feed!(&mut chain, buffer, FEED_BOUNDS);
feed!(stream::once(async move{buffer.into_owned()}));
}
}
} else {
use tokio::prelude::*;
let reader = chunking::StreamReader::new(body.filter_map(|x| x.map(|mut x| x.to_bytes()).ok()));
let mut lines = reader.lines();
#[cfg(feature="hog-buffer")]
let mut chain = state.chain().write().await;
while let Some(line) = lines.next_line().await.map_err(|_| FillBodyError)? {
let lines = reader.lines();
feed!(lines.filter_map(|x| x.ok().and_then(|line| {
let line = state.inbound_filter().filter_cow(&line);
let line = line.trim();
if !line.is_empty() {
#[cfg(not(feature="hog-buffer"))]
let mut chain = state.chain().write().await; // Acquire mutex once per line? Is this right?
feed!(&mut chain, line, FEED_BOUNDS);
//#[cfg(not(feature="hog-buffer"))]
//let mut chain = state.chain().write().await; // Acquire mutex once per line? Is this right?
info!("{} -> {:?}", who, line);
written+=line.len();
Some(line.to_owned())
} else {
None
}
written+=line.len();
}
})));
}
}
if_debug!{
if_debug! {
trace!("Write took {}ms", timer.elapsed().as_millis());
}
state.notify_save();
Ok(written)
}

@ -1,34 +1,46 @@
//! Generating the strings
use super::*;
use tokio::sync::mpsc::error::SendError;
use futures::StreamExt;
#[derive(Debug)]
pub struct GenBodyError(pub String);
#[derive(Debug, Default)]
pub struct GenBodyError(Option<String>);
impl error::Error for GenBodyError{}
impl fmt::Display for GenBodyError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "failed to write {:?} to body", self.0)
if let Some(z) = &self.0 {
write!(f, "failed to write read string {:?} to body", z)
} else {
write!(f, "failed to read string from chain. it might be empty.")
}
}
}
pub async fn body(state: State, num: Option<usize>, mut output: mpsc::Sender<String>) -> Result<(), GenBodyError>
{
let chain = state.chain().read().await;
if !chain.is_empty() {
let filter = state.outbound_filter();
match num {
Some(num) if num < state.config().max_gen_size => {
//This could DoS `full_body` and writes, potentially.
for string in chain.str_iter_for(num) {
output.send(filter.filter_owned(string)).await.map_err(|e| GenBodyError(e.0))?;
}
},
_ => output.send(filter.filter_owned(chain.generate_str())).await.map_err(|e| GenBodyError(e.0))?,
}
let mut chain = state.chain_read();
let filter = state.outbound_filter();
match num {
Some(num) if num < state.config().max_gen_size => {
let mut chain = chain.take(num);
while let Some(string) = chain.next().await {
output.send(filter.filter_owned(string)).await?;
}
},
_ => output.send(filter.filter_owned(chain.next().await.ok_or_else(GenBodyError::default)?)).await?,
}
Ok(())
}
impl From<SendError<String>> for GenBodyError
{
#[inline] fn from(from: SendError<String>) -> Self
{
Self(Some(from.0))
}
}

@ -0,0 +1,392 @@
//! Chain handler.
use super::*;
use std::{
marker::Send,
sync::Weak,
num::NonZeroUsize,
task::{Poll, Context,},
pin::Pin,
};
use tokio::{
sync::{
RwLock,
RwLockReadGuard,
mpsc::{
self,
error::SendError,
},
watch,
Notify,
},
task::JoinHandle,
time::{
self,
Duration,
},
};
use futures::StreamExt;
pub const DEFAULT_TIMEOUT: Duration= Duration::from_secs(5);
/// Settings for chain handler
#[derive(Debug, Clone, PartialEq)]
pub struct Settings
{
    /// Capacity of the input channel into the handler (also used to size
    /// `ChainStream`'s read buffer).
    pub backlog: usize,
    // NOTE(review): consumed by the worker loop (not visible here) —
    // presumably an internal queue depth; confirm at use site.
    pub internal_backlog: usize,
    // NOTE(review): presumably the chunk size for batched writes; confirm
    // against the worker loop.
    pub capacity: usize,
    /// Write timeout; defaults to `DEFAULT_TIMEOUT` when built from config.
    pub timeout: Duration,
    /// Optional delay between operations; `None` disables throttling.
    pub throttle: Option<Duration>,
    /// Accepted size range for a feed buffer.
    pub bounds: range::DynRange<usize>,
}
impl Settings
{
    /// Should we keep this string.
    ///
    /// Currently always `true`; the argument is unused.
    #[inline] fn matches(&self, _s: &str) -> bool
    {
        true
    }
}
impl Default for Settings
{
    #[inline]
    fn default() -> Self
    {
        Self {
            // Mirrors `WriterConfig::default()` in the config module.
            backlog: 32,
            internal_backlog: 8,
            capacity: 4,
            // Same value as `DEFAULT_TIMEOUT`.
            timeout: Duration::from_secs(5),
            throttle: Some(Duration::from_millis(200)),
            // Default feed bounds from the feed module (`2..`).
            bounds: feed::DEFAULT_FEED_BOUNDS.into(),
        }
    }
}
/// State handed to the worker task (stored in `Handle::host` until taken).
#[derive(Debug)]
struct HostInner<T>
{
    /// Receiving end of the buffer channel fed by `ChainHandle`'s writers.
    input: mpsc::Receiver<Vec<T>>,
    /// Watch receiver paired with `Handle::shutdown`; starts as `false`.
    shutdown: watch::Receiver<bool>,
}
#[derive(Debug)]
struct Handle<T: Send+ chain::Chainable>
{
chain: RwLock<chain::Chain<T>>,
input: mpsc::Sender<Vec<T>>,
opt: Settings,
notify_write: Arc<Notify>,
push_now: Arc<Notify>,
shutdown: watch::Sender<bool>,
/// Data used only for the worker task.
host: msg::Once<HostInner<T>>,
}
#[derive(Clone, Debug)]
pub struct ChainHandle<T: Send + chain::Chainable>(Arc<Box<Handle<T>>>);
impl<T: Send+ chain::Chainable + 'static> ChainHandle<T>
{
    /// Create a handle (and its not-yet-hosted worker state) from a chain and settings.
    pub fn with_settings(chain: chain::Chain<T>, opt: Settings) -> Self
    {
	let (shutdown_tx, shutdown) = watch::channel(false);
	let (itx, irx) = mpsc::channel(opt.backlog);
	Self(Arc::new(Box::new(Handle{
	    chain: RwLock::new(chain),
	    input: itx,
	    opt,
	    push_now: Arc::new(Notify::new()),
	    notify_write: Arc::new(Notify::new()),
	    shutdown: shutdown_tx,
	    host: msg::Once::new(HostInner{
		input: irx,
		shutdown,
	    })
	})))
    }
    /// Acquire the chain read lock
    async fn chain(&self) -> RwLockReadGuard<'_, chain::Chain<T>>
    {
	self.0.chain.read().await
    }
    /// A reference to the chain
    pub fn chain_ref(&self) -> &RwLock<chain::Chain<T>>
    {
	&self.0.chain
    }
    /// Create a stream that reads generated values forever.
    ///
    /// Holds only a `Weak` ref, so outstanding streams do not keep the worker alive.
    pub fn read(&self) -> ChainStream<T>
    {
	ChainStream{
	    chain: Arc::downgrade(&self.0),
	    buffer: Vec::with_capacity(self.0.opt.backlog),
	}
    }
    /// Send this buffer to the chain
    ///
    /// The returned future is `'static` (the sender is cloned), so it may outlive `self`.
    pub fn write(&self, buf: Vec<T>) -> impl futures::Future<Output = Result<(), SendError<Vec<T>>>> + 'static
    {
	let mut write = self.0.input.clone();
	async move {
	    write.send(buf).await
	}
    }
    /// Send this stream buffer to the chain
    pub fn write_stream<'a, I: Stream<Item=T>>(&self, buf: I) -> impl futures::Future<Output = Result<(), SendError<Vec<T>>>> + 'a
    where I: 'a
    {
	let mut write = self.0.input.clone();
	async move {
	    write.send(buf.collect().await).await
	}
    }
    /// Send this buffer to the chain
    pub async fn write_in_place(&self, buf: Vec<T>) -> Result<(), SendError<Vec<T>>>
    {
	self.0.input.clone().send(buf).await
    }
    /// A referencer for the notifier
    ///
    /// Notified after each batch the worker writes into the chain.
    pub fn notify_when(&self) -> &Arc<Notify>
    {
	&self.0.notify_write
    }
    /// Force the pending buffers to be written to the chain now
    pub fn push_now(&self)
    {
	self.0.push_now.notify();
    }
    /// Hang the worker thread, preventing it from taking any more inputs and also flushing it.
    ///
    /// # Panics
    /// If there was no worker thread.
    pub fn hang(&self)
    {
	trace!("Communicating hang request");
	self.0.shutdown.broadcast(true).expect("Failed to communicate hang");
    }
}
impl ChainHandle<String>
{
    /// Generate up to `num` strings straight from the chain into `output`,
    /// applying the state's outbound filter. No-op on an empty chain.
    ///
    /// Bypasses the `ChainStream` machinery; kept only for compatibility.
    #[deprecated = "use read() pls"]
    pub async fn generate_body(&self, state: &state::State, num: Option<usize>, mut output: mpsc::Sender<String>) -> Result<(), SendError<String>>
    {
	let chain = self.chain().await;
	if !chain.is_empty() {
	    let filter = state.outbound_filter();
	    match num {
		Some(num) if num < state.config().max_gen_size => {
		    //This could DoS writes, potentially.
		    for string in chain.str_iter_for(num) {
			output.send(filter.filter_owned(string)).await?;
		    }
		},
		// `num` absent or over the cap: emit a single generated string.
		_ => output.send(filter.filter_owned(chain.generate_str())).await?,
	    }
	}
	Ok(())
    }
}
/// Host this handle on the current task.
///
/// Runs a two-stage pipeline: this task batches raw input buffers from the
/// public channel, while a spawned child task takes the batches, acquires the
/// chain's write lock, and feeds them in via `feed::feed`.
///
/// # Panics
/// If `from` has already been hosted.
pub async fn host(from: ChainHandle<String>)
{
    let opt = from.0.opt.clone();
    // Take the worker-only state; panics on a second call (`msg::Once`).
    let mut data = from.0.host.unwrap().await;
    let (mut tx, mut child) = {
	// The `real` input channel.
	let from = from.clone();
	let opt = opt.clone();
	let (tx, rx) = mpsc::channel::<Vec<Vec<_>>>(opt.internal_backlog);
	(tx, tokio::spawn(async move {
	    // Optionally rate-limit how fast batches are consumed.
	    let mut rx = if let Some(thr) = opt.throttle {
		time::throttle(thr, rx).boxed()
	    } else {
		rx.boxed()
	    };
	    trace!("child: Begin waiting on parent");
	    while let Some(item) = rx.next().await {
		if item.len() > 0 {
		    info!("Write lock acq");
		    let mut lock = from.0.chain.write().await;
		    for item in item.into_iter()
		    {
			use std::ops::DerefMut;
			for item in item.into_iter() {
			    feed::feed(lock.deref_mut(), item, &from.0.opt.bounds);
			}
		    }
		    trace!("Signalling write");
		    from.0.notify_write.notify();
		}
	    }
	    trace!("child: exiting");
	}))
    };
    trace!("Begin polling on child");
    tokio::select!{
	v = &mut child => {
	    match v {
		/*#[cold]*/ Ok(_) => {warn!("Child exited before we have? This should probably never happen.")},//Should never happen.
		Err(e) => {error!("Child exited abnormally. Aborting: {}", e)}, //Child panic or cancel.
	    }
	},
	_ = async move {
	    let mut rx = data.input.chunk(opt.capacity); //we don't even need this tbh, oh well.
	    if !data.shutdown.recv().await.unwrap_or(true) { //first shutdown we get for free
		// Count of 2 == this task's clone + the child's clone; anything
		// more means outside handles still exist, so keep serving.
		while Arc::strong_count(&from.0) > 2 {
		    if *data.shutdown.borrow() {
			break;
		    }
		    tokio::select!{
			Some(true) = data.shutdown.recv() => {
			    debug!("Got shutdown (hang) request. Sending now then breaking");
			    // Drain everything still queued, then stop.
			    let mut rest = {
				let irx = rx.get_mut();
				irx.close(); //accept no more inputs
				let mut output = Vec::with_capacity(opt.capacity);
				while let Ok(item) = irx.try_recv() {
				    output.push(item);
				}
				output
			    };
			    rest.extend(rx.take_now());
			    if rest.len() > 0 {
				if let Err(err) = tx.send(rest).await {
				    error!("Failed to force send buffer, exiting now: {}", err);
				}
			    }
			    break;
			}
			_ = time::delay_for(opt.timeout) => {
			    trace!("Setting push now");
			    rx.push_now();
			}
			_ = from.0.push_now.notified() => {
			    debug!("Got force push signal");
			    let take =rx.take_now();
			    rx.push_now();
			    if take.len() > 0 {
				if let Err(err) = tx.send(take).await {
				    error!("Failed to force send buffer: {}", err);
				    break;
				}
			    }
			}
			Some(buffer) = rx.next() => {
			    debug!("Sending {} (cap {})", buffer.len(), buffer.capacity());
			    if let Err(err) = tx.send(buffer).await {
				// Receive closed?
				//
				// This probably shouldn't happen, as we `select!` for it up there and child never calls `close()` on `rx`.
				// In any case, it means we should abort.
				/*#[cold]*/ error!("Failed to send buffer: {}", err);
				break;
			    }
			}
		    }
		}
	    }
	    // Flush whatever the chunker still holds before exiting.
	    let last = rx.into_buffer();
	    if last.len() > 0 {
		if let Err(err) = tx.send(last).await {
		    error!("Failed to force send last part of buffer: {}", err);
		} else {
		    trace!("Sent rest of buffer");
		}
	    }
	} => {
	    // Normal exit
	    trace!("Normal exit")
	},
    }
    trace!("Waiting on child");
    // No more handles except child, no more possible inputs.
    child.await.expect("Child panic");
    trace!("Returning");
}
/// Spawn a new chain handler for this chain.
///
/// Returns the worker's join handle alongside the public handle.
pub fn spawn(from: chain::Chain<String>, opt: Settings) -> (JoinHandle<()>, ChainHandle<String>)
{
    debug!("Spawning with opt: {:?}", opt);
    let handle = ChainHandle::with_settings(from, opt);
    let worker = tokio::spawn(host(handle.clone()));
    (worker, handle)
}
/// A `Stream` of values generated on demand from a chain handle.
///
/// Ends when the handle has been dropped or the chain is empty.
#[derive(Debug)]
pub struct ChainStream<T: Send + chain::Chainable>
{
    // Weak ref: an outstanding stream must not keep the worker alive.
    chain: Weak<Box<Handle<T>>>,
    // Values generated ahead of consumption; drained front-first.
    buffer: Vec<T>,
}
impl ChainStream<String>
{
    /// Try to fill `self.buffer` with up to `n` newly generated strings.
    ///
    /// Returns the number of strings requested from the chain, or `None` when
    /// `n == 0`, the handle has been dropped, or the chain is still empty.
    async fn try_pull(&mut self, n: usize) -> Option<NonZeroUsize>
    {
	if n == 0 {
	    return None;
	}
	if let Some(read) = self.chain.upgrade() {
	    let chain = read.chain.read().await;
	    if chain.is_empty() {
		return None;
	    }
	    let n = if n == 1 {
		self.buffer.push(chain.generate_str());
		1
	    } else {
		self.buffer.extend(chain.str_iter_for(n));
		n //for now
	    };
	    // `n >= 1` here (the `n == 0` case returned early above), so the
	    // safe constructor always yields `Some` — no `unsafe` needed.
	    NonZeroUsize::new(n)
	} else {
	    None
	}
    }
}
impl Stream for ChainStream<String>
{
    type Item = String;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
	use futures::Future;
	let this = self.get_mut();
	if this.buffer.len() == 0 {
	    // Refill by polling `try_pull` inline. A fresh future is built on
	    // each poll; `cx`'s waker is re-registered by polling it here.
	    let pull = this.try_pull(this.buffer.capacity());
	    tokio::pin!(pull);
	    match pull.poll(cx) {
		Poll::Ready(Some(_)) => {},
		Poll::Pending => return Poll::Pending,
		// Handle dropped or chain empty: end of stream.
		_ => return Poll::Ready(None),
	    };
	}
	debug_assert!(this.buffer.len()>0);
	// NOTE(review): `remove(0)` shifts the whole buffer each item (O(len));
	// a `VecDeque` field would make this O(1) — confirm before changing layout.
	Poll::Ready(Some(this.buffer.remove(0)))
    }
}

@ -0,0 +1,181 @@
//! Filter accepts and denies based on cidr masks.
use super::*;
use cidr::{
Cidr,
IpCidr,
};
use std::{
net::{
IpAddr,
},
error,
fmt,
};
/// Rejection raised when `IpFilter::check` refuses an address: the denied IP
/// and, when applicable, the CIDR rule that caused the denial.
#[derive(Debug)]
pub struct IpFilterDeniedError(IpAddr, Option<IpCidr>);
impl warp::reject::Reject for IpFilterDeniedError{}
impl error::Error for IpFilterDeniedError{}
impl fmt::Display for IpFilterDeniedError
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
	write!(f, "Denied {} due to ", self.0)?;
	match &self.1 {
	    Some(cidr) => write!(f, "matching rule {}", cidr),
	    // No CIDR: denied because no accept rule matched.
	    None => write!(f, "non-matching accept rule"),
	}
    }
}
/// Whether a matched (or fallback) address is accepted or denied.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Rule
{
    Accept,
    Deny,
}
impl Default for Rule
{
    /// Deny by default: fail closed.
    #[inline]
    fn default() -> Self
    {
	Self::Deny
    }
}
impl Rule
{
    /// Convert this rule into a result, carrying the matched network along:
    /// `Ok` (borrowed) on accept, `Err` (cloned) on deny.
    fn into_result<'a>(self, net: Option<&'a IpCidr>) -> Result<Option<&'a IpCidr>, Option<IpCidr>>
    {
	match self {
	    Self::Accept => Ok(net),
	    Self::Deny => Err(net.cloned()),
	}
    }
}
/// CIDR-based connection filter: explicit accept and deny lists plus a
/// fallback rule for addresses matching neither.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct IpFilter
{
    /// The default fallback rule
    pub default: Rule,
    // Networks to accept; may be omitted when deserialising.
    #[serde(default)]
    accept: Vec<IpCidr>,
    // Networks to deny; may be omitted when deserialising.
    #[serde(default)]
    deny: Vec<IpCidr>,
}
/// Find the first CIDR network in `haystack` that contains `needle`, if any.
#[inline] fn find_in<'a>(needle: &IpAddr, haystack: &'a [IpCidr]) -> Option<&'a IpCidr>
{
    // Idiomatic equivalent of the manual scan-and-return loop.
    haystack.iter().find(|net| net.contains(needle))
}
impl Default for IpFilter
{
    /// Deny everything except the localhost address 127.0.0.1.
    #[inline]
    fn default() -> Self
    {
	Self {
	    default: Rule::Deny,
	    accept: vec![cidr::Cidr::new_host([127,0,0,1].into())],
	    deny: Vec::default(),
	}
    }
}
impl IpFilter
{
    /// Create a new CIDR filter with the given fallback rule and empty lists.
    ///
    /// Use `default()` to use with default rule.
    pub fn new(fallback: Rule) -> Self
    {
	Self {
	    default: fallback,
	    accept: Vec::new(),
	    deny: Vec::new(),
	}
    }
    /// Checks the rule for this IP, returns a result if it should accept or not.
    ///
    /// If acceptance rule is met, return the CIDR match that caused the acceptance if applicable
    ///
    /// If acceptance rule is not met, return in the error which CIDR match cause the deny if applicable
    pub fn check(&self, ip: &IpAddr) -> Result<Option<&'_ IpCidr>, IpFilterDeniedError>
    {
	let accept = find_in(ip, &self.accept[..]);
	let deny = find_in(ip, &self.deny[..]);
	let (rule, cidr) = match (accept, deny) {
	    (None, Some(net)) => (Rule::Deny, Some(net)),
	    (Some(net), None) => (Rule::Accept, Some(net)),
	    // Matched by both lists: the more specific (longer prefix) rule
	    // wins; an exact tie goes to deny.
	    (Some(ac), Some(den)) if ac != den => {
		if ac.network_length() > den.network_length() {
		    (Rule::Accept, Some(ac))
		} else {
		    (Rule::Deny, Some(den))
		}
	    },
	    // No match on either list (or identical entries): fall back.
	    _ => (self.default, None)
	};
	rule.into_result(cidr)
	    .map_err(|cidr| IpFilterDeniedError(*ip, cidr))
    }
    /// The explicitly accepted networks.
    pub fn accept_mask(&self) -> &[IpCidr]
    {
	&self.accept[..]
    }
    /// The explicitly denied networks.
    pub fn deny_mask(&self) -> &[IpCidr]
    {
	&self.deny[..]
    }
    /// Add many networks to the accept list.
    pub fn accept_range(&mut self, items: impl IntoIterator<Item = IpCidr>)
    {
	self.accept.extend(items)
    }
    /// Add many networks to the deny list.
    pub fn deny_range(&mut self, items: impl IntoIterator<Item = IpCidr>)
    {
	self.deny.extend(items)
    }
    /// Add one network to the accept list.
    pub fn accept_one(&mut self, item: IpCidr)
    {
	self.accept.push(item)
    }
    /// Add one network to the deny list.
    pub fn deny_one(&mut self, items: IpCidr)
    {
	self.deny.push(items)
    }
    /// Can any connection ever be accepted?
    ///
    /// False when the fallback denies with no accept entries, or a
    /// zero-length (match-all) deny exists with no accept entries.
    pub fn possible(&self) -> bool
    {
	//TODO: Test this
	!(self.default == Rule::Deny && self.accept.len() == 0) &&
	    !(self.deny.iter().find(|x| x.network_length() == 0).is_some() && self.accept.len() == 0)
    }
}
/// Warp recovery filter: turn an `IpFilterDeniedError` rejection into a
/// 403 response; any other rejection is passed through unchanged.
pub async fn recover(err: warp::Rejection) -> Result<impl warp::Reply, warp::Rejection>
{
    if let Some(t) = err.find::<IpFilterDeniedError>() {
	error!("Denying access to {} because of {:?} (403)", t.0, t.1);
	Ok(warp::http::Response::builder()
	   .status(status!(403))
	   .body(format!("Access denied: {}", t)))
    } else {
	// Not ours; let the next recover filter look at it.
	Err(err)
    }
}

@ -1,5 +1,3 @@
#![feature(split_inclusive)]
#![allow(dead_code)]
#[macro_use] extern crate log;
@ -63,6 +61,7 @@ macro_rules! status {
mod ext;
use ext::*;
mod util;
mod range;
mod sanitise;
mod bytes;
mod chunking;
@ -71,11 +70,14 @@ mod api;
#[cfg(target_family="unix")]
mod signals;
mod config;
mod msg;
mod state;
use state::State;
mod save;
mod ipfilt;
mod forwarded_list;
use forwarded_list::XForwardedFor;
mod handle;
mod feed;
mod gen;
@ -104,8 +106,19 @@ fn init_log()
async fn main() {
init_log();
let config = match config::load().await {
Some(v) => v,
let (config, ccache) = match config::load().await {
Some(v) => {
let cache = match v.try_gen_cache() {
Ok(c) => c,
Err(e) => {
error!("Invalid config, cannot continue");
error!("{}", e);
debug!("{:?}", e);
return;
},
};
(v, cache)
},
_ => {
let cfg = config::Config::default();
#[cfg(debug_assertions)]
@ -114,12 +127,14 @@ async fn main() {
error!("Failed to create default config file: {}", err);
}
}
cfg
let cache= cfg.try_gen_cache().unwrap();
(cfg, cache)
},
};
trace!("Using config {:?}", config);
debug!("Using config {:?}", config);
trace!("With config cached: {:?}", ccache);
let chain = Arc::new(RwLock::new(match save::load(&config.file).await {
let (chain_handle, chain) = handle::spawn(match save::load(&config.file).await {
Ok(chain) => {
info!("Loaded chain from {:?}", config.file);
chain
@ -129,17 +144,17 @@ async fn main() {
trace!("Error: {}", e);
Chain::new()
},
}));
}, ccache.handler_settings.clone());
{
let mut tasks = Vec::<BoxFuture<'static, ()>>::new();
tasks.push(chain_handle.map(|res| res.expect("Chain handle panicked")).boxed());
let (state, chain) = {
let save_when = Arc::new(Notify::new());
let state = State::new(config,
Arc::clone(&chain),
Arc::clone(&save_when));
ccache,
chain);
let state2 = state.clone();
let saver = tokio::spawn(save::host(state.clone()));
let saver = tokio::spawn(save::host(Box::new(state.clone())));
let chain = warp::any().map(move || state.clone());
tasks.push(saver.map(|res| res.expect("Saver panicked")).boxed());
@ -156,11 +171,25 @@ async fn main() {
} else {
warp::filters::addr::remote().and_then(|x: Option<SocketAddr>| async move {x.map(|x| x.ip()).ok_or_else(|| warp::reject::not_found())}).boxed()
};
let ipfilter = warp::any()
.and(chain)
.and(client_ip)
.and_then(|state: State, host: IpAddr| {
async move {
state.config().mask.check(&host)
.map(|ci| {
trace!("Accepting from rule {:?}", ci);
host
})
.map(move |host| (state, host))
.map_err(warp::reject::custom)
}
}).untuple_one();
let push = warp::put()
.and(warp::path("put"))
.and(chain.clone())
.and(client_ip.clone())
.and(ipfilter.clone())
.and(warp::body::content_length_limit(state.config().max_content_length))
.and(warp::body::stream())
.and_then(|state: State, host: IpAddr, buf| {
@ -170,6 +199,8 @@ async fn main() {
.map_err(|_| warp::reject::not_found()) //(warp::reject::custom) //TODO: Recover rejection filter down below for custom error return
}
})
.recover(ipfilt::recover)
.with(warp::log("markov::put"));
@ -179,8 +210,8 @@ async fn main() {
let single = {
let msz = state.config().max_gen_size;
warp::post()
.and(ipfilter.clone())
.and(warp::path("single"))
.and(client_ip.clone())
.and(warp::path::param()
.map(move |sz: usize| {
if sz == 0 || (2..=msz).contains(&sz) {
@ -193,11 +224,13 @@ async fn main() {
.unify())
.and(warp::body::content_length_limit(state.config().max_content_length))
.and(warp::body::aggregate())
.map(|_, x, y, z| (x,y,z)).untuple_one()
.and_then(api::single)
.with(warp::log("markov::api::single"))
};
warp::path("api")
.and(single)
.recover(ipfilt::recover)
.recover(api::error::rejection)
};
}
@ -205,8 +238,7 @@ async fn main() {
let read = warp::get()
.and(chain.clone())
.and(client_ip.clone())
.and(ipfilter.clone())
.and(warp::path::param().map(|opt: usize| Some(opt))
.or(warp::path::end().map(|| Option::<usize>::None)).unify())
.and_then(|state: State, host: IpAddr, num: Option<usize>| {
@ -224,12 +256,12 @@ async fn main() {
}))))
}
})
.recover(ipfilt::recover)
.with(warp::log("markov::read"));
let sentance = warp::get()
.and(warp::path("sentance")) //TODO: sanitise::Sentance::new_iter the body line
.and(chain.clone())
.and(client_ip.clone())
.and(ipfilter.clone())
.and(warp::path::param().map(|opt: usize| Some(opt))
.or(warp::path::end().map(|| Option::<usize>::None)).unify())
.and_then(|state: State, host: IpAddr, num: Option<usize>| {
@ -247,6 +279,7 @@ async fn main() {
}))))
}
})
.recover(ipfilt::recover)
.with(warp::log("markov::read::sentance"));
let read = warp::path("get").and(read.or(sentance));
@ -256,8 +289,8 @@ async fn main() {
#[cfg(target_family="unix")]
tasks.push(tokio::spawn(signals::handle(state.clone())).map(|res| res.expect("Signal handler panicked")).boxed());
require_send(async {
let server = {
require_impl!(Send: async {
let (server, init) = {
let s2 = AssertNotSend::new(state.clone()); //temp clone the Arcs here for shutdown if server fails to bind, assert they cannot remain cloned across an await boundary.
match bind::try_serve(warp::serve(push
.or(read)),
@ -268,7 +301,7 @@ async fn main() {
}) {
Ok((addr, server)) => {
info!("Server bound on {:?}", addr);
server
(server, s2.into_inner().into_initialiser())
},
Err(err) => {
error!("Failed to bind server: {}", err);
@ -277,13 +310,27 @@ async fn main() {
},
}
};
server.await;
tokio::join![
server,
async move {
cfg_if! {
if #[cfg(feature="instant-init")] {
trace!("Setting init");
} else {
trace!("Setting init in 2 seconds for good measure.");
tokio::time::delay_for(tokio::time::Duration::from_secs(2)).await;
}
}
init.set().expect("Failed to initialise saver")
},
];
}).await;
// Cleanup
async move {
trace!("Cleanup");
debug!("Waiting on {} tasks now", tasks.len());
join_all(tasks).await;
}
}.await;

@ -0,0 +1,210 @@
//! Message passing things
use super::*;
use tokio::{
sync::{
watch,
Mutex,
},
};
use std::{
task::{Poll, Context},
pin::Pin,
fmt,
error,
};
use futures::{
future::{
Future,
},
};
/// The init value could not be set (the underlying watch broadcast failed).
#[derive(Debug)]
pub struct InitError;
/// The init value will never arrive (the watch channel closed before it was set).
#[derive(Debug)]
pub struct InitWaitError;
impl error::Error for InitError{}
impl fmt::Display for InitError
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
	write!(f, "failed to set init value")
    }
}
impl error::Error for InitWaitError{}
impl fmt::Display for InitWaitError
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
	write!(f, "failed to receive init value")
    }
}
/// Provides a method of waiting on and setting a single initialisation.
///
/// In general, it should only be set once, as multiple sets do nothing but hog `Arc`s.
/// Dropping the `Initialiser` after waiting or setting should generally be done immediately.
/// Choose the `into_wait()` and `set()` varients over the non-consuming ones.
#[derive(Clone, Debug)]
pub struct Initialiser
{
    // Setter side, shared so every clone can set.
    tx: Arc<watch::Sender<bool>>,
    // Waiter side; holds `false` until init is set.
    rx: watch::Receiver<bool>
}
impl Initialiser
{
    /// Create a new, unset initialiser
    pub fn new() -> Self
    {
	let (tx, rx) = watch::channel(false);
	Self {
	    tx: Arc::new(tx),
	    rx,
	}
    }
    /// Create a pre-set initialiser. Calls to `wait()` will immediately resolve.
    pub fn new_set() -> Self
    {
	let (tx, rx) = watch::channel(true);
	Self {
	    tx: Arc::new(tx),
	    rx,
	}
    }
    /// Consume into a future that completes when init is set.
    pub fn into_wait(self) -> impl Future<Output=Result<(), InitWaitError>> + 'static
    {
	let mut rx = self.rx;
	async move {
	    // Fast path: already set; otherwise loop until a `true` arrives.
	    if !*rx.borrow() {
		while !rx.recv().await.ok_or_else(|| InitWaitError)? {
		    //tokio::task::yield_now().await;
		}
		Ok(())
	    } else {
		Ok(())
	    }
	}
    }
    /// Clone into a future that completes when init is set.
    ///
    /// This method does not clone any `Arc`s and is prefered to `self.clone().into_wait()`.
    /// Use this when the `Initialiser` you want to wait on is behind a shared reference.
    pub fn clone_into_wait(&self) -> impl Future<Output=Result<(), InitWaitError>> + 'static
    {
	let mut rx = self.rx.clone();
	async move {
	    if !*rx.borrow() {
		while !rx.recv().await.ok_or_else(|| InitWaitError)? {
		    //tokio::task::yield_now().await;
		}
		Ok(())
	    } else {
		Ok(())
	    }
	}
    }
    /// Completes when init is set
    pub async fn wait(&mut self) -> Result<(), InitWaitError>
    {
	if !*self.rx.borrow() {
	    while !self.rx.recv().await.ok_or_else(|| InitWaitError)? {
		//tokio::task::yield_now().await;
	    }
	    Ok(())
	} else {
	    Ok(())
	}
    }
    /// Is init set?
    pub fn is_set(&self) -> bool
    {
	*self.rx.borrow()
    }
    /// Consume and set init if it's not already set
    pub fn set(self) -> Result<(), InitError>
    {
	if !*self.rx.borrow() {
	    self.tx.broadcast(true).map_err(|_| InitError)
	} else {
	    Ok(())
	}
    }
    /// Set init without consuming.
    ///
    /// # Note
    /// It is prefered to use `set()`, as this method may make `Arc`s hang around longer than they need to.
    /// Calling this multiple times is useless.
    pub fn set_in_place(&self) -> Result<(), InitError>
    {
	if !*self.rx.borrow() {
	    self.tx.broadcast(true).map_err(|_| InitError)
	} else {
	    Ok(())
	}
    }
}
impl Future for Initialiser
{
    type Output = Result<(), InitWaitError>;
    /// Polls by constructing a fresh `wait()` future on every call.
    ///
    /// NOTE(review): any progress of the previous `wait()` future is discarded
    /// each poll; this relies on `wait()` re-checking the watch value from
    /// scratch — confirm there is no missed-wakeup window.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
	let uhh = self.wait();
	tokio::pin!(uhh);
	uhh.poll(cx)
    }
}
/// A value that can be consumed once.
#[derive(Debug)]
pub struct Once<T>(Mutex<Option<T>>);
impl<T> Once<T>
{
    /// Create a new instance
    pub fn new(from: T) -> Self
    {
	Self(Mutex::new(Some(from)))
    }
    /// Consume into the instance from behind a potentially shared reference.
    ///
    /// Avoids the lock entirely when this is the last `Arc`.
    pub async fn consume_shared(self: Arc<Self>) -> Option<T>
    {
	match Arc::try_unwrap(self) {
	    Ok(x) => x.0.into_inner(),
	    Err(x) => x.0.lock().await.take(),
	}
    }
    /// Consume from a shared reference and panic if the value has already been consumed.
    pub async fn unwrap_shared(self: Arc<Self>) -> T
    {
	self.consume_shared().await.unwrap()
    }
    /// Consume into the instance.
    pub async fn consume(&self) -> Option<T>
    {
	self.0.lock().await.take()
    }
    /// Consume and panic if the value has already been consumed.
    pub async fn unwrap(&self) -> T
    {
	self.consume().await.unwrap()
    }
    /// Consume into the inner value
    pub fn into_inner(self) -> Option<T>
    {
	self.0.into_inner()
    }
}

@ -0,0 +1,296 @@
//! Workarounds for ridiculously janky `std::ops::Range*` polymorphism
use super::*;
use std::{
ops::{
Range,
RangeFrom,
RangeInclusive,
RangeTo,
RangeToInclusive,
RangeFull,
Bound,
RangeBounds,
},
str::{
FromStr,
},
fmt,
error,
};
/// A single enum unifying all six `std::ops` range kinds over `T`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DynRange<T>
{
    Range(Range<T>),
    From(RangeFrom<T>),
    Inclusive(RangeInclusive<T>),
    To(RangeTo<T>),
    ToInclusive(RangeToInclusive<T>),
    Full(RangeFull),
}
// Implements `From<range type>` for `DynRange<T>`. `RangeFull` needs its own
// arm because it is the only range type not generic over `T`.
#[macro_export] macro_rules! impl_from {
    (Full, RangeFull) => {
	impl<T> From<RangeFull> for DynRange<T>
	{
	    #[inline] fn from(from: RangeFull) -> Self
	    {
		Self::Full(from)
	    }
	}
    };
    ($name:ident, $range:tt) => {
	impl<T> From<$range <T>> for DynRange<T>
	{
	    #[inline] fn from(from: $range<T>) -> Self
	    {
		Self::$name(from)
	    }
	}
    };
}
impl_from!(Range, Range);
impl_from!(From, RangeFrom);
impl_from!(Inclusive, RangeInclusive);
impl_from!(To, RangeTo);
impl_from!(ToInclusive, RangeToInclusive);
impl_from!(Full, RangeFull);
// Dispatches a `RangeBounds` method call to whichever variant is held.
macro_rules! bounds {
    ($self:ident, $bound:ident) => {
	match $self {
	    DynRange::Range(from) => from.$bound(),
	    DynRange::From(from) => from.$bound(),
	    DynRange::Inclusive(i) => i.$bound(),
	    DynRange::To(i) => i.$bound(),
	    DynRange::ToInclusive(i) => i.$bound(),
	    DynRange::Full(_) => (..).$bound(),
	}
    };
}
impl<T> RangeBounds<T> for DynRange<T>
{
    fn start_bound(&self) -> Bound<&T> {
	bounds!(self, start_bound)
    }
    fn end_bound(&self) -> Bound<&T> {
	bounds!(self, end_bound)
    }
}
impl<'a, T> RangeBounds<T> for &'a DynRange<T>
{
    fn start_bound(&self) -> Bound<&T> {
	bounds!(self, start_bound)
    }
    fn end_bound(&self) -> Bound<&T> {
	bounds!(self, end_bound)
    }
}
impl<T: fmt::Display> fmt::Display for DynRange<T>
{
    /// Renders the held range using standard Rust range syntax
    /// (`a..b`, `a..`, `a..=b`, `..b`, `..=b`, `..`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
	match self {
	    Self::Range(r) => write!(f, "{}..{}", r.start, r.end),
	    Self::From(r) => write!(f, "{}..", r.start),
	    Self::Inclusive(r) => write!(f, "{}..={}", r.start(), r.end()),
	    Self::To(r) => write!(f, "..{}", r.end),
	    Self::ToInclusive(r) => write!(f, "..={}", r.end),
	    Self::Full(_) => f.write_str(".."),
	}
    }
}
use std::any::{
Any,
};
impl<T: 'static> DynRange<T>
{
    /// Box the contained range as a type-erased `Any`.
    #[inline]
    pub fn into_boxed(self) -> Box<dyn Any /*TODO: + Send + Sync */+ 'static>
    {
	self.into_inner()
    }
    /// Box whichever concrete `std::ops` range this holds.
    fn into_inner(self) -> Box<dyn Any + 'static>
    {
	match self {
	    Self::Range(from) => Box::new(from),
	    Self::From(from) => Box::new(from),
	    Self::Inclusive(from) => Box::new(from),
	    Self::To(from) => Box::new(from),
	    Self::ToInclusive(from) => Box::new(from),
	    Self::Full(_) => Box::new(..),
	}
    }
    /// The contained range as a mutable `Any` reference.
    fn inner_mut(&mut self) -> &mut dyn Any
    {
	match self {
	    Self::Range(from) => from,
	    Self::From(from) => from,
	    Self::Inclusive(from) => from,
	    Self::To(from) => from,
	    Self::ToInclusive(from) => from,
	    Self::Full(f) => f,
	}
    }
    /// The contained range as a shared `Any` reference.
    fn inner_ref(&self) -> &dyn Any
    {
	match self {
	    Self::Range(from) => from,
	    Self::From(from) => from,
	    Self::Inclusive(from) => from,
	    Self::To(from) => from,
	    Self::ToInclusive(from) => from,
	    Self::Full(_) => &(..),
	}
    }
    /// Downcast to a reference of the concrete range type `R`, if it matches.
    pub fn downcast_ref<R: RangeBounds<T> + 'static>(&self) -> Option<&R>
    {
	self.inner_ref().downcast_ref()
    }
    /// Downcast to a mutable reference of the concrete range type `R`, if it matches.
    pub fn downcast_mut<R: RangeBounds<T> + 'static>(&mut self) -> Option<&mut R>
    {
	self.inner_mut().downcast_mut()
    }
    /// Downcast into the concrete range type `R`.
    ///
    /// On a type mismatch, returns `self` unchanged in `Err` instead of
    /// panicking (the error path was previously `todo!()`).
    pub fn downcast<R: RangeBounds<T> + 'static>(self) -> Result<R, Self>
    {
	// Move `val` into `R` via `Any` if the types match; hand `val` back otherwise.
	fn try_cast<A: Any, R: Any>(val: A) -> Result<R, A>
	{
	    let boxed: Box<dyn Any> = Box::new(val);
	    match boxed.downcast::<R>() {
		Ok(r) => Ok(*r),
		// The box still holds the `A` we just put in, so this cannot fail.
		Err(a) => Err(*a.downcast::<A>().expect("boxed value changed type")),
	    }
	}
	match self {
	    Self::Range(r) => try_cast(r).map_err(Self::Range),
	    Self::From(r) => try_cast(r).map_err(Self::From),
	    Self::Inclusive(r) => try_cast(r).map_err(Self::Inclusive),
	    Self::To(r) => try_cast(r).map_err(Self::To),
	    Self::ToInclusive(r) => try_cast(r).map_err(Self::ToInclusive),
	    Self::Full(f) => try_cast(f).map_err(Self::Full),
	}
    }
}
/// Range-parse failure: records which range *shape* was being tried (as a
/// `DynRange<()>` carrying no values) and the underlying error, if any.
#[derive(Debug)]
pub struct ParseError(DynRange<()>, Option<Box<dyn error::Error+'static>>);
impl ParseError
{
    /// Wrap `err` as a failure of range shape `which`.
    fn new<R: Into<DynRange<()>>>(which: R, err: impl error::Error + 'static) -> Self
    {
	Self(which.into(), Some(Box::new(err)))
    }
    /// A failure of shape `which` with no underlying cause (e.g. a missing part).
    fn none(which: impl Into<DynRange<()>>) -> Self
    {
	Self(which.into(), None)
    }
    /// Re-label the shape while keeping the underlying cause.
    fn map<T: Into<DynRange<()>>>(self, to: T) -> Self
    {
	Self (to.into(), self.1)
    }
}
impl error::Error for ParseError
{
    /// The underlying parse failure, if one was recorded.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
	// `as_deref` borrows through the `Box`, replacing the manual if-let.
	self.1.as_deref()
    }
}
impl fmt::Display for ParseError
{
    /// Describes which range shape failed, plus the underlying cause when present.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
	write!(f, "failed to parse range in format `{:?}`", self.0)?;
	match self.1.as_ref() {
	    Some(cause) => write!(f, ": {}", cause),
	    None => Ok(()),
	}
    }
}
impl<T: FromStr> FromStr for DynRange<T>
where T::Err: error::Error + 'static
{
    type Err = ParseError;
    /// Parses any of the six range syntaxes, trying the unambiguous prefix
    /// forms first (`..`, `..=x`, `..x`, `x..`), then `a..=b`, then `a..b`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
	if s== ".." {
	    Ok(Self::Full(..))
	} else if s.starts_with("..=") {
	    Ok(Self::ToInclusive(..=T::from_str(&s[3..]).map_err(|x| ParseError::new(..=(), x))?))
	} else if s.starts_with("..") {
	    Ok(Self::To(..(T::from_str(&s[2..])).map_err(|x| ParseError::new(..(), x))?))
	} else if s.ends_with("..") {
	    Ok(Self::From(T::from_str(&s[..s.len()-2]).map_err(|x| ParseError::new(().., x))?..))
	} else {
	    // Pull two parsed endpoints out of a `..=`-split iterator.
	    fn try_next_incl<'a, T: FromStr>(m: &mut impl Iterator<Item=&'a str>) -> Result<RangeInclusive<T>, ParseError>
	    where T::Err: error::Error + 'static
	    {
		let (first, second) = if let Some(first) = m.next() {
		    if let Some(seocond) = m.next() {
			(first,seocond)
		    } else {
			// Missing second endpoint: signalled by a cause-less error.
			return Err(ParseError::none(()..=()));
		    }
		} else {
		    return Err(ParseError::none(()..=()));
		};
		let first: T = first.parse().map_err(|x| ParseError::new(()..=(), x))?;
		let second: T = second.parse().map_err(|x| ParseError::new(()..=(), x))?;
		Ok(first..=second)
	    }
	    // Pull two parsed endpoints out of a `..`-split iterator.
	    fn try_next<'a, T: FromStr>(m: &mut impl Iterator<Item=&'a str>) -> Result<Range<T>, ParseError>
	    where T::Err: error::Error + 'static
	    {
		let (first, second) = if let Some(first) = m.next() {
		    if let Some(seocond) = m.next() {
			(first,seocond)
		    } else {
			return Err(ParseError::none(()..()));
		    }
		} else {
		    return Err(ParseError::none(()..()));
		};
		let first: T = first.parse().map_err(|x| ParseError::new(()..(), x))?;
		let second: T = second.parse().map_err(|x| ParseError::new(()..(), x))?;
		Ok(first..second)
	    }
	    // Try `a..=b` first; a cause-less error means the split produced
	    // fewer than two parts, i.e. the separator wasn't present.
	    let mut split = s.split("..=").fuse();
	    let mut last_err = ParseError::none(()..());
	    match loop {
		match try_next_incl(&mut split) {
		    Err(ParseError(_, None)) => break Err(last_err), // iter empty
		    Err(other) => last_err = other,
		    Ok(value) => break Ok(Self::Inclusive(value)),
		}
	    } {
		Ok(v) => return Ok(v),
		Err(e) => last_err = e,
	    };
	    // Fall back to `a..b`, keeping the more informative earlier error.
	    let mut split = s.split("..").fuse();
	    match loop {
		match try_next(&mut split) {
		    Err(ParseError(_, None)) => break Err(last_err), // iter empty
		    Err(other) => last_err = other,
		    Ok(value) => break Ok(Self::Range(value)),
		}
	    } {
		Ok(v) => Ok(v),
		Err(e) => Err(e),
	    }
	}
    }
}

@ -272,6 +272,6 @@ mod tests
let string = "abcdef ghi jk1\nhian";
assert_eq!(filter.filter_str(&string).to_string(), filter.filter_cow(&string).to_string());
assert_eq!(filter.filter_cow(&string).to_string(), filter.filter(string.chars()).collect::<String>());
assert_eq!(filter.filter_cow(&string).to_string(), filter.filter_iter(string.chars()).collect::<String>());
}
}

@ -13,7 +13,7 @@ use std::{
#[derive(Debug)]
pub struct SentanceError;
/// A sentance
/// A sentence
#[derive(Debug, PartialEq, Eq)]
#[repr(transparent)]
pub struct Sentance(str);
@ -25,7 +25,7 @@ macro_rules! new {
};
}
const DEFAULT_BOUNDARIES: &[char] = &['\n', '.', ':', '!', '?'];
const DEFAULT_BOUNDARIES: &[char] = &['\n', '.', ':', '!', '?', '~'];
lazy_static! {
static ref BOUNDARIES: smallmap::Map<char, ()> = {

@ -25,7 +25,7 @@ macro_rules! new {
};
}
const DEFAULT_BOUNDARIES: &[char] = &['!', '.', ','];
const DEFAULT_BOUNDARIES: &[char] = &['!', '.', ',', '*'];
lazy_static! {
static ref BOUNDARIES: smallmap::Map<char, ()> = {
@ -68,7 +68,8 @@ impl Word
}
/// Create a new iterator over words from this sentance.
pub fn new_iter<'a>(from: &'a (impl AsRef<Sentance> +?Sized+'a)) -> impl Iterator<Item = &'a Self>
pub fn new_iter<'a, 'b>(from: &'a (impl AsRef<Sentance> +?Sized+'b)) -> impl Iterator<Item = &'a Self>
where 'b: 'a
{
let from = from.as_ref();
from.split_inclusive(is_word_boundary)
@ -139,3 +140,11 @@ impl AsRef<Word> for Word
self
}
}
pub fn words(input: &str) -> impl Iterator<Item=&'_ Word>
{
input.split_inclusive(is_word_boundary)
.map(|x| x.trim())
.filter(|x| !x.is_empty())
.map(|x| new!(x))
}

@ -43,7 +43,7 @@ type Decompressor<T> = BzDecoder<T>;
pub async fn save_now(state: &State) -> io::Result<()>
{
let chain = state.chain().read().await;
let chain = state.chain_ref().read().await;
use std::ops::Deref;
let to = &state.config().file;
save_now_to(chain.deref(),to).await
@ -73,31 +73,38 @@ async fn save_now_to(chain: &Chain<String>, to: impl AsRef<Path>) -> io::Result<
}
/// Start the save loop for this chain
pub async fn host(mut state: State)
pub async fn host(mut state: Box<State>)
{
let to = state.config().file.to_owned();
let interval = state.config().save_interval();
while Arc::strong_count(state.when()) > 1 {
{
let chain = state.chain().read().await;
use std::ops::Deref;
if let Err(e) = save_now_to(chain.deref(), &to).await {
error!("Failed to save chain: {}", e);
} else {
info!("Saved chain to {:?}", to);
let when = Arc::clone(state.when_ref());
trace!("Setup oke. Waiting on init");
if state.on_init().await.is_ok() {
debug!("Begin save handler");
while Arc::strong_count(&when) > 1 {
{
let chain = state.chain_ref().read().await;
use std::ops::Deref;
if let Err(e) = save_now_to(chain.deref(), &to).await {
error!("Failed to save chain: {}", e);
} else {
info!("Saved chain to {:?}", to);
}
}
}
tokio::select!{
_ = OptionFuture::from(interval.map(|interval| time::delay_for(interval))) => {},
_ = state.on_shutdown() => {
tokio::select!{
_ = OptionFuture::from(interval.map(|interval| time::delay_for(interval))) => {},
_ = state.on_shutdown() => {
break;
}
}
when.notified().await;
if state.has_shutdown() {
break;
}
}
state.when().notified().await;
if state.has_shutdown() {
break;
}
} else {
debug!("Shutdown called before init");
}
trace!("Saver exiting");
}

@ -1,17 +1,19 @@
//! /sentance/
use super::*;
use futures::StreamExt;
pub async fn body(state: State, num: Option<usize>, mut output: mpsc::Sender<String>) -> Result<(), gen::GenBodyError>
{
let string = {
let chain = state.chain().read().await;
if chain.is_empty() {
return Ok(());
}
let mut chain = state.chain_read();
match num {
None => chain.generate_str(),
Some(num) => (0..num).map(|_| chain.generate_str()).join("\n"),
None => chain.next().await.ok_or_else(gen::GenBodyError::default)?,
Some(num) if num < state.config().max_gen_size => {//(0..num).map(|_| chain.generate_str()).join("\n"),
let chain = chain.take(num);
chain.collect::<Vec<_>>().await.join("\n")//TODO: Stream version of JoinStrExt
},
_ => return Err(Default::default()),
}
};
@ -20,14 +22,14 @@ pub async fn body(state: State, num: Option<usize>, mut output: mpsc::Sender<Str
if let Some(num) = num {
for sen in sanitise::Sentance::new_iter(&string).take(num)
{
output.send(filter.filter_owned(sen.to_owned())).await.map_err(|e| gen::GenBodyError(e.0))?;
output.send(filter.filter_owned(sen.to_owned())).await?;
}
} else {
output.send(filter.filter_owned(match sanitise::Sentance::new_iter(&string)
.max_by_key(|x| x.len()) {
Some(x) => x,
#[cold] None => return Ok(()),
}.to_owned())).await.map_err(|e| gen::GenBodyError(e.0))?;
/*#[cold]*/ None => return Ok(()),
}.to_owned())).await?;
}
Ok(())
}

@ -12,48 +12,60 @@ use tokio::{
pub async fn handle(mut state: State)
{
let mut usr1 = unix::signal(SignalKind::user_defined1()).expect("Failed to hook SIGUSR1");
let mut usr2 = unix::signal(SignalKind::user_defined2()).expect("Failed to hook SIGUSR2");
let mut usr2 = unix::signal(SignalKind::user_defined2()).expect("Failed to hook SIGUSR2");
let mut quit = unix::signal(SignalKind::quit()).expect("Failed to hook SIGQUIT");
let mut io = unix::signal(SignalKind::io()).expect("Failed to hook IO");
loop {
tokio::select! {
_ = state.on_shutdown() => {
break;
}
_ = usr1.recv() => {
info!("Got SIGUSR1. Saving chain immediately.");
if let Err(e) = save::save_now(&state).await {
error!("Failed to save chain: {}", e);
} else{
trace!("Saved chain okay");
}
},
_ = usr2.recv() => {
info!("Got SIGUSR1. Loading chain immediately.");
match save::load(&state.config().file).await {
Ok(new) => {
{
let mut chain = state.chain().write().await;
*chain = new;
}
trace!("Replaced with read chain");
},
Err(e) => {
error!("Failed to load chain from file, keeping current: {}", e);
},
trace!("Setup oke. Waiting on init");
if state.on_init().await.is_ok() {
debug!("Begin signal handler");
loop {
tokio::select! {
_ = state.on_shutdown() => {
break;
}
},
_ = quit.recv() => {
warn!("Got SIGQUIT. Saving chain then aborting.");
if let Err(e) = save::save_now(&state).await {
error!("Failed to save chain: {}", e);
} else{
trace!("Saved chain okay.");
}
error!("Aborting");
std::process::abort()
},
_ = usr1.recv() => {
info!("Got SIGUSR1. Causing chain write.");
state.push_now();
},
_ = usr2.recv() => {
info!("Got SIGUSR2. Loading chain immediately.");
match save::load(&state.config().file).await {
Ok(new) => {
{
let mut chain = state.chain_ref().write().await;
*chain = new;
}
trace!("Replaced with read chain");
},
Err(e) => {
error!("Failed to load chain from file, keeping current: {}", e);
},
}
},
_ = io.recv() => {
info!("Got SIGIO. Saving chain immediately.");
if let Err(e) = save::save_now(&state).await {
error!("Failed to save chain: {}", e);
} else{
trace!("Saved chain okay");
}
},
_ = quit.recv() => {
warn!("Got SIGQUIT. Saving chain then aborting.");
if let Err(e) = save::save_now(&state).await {
error!("Failed to save chain: {}", e);
} else{
trace!("Saved chain okay.");
}
error!("Aborting");
std::process::abort()
},
}
}
} else {
debug!("Shutdown called before init()");
}
trace!("Graceful shutdown");
trace!("Exiting");
}

@ -3,17 +3,32 @@ use super::*;
use tokio::{
sync::{
watch,
mpsc::error::SendError,
},
};
use config::Config;
use msg::Initialiser;
#[derive(Debug)]
pub struct ShutdownError;
impl error::Error for ShutdownError{}
impl fmt::Display for ShutdownError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "shutdown signal caught")
}
}
#[derive(Debug, Clone)]
pub struct State
{
config: Arc<Config>, //to avoid cloning config
exclude: Arc<(sanitise::filter::Filter, sanitise::filter::Filter)>,
chain: Arc<RwLock<Chain<String>>>,
save: Arc<Notify>,
config: Arc<Box<(Config, config::Cache)>>, //to avoid cloning config
chain: handle::ChainHandle<String>,
//save: Arc<Notify>,
begin: Initialiser,
shutdown: Arc<watch::Sender<bool>>,
shutdown_recv: watch::Receiver<bool>,
@ -21,24 +36,56 @@ pub struct State
impl State
{
/// Consume this `state` into its initialiser
pub fn into_initialiser(self) -> Initialiser
{
self.begin
}
/// Allow the saver task to start work
pub fn init(self) -> Result<(), msg::InitError>
{
self.begin.set()
}
/// Has `init` been called?
pub fn is_init(&self) -> bool
{
self.begin.is_set()
}
/// A future that completes either when `init` is called, or `shutdown`.
pub async fn on_init(&mut self) -> Result<(), ShutdownError>
{
if self.has_shutdown() {
return Err(ShutdownError);
}
tokio::select! {
Ok(()) = self.begin.clone_into_wait() => Ok(()),
_ = self.on_shutdown() => {
debug!("on_init(): shutdown received");
Err(ShutdownError)
}
else => Err(ShutdownError)
}
}
pub fn inbound_filter(&self) -> &sanitise::filter::Filter
{
&self.exclude.0
&self.config_cache().inbound_filter
}
pub fn outbound_filter(&self) -> &sanitise::filter::Filter
pub fn outbound_filter(&self) -> &sanitise::filter::Filter
{
&self.exclude.1
&self.config_cache().outbound_filter
}
pub fn new(config: Config, chain: Arc<RwLock<Chain<String>>>, save: Arc<Notify>) -> Self
pub fn new(config: Config, cache: config::Cache, chain: handle::ChainHandle<String>) -> Self
{
let (shutdown, shutdown_recv) = watch::channel(false);
Self {
exclude: Arc::new((config.filter.get_inbound_filter(),
config.filter.get_outbound_filter())),
config: Arc::new(config),
config: Arc::new(Box::new((config, cache))),
chain,
save,
begin: Initialiser::new(),
shutdown: Arc::new(shutdown),
shutdown_recv,
}
@ -46,28 +93,56 @@ impl State
pub fn config(&self) -> &Config
{
self.config.as_ref()
&self.config.as_ref().0
}
pub fn notify_save(&self)
pub fn config_cache(&self) -> &config::Cache
{
self.save.notify();
&self.config.as_ref().1
}
pub fn chain(&self) -> &RwLock<Chain<String>>
/*pub fn notify_save(&self)
{
self.save.notify();
}*/
/*pub fn chain(&self) -> &RwLock<Chain<String>>
{
&self.chain.as_ref()
}*/
pub fn chain_ref(&self) -> &RwLock<Chain<String>>
{
&self.chain.chain_ref()
}
pub fn chain_read(&self) -> handle::ChainStream<String>
{
self.chain.read()
}
/// Write to this chain
pub async fn chain_write<'a, T: Stream<Item = String>>(&'a self, buffer: T) -> Result<(), SendError<Vec<String>>>
{
self.chain.write_stream(buffer).await
}
pub fn when_ref(&self) -> &Arc<Notify>
{
&self.chain.as_ref()
&self.chain.notify_when()
}
pub fn when(&self) -> &Arc<Notify>
/// Force the chain to push through now
pub fn push_now(&self)
{
&self.save
self.chain.push_now()
}
pub fn shutdown(self)
{
self.shutdown.broadcast(true).expect("Failed to communicate shutdown");
self.save.notify();
self.chain.hang();
self.when_ref().notify();
}
pub fn has_shutdown(&self) -> bool

@ -39,3 +39,54 @@ pub fn hint_cap<T: NewCapacity, I: Iterator>(iter: &I) -> T
(_, Some(x)) | (x, _) => T::with_capacity(x)
}
}
#[macro_export] macro_rules! opaque_error {
($msg:literal) => {
{
#[derive(Debug)]
struct OpaqueError;
impl ::std::error::Error for OpaqueError{}
impl ::std::fmt::Display for OpaqueError
{
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result
{
write!(f, $msg)
}
}
OpaqueError
}
};
($msg:literal $($tt:tt)*) => {
{
#[derive(Debug)]
struct OpaqueError(String);
impl ::std::error::Error for OpaqueError{}
impl ::std::fmt::Display for OpaqueError
{
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result
{
write!(f, "{}", self.0)
}
}
OpaqueError(format!($msg $($tt)*))
}
};
(yield $msg:literal $($tt:tt)*) => {
{
#[derive(Debug)]
struct OpaqueError<'a>(fmt::Arguments<'a>);
impl ::std::error::Error for OpaqueError{}
impl ::std::fmt::Display for OpaqueError
{
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result
{
write!(f, "{}", self.0)
}
}
OpaqueError(format_args!($msg $($tt)*))
}
};
}

Loading…
Cancel
Save