Compare commits

...

11 Commits

Author SHA1 Message Date
Avril 3267151615
Removed un-checked #[cold] from match arm branches.
2 years ago
Avril bfaff6067a
added connection acceptance filters
4 years ago
Avril 5c784482f6
ref lt
4 years ago
Avril bdf61340db
Packaging version 0.9.0
4 years ago
Avril 7635ff9d52
update Makefile
4 years ago
Avril acf2ac605e
Merge branch 'feed' into master
4 years ago
Avril d1251a592c
Merge branch 'master' of git.flanchan.moe:flanchan/genmarkov into master
4 years ago
Avril 75730cbe0f
working implementation of handler
4 years ago
Avril 5dc10547d5
handle okay
4 years ago
Avril 6453392758
remove direct dependency on libc
4 years ago
Avril a6fc26a053
update ebuild
4 years ago

813
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -1,6 +1,6 @@
[package]
name = "markov"
version = "0.8.1"
version = "0.9.1"
description = "Generate string of text from Markov chain fed by stdin"
authors = ["Avril <flanchan@cumallover.me>"]
edition = "2018"
@ -36,14 +36,7 @@ split-sentance = []
# NOTE: This does nothing if `split-newlines` is not enabled
always-aggregate = []
# Feeds will hog the buffer lock until the whole body has been fed, instead of acquiring lock every time
# This will make feeds of many lines faster but can potentially cause DoS
#
# With: ~169ms
# Without: ~195ms
#
# NOTE:
# This does nothing if `always-aggregate` is enabled and/or `split-newlines` is not enabled
# Does nothing on versions 9.0+
hog-buffer = []
# Enable the /api/ route
@ -56,6 +49,7 @@ instant-init = []
opt-level = 3
lto = "fat"
codegen-units = 1
strip=true
[dependencies]
chain = {package = "markov", version = "1.1.0"}
@ -71,11 +65,11 @@ serde = {version ="1.0", features=["derive"]}
toml = "0.5.6"
async-compression = {version = "0.3.5", features=["tokio-02", "bzip2"], optional=true}
pin-project = "0.4"
libc = "0.2.79"
smallmap = "1.1.5"
lazy_static = "1.4.0"
once_cell = "1.4.1"
bzip2-sys = {version = "0.1.9", optional = true}
cidr = {version = "0.1.1", features = ["serde"]}
[build-dependencies]
rustc_version = "0.2"

@ -1,4 +1,4 @@
FEATURES:="api,always-aggregate,split-sentance"
FEATURES:="api,split-sentance"
VERSION:=`cargo read-manifest | rematch - 'version":"([0-9\.]+)"' 1`
markov:

@ -0,0 +1,217 @@
# Copyright 2017-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
# Auto-Generated by cargo-ebuild 0.3.1
EAPI=7
CRATES="
aho-corasick-0.7.13
arc-swap-0.4.7
async-compression-0.3.5
atty-0.2.14
autocfg-0.1.7
autocfg-1.0.1
base64-0.12.3
bitflags-1.2.1
block-buffer-0.7.3
block-buffer-0.9.0
block-padding-0.1.5
buf_redux-0.8.4
byte-tools-0.3.1
byteorder-1.3.4
bytes-0.5.6
bzip2-0.3.3
bzip2-sys-0.1.9+1.0.8
cc-1.0.60
cfg-if-0.1.10
cfg-if-1.0.0
cloudabi-0.0.3
cpuid-bool-0.1.2
digest-0.8.1
digest-0.9.0
dtoa-0.4.6
either-1.6.1
env_logger-0.7.1
fake-simd-0.1.2
fixedbitset-0.2.0
fnv-1.0.7
fuchsia-cprng-0.1.1
fuchsia-zircon-0.3.3
fuchsia-zircon-sys-0.3.3
futures-0.3.6
futures-channel-0.3.6
futures-core-0.3.6
futures-executor-0.3.6
futures-io-0.3.6
futures-macro-0.3.6
futures-sink-0.3.6
futures-task-0.3.6
futures-util-0.3.6
generic-array-0.12.3
generic-array-0.14.4
getopts-0.2.21
getrandom-0.1.15
h2-0.2.6
half-1.6.0
hashbrown-0.9.1
headers-0.3.2
headers-core-0.2.0
hermit-abi-0.1.17
http-0.2.1
http-body-0.3.1
httparse-1.3.4
httpdate-0.3.2
humantime-1.3.0
hyper-0.13.8
idna-0.2.0
indexmap-1.6.0
input_buffer-0.3.1
iovec-0.1.4
itertools-0.9.0
itoa-0.4.6
kernel32-sys-0.2.2
lazy_static-1.4.0
libc-0.2.79
linked-hash-map-0.5.3
log-0.4.11
markov-1.1.0
matches-0.1.8
memchr-2.3.3
mime-0.3.16
mime_guess-2.0.3
mio-0.6.22
mio-named-pipes-0.1.7
mio-uds-0.6.8
miow-0.2.1
miow-0.3.5
multipart-0.17.0
net2-0.2.35
num_cpus-1.13.0
once_cell-1.4.1
opaque-debug-0.2.3
opaque-debug-0.3.0
percent-encoding-2.1.0
petgraph-0.5.1
pin-project-0.4.26
pin-project-internal-0.4.26
pin-project-lite-0.1.10
pin-utils-0.1.0
pkg-config-0.3.18
ppv-lite86-0.2.9
pretty_env_logger-0.4.0
proc-macro-hack-0.5.18
proc-macro-nested-0.1.6
proc-macro2-1.0.24
quick-error-1.2.3
quote-1.0.7
rand-0.6.5
rand-0.7.3
rand_chacha-0.1.1
rand_chacha-0.2.2
rand_core-0.3.1
rand_core-0.4.2
rand_core-0.5.1
rand_hc-0.1.0
rand_hc-0.2.0
rand_isaac-0.1.1
rand_jitter-0.1.4
rand_os-0.1.3
rand_pcg-0.1.2
rand_xorshift-0.1.1
rdrand-0.4.0
redox_syscall-0.1.57
regex-1.3.9
regex-syntax-0.6.18
remove_dir_all-0.5.3
rustc_version-0.2.3
ryu-1.0.5
safemem-0.3.3
scoped-tls-1.0.0
semver-0.9.0
semver-parser-0.7.0
serde-1.0.116
serde_cbor-0.11.1
serde_derive-1.0.116
serde_json-1.0.58
serde_urlencoded-0.6.1
serde_yaml-0.8.13
sha-1-0.8.2
sha-1-0.9.1
signal-hook-registry-1.2.1
slab-0.4.2
smallmap-1.1.5
socket2-0.3.15
syn-1.0.42
tempfile-3.1.0
termcolor-1.1.0
thread_local-1.0.1
time-0.1.44
tinyvec-0.3.4
tokio-0.2.22
tokio-macros-0.2.5
tokio-tungstenite-0.11.0
tokio-util-0.3.1
toml-0.5.6
tower-service-0.3.0
tracing-0.1.21
tracing-core-0.1.17
tracing-futures-0.2.4
try-lock-0.2.3
tungstenite-0.11.1
twoway-0.1.8
typenum-1.12.0
unicase-2.6.0
unicode-bidi-0.3.4
unicode-normalization-0.1.13
unicode-width-0.1.8
unicode-xid-0.2.1
url-2.1.1
urlencoding-1.1.1
utf-8-0.7.5
version_check-0.9.2
want-0.3.0
warp-0.2.5
wasi-0.10.0+wasi-snapshot-preview1
wasi-0.9.0+wasi-snapshot-preview1
winapi-0.2.8
winapi-0.3.9
winapi-build-0.1.1
winapi-i686-pc-windows-gnu-0.4.0
winapi-util-0.1.5
winapi-x86_64-pc-windows-gnu-0.4.0
ws2_32-sys-0.2.1
yaml-rust-0.4.4
"
inherit cargo
DESCRIPTION="Generate string of text from Markov chain fed by stdin"
# Double check the homepage as the cargo_metadata crate
# does not provide this value so instead repository is used
HOMEPAGE="https://flanchan.moe/markov/"
SRC_URI="$(cargo_crate_uris ${CRATES}) https://git.flanchan.moe/attachments/c6f37bfc-afd8-462f-807f-ab9f95197680 -> markov-0.8.1.crate"
RESTRICT="mirror"
# License set may be more restrictive as OR is not respected
# use cargo-license for a more accurate license picture
LICENSE="GPL-3+"
SLOT="0"
KEYWORDS="~amd64"
IUSE="+compress-chain +split-newlines +api split-sentance feed-sentance always-aggregate hog-buffer"
DEPEND="compress-chain? ( app-arch/bzip2 )"
RDEPEND=""
src_configure() {
local myfeatures=(
$(usev compress-chain)
$(usev split-newlines)
$(usev api)
$(usev split-sentance)
$(usev feed-sentance)
$(usev always-aggregate)
$(usev hog-buffer)
)
#TODO: This hack slows compilation down I think, but without it ld fails so... We should add cargo buildscript to do this instead
use compress-chain && export RUSTFLAGS="${RUSTFLAGS} -ldylib=bz2"
cargo_src_configure --no-default-features
}

@ -8,7 +8,8 @@
<flag name="compress-chain">Compress chain when saving/loading</flag>
<flag name="split-newlines">Treat each new line as a new set to feed</flag>
<flag name="api">Enable /api route</flag>
<flag name="split-sentance">Use sentance split by default (not recommended)</flag>
<flag name="feed-sentance">Further split buffers by sentance, feeding a new one for each.</flag>
<flag name="split-sentance">Split by sentance as well as word boundaries</flag>
<flag name="always-aggregate">Always operate on aggregated request body (can speed up writes at the cost of memory)</flag>
<flag name="hog-buffer">Acquire chain mutex write lock while streaming body (can speed up writes, but can also allow for DoS)</flag></use>
</pkgmetadata>

@ -0,0 +1,203 @@
# Copyright 2017-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
# Auto-Generated by cargo-ebuild 0.3.1
EAPI=7
CRATES="
aho-corasick-0.7.13
arc-swap-0.4.7
async-compression-0.3.5
atty-0.2.14
autocfg-0.1.7
autocfg-1.0.1
base64-0.12.3
bitflags-1.2.1
block-buffer-0.7.3
block-buffer-0.9.0
block-padding-0.1.5
buf_redux-0.8.4
byte-tools-0.3.1
byteorder-1.3.4
bytes-0.5.6
bzip2-0.3.3
bzip2-sys-0.1.9+1.0.8
cc-1.0.60
cfg-if-0.1.10
cfg-if-1.0.0
cloudabi-0.0.3
cpuid-bool-0.1.2
digest-0.8.1
digest-0.9.0
dtoa-0.4.6
either-1.6.1
env_logger-0.7.1
fake-simd-0.1.2
fixedbitset-0.2.0
fnv-1.0.7
fuchsia-cprng-0.1.1
fuchsia-zircon-0.3.3
fuchsia-zircon-sys-0.3.3
futures-0.3.6
futures-channel-0.3.6
futures-core-0.3.6
futures-executor-0.3.6
futures-io-0.3.6
futures-macro-0.3.6
futures-sink-0.3.6
futures-task-0.3.6
futures-util-0.3.6
generic-array-0.12.3
generic-array-0.14.4
getopts-0.2.21
getrandom-0.1.15
h2-0.2.6
half-1.6.0
hashbrown-0.9.1
headers-0.3.2
headers-core-0.2.0
hermit-abi-0.1.17
http-0.2.1
http-body-0.3.1
httparse-1.3.4
httpdate-0.3.2
humantime-1.3.0
hyper-0.13.8
idna-0.2.0
indexmap-1.6.0
input_buffer-0.3.1
iovec-0.1.4
itertools-0.9.0
itoa-0.4.6
kernel32-sys-0.2.2
lazy_static-1.4.0
libc-0.2.79
linked-hash-map-0.5.3
log-0.4.11
markov-1.1.0
matches-0.1.8
memchr-2.3.3
mime-0.3.16
mime_guess-2.0.3
mio-0.6.22
mio-named-pipes-0.1.7
mio-uds-0.6.8
miow-0.2.1
miow-0.3.5
multipart-0.17.0
net2-0.2.35
num_cpus-1.13.0
once_cell-1.4.1
opaque-debug-0.2.3
opaque-debug-0.3.0
percent-encoding-2.1.0
petgraph-0.5.1
pin-project-0.4.26
pin-project-internal-0.4.26
pin-project-lite-0.1.10
pin-utils-0.1.0
pkg-config-0.3.18
ppv-lite86-0.2.9
pretty_env_logger-0.4.0
proc-macro-hack-0.5.18
proc-macro-nested-0.1.6
proc-macro2-1.0.24
quick-error-1.2.3
quote-1.0.7
rand-0.6.5
rand-0.7.3
rand_chacha-0.1.1
rand_chacha-0.2.2
rand_core-0.3.1
rand_core-0.4.2
rand_core-0.5.1
rand_hc-0.1.0
rand_hc-0.2.0
rand_isaac-0.1.1
rand_jitter-0.1.4
rand_os-0.1.3
rand_pcg-0.1.2
rand_xorshift-0.1.1
rdrand-0.4.0
redox_syscall-0.1.57
regex-1.3.9
regex-syntax-0.6.18
remove_dir_all-0.5.3
rustc_version-0.2.3
ryu-1.0.5
safemem-0.3.3
scoped-tls-1.0.0
semver-0.9.0
semver-parser-0.7.0
serde-1.0.116
serde_cbor-0.11.1
serde_derive-1.0.116
serde_json-1.0.58
serde_urlencoded-0.6.1
serde_yaml-0.8.13
sha-1-0.8.2
sha-1-0.9.1
signal-hook-registry-1.2.1
slab-0.4.2
smallmap-1.1.5
socket2-0.3.15
syn-1.0.42
tempfile-3.1.0
termcolor-1.1.0
thread_local-1.0.1
time-0.1.44
tinyvec-0.3.4
tokio-0.2.22
tokio-macros-0.2.5
tokio-tungstenite-0.11.0
tokio-util-0.3.1
toml-0.5.6
tower-service-0.3.0
tracing-0.1.21
tracing-core-0.1.17
tracing-futures-0.2.4
try-lock-0.2.3
tungstenite-0.11.1
twoway-0.1.8
typenum-1.12.0
unicase-2.6.0
unicode-bidi-0.3.4
unicode-normalization-0.1.13
unicode-width-0.1.8
unicode-xid-0.2.1
url-2.1.1
urlencoding-1.1.1
utf-8-0.7.5
version_check-0.9.2
want-0.3.0
warp-0.2.5
wasi-0.10.0+wasi-snapshot-preview1
wasi-0.9.0+wasi-snapshot-preview1
winapi-0.2.8
winapi-0.3.9
winapi-build-0.1.1
winapi-i686-pc-windows-gnu-0.4.0
winapi-util-0.1.5
winapi-x86_64-pc-windows-gnu-0.4.0
ws2_32-sys-0.2.1
yaml-rust-0.4.4
"
inherit cargo
DESCRIPTION="Generate string of text from Markov chain fed by stdin"
# Double check the homepage as the cargo_metadata crate
# does not provide this value so instead repository is used
HOMEPAGE="homepage field in Cargo.toml inaccessible to cargo metadata"
SRC_URI="$(cargo_crate_uris ${CRATES})"
RESTRICT="mirror"
# License set may be more restrictive as OR is not respected
# use cargo-license for a more accurate license picture
LICENSE="Apache-2.0 Apache-2.0 WITH LLVM-exception BSD-2-Clause BSD-3-Clause BSL-1.0 CC0-1.0 ISC MIT Unlicense Zlib gpl-3.0-or-later"
SLOT="0"
KEYWORDS="~amd64"
IUSE=""
DEPEND=""
RDEPEND=""

@ -7,5 +7,13 @@ trust_x_forwarded_for = false
feed_bounds = '2..'
[filter]
inbound = '<>/\\'
inbound = ''
outbound = ''
[writer]
backlog = 32
internal_backlog = 8
capacity = 4
[mask]
default = 'Accept'

@ -165,7 +165,7 @@ impl BindError<std::convert::Infallible>
match self {
Self::Warp(w) => BindError::Warp(w),
Self::IO(w) => BindError::IO(w),
#[cold] _ => unreachable!(),
/*#[cold]*/ _ => unreachable!(),
}
}
}

@ -1,6 +1,5 @@
use libc::{
c_void,
};
use std::ptr;
/// Copy slice of bytes only
///
/// # Notes
@ -9,7 +8,8 @@ pub fn copy_slice(dst: &mut [u8], src: &[u8]) -> usize
{
let sz = std::cmp::min(dst.len(),src.len());
unsafe {
libc::memcpy(&mut dst[0] as *mut u8 as *mut c_void, &src[0] as *const u8 as *const c_void, sz);
//libc::memcpy(&mut dst[0] as *mut u8 as *mut c_void, &src[0] as *const u8 as *const c_void, sz);
ptr::copy_nonoverlapping(&src[0] as *const u8, &mut dst[0] as *mut u8, sz);
}
sz
}
@ -22,7 +22,8 @@ pub fn move_slice(dst: &mut [u8], src: &[u8]) -> usize
{
let sz = std::cmp::min(dst.len(),src.len());
unsafe {
libc::memmove(&mut dst[0] as *mut u8 as *mut c_void, &src[0] as *const u8 as *const c_void, sz);
//libc::memmove(&mut dst[0] as *mut u8 as *mut c_void, &src[0] as *const u8 as *const c_void, sz);
ptr::copy(&src[0] as *const u8, &mut dst[0] as *mut u8, sz);
}
sz
}

@ -6,6 +6,7 @@ use std::{
Context,
},
pin::Pin,
marker::PhantomData,
};
use tokio::{
io::{
@ -173,3 +174,109 @@ mod tests
assert_eq!(&output[..], "Hello world\nHow are you");
}
}
/// A stream that chunks its input.
///
/// Buffers up to `cap` items from the inner stream and yields them as one
/// `Into` (any type convertible from `Vec<T>`, defaulting to `Vec<T>` itself).
#[pin_project]
pub struct ChunkingStream<S, T, Into=Vec<T>>
{
// Fused so that polling after the inner stream ends is safe.
#[pin] stream: Fuse<S>,
// Items collected so far for the current chunk.
buf: Vec<T>,
// Target chunk size; `buf` is flushed once it reaches this length.
cap: usize,
// Only records the output type; no `Into` value is ever stored.
_output: PhantomData<Into>,
// When set (see `push_now()`), the next poll flushes a non-empty `buf` early.
push_now: bool,
}
impl<S, T, Into> ChunkingStream<S,T, Into>
where S: Stream<Item=T>,
Into: From<Vec<T>>
{
/// Create a chunking stream over `stream` that emits chunks of (up to) `sz` items.
pub fn new(stream: S, sz: usize) -> Self
{
Self {
stream: stream.fuse(),
buf: Vec::with_capacity(sz),
cap: sz,
_output: PhantomData,
push_now: false,
}
}
/// Consume, returning the inner stream (any buffered items are dropped).
pub fn into_inner(self) -> S
{
self.stream.into_inner()
}
/// The configured chunk size.
pub fn cap(&self) -> usize
{
self.cap
}
/// The items buffered for the current (not yet emitted) chunk.
pub fn buffer(&self) -> &[T]
{
&self.buf[..]
}
/// A shared reference to the inner stream.
pub fn get_ref(&self) -> &S
{
self.stream.get_ref()
}
/// A mutable reference to the inner stream.
pub fn get_mut(&mut self)-> &mut S
{
self.stream.get_mut()
}
/// Force the next read to send the buffer even if it's not full.
///
/// # Note
/// The buffer still won't send if it's empty.
pub fn push_now(&mut self)
{
self.push_now= true;
}
/// Consume into the current held buffer
pub fn into_buffer(self) -> Vec<T>
{
self.buf
}
/// Take the buffer now
// Leaves a fresh buffer with the same capacity in its place.
pub fn take_now(&mut self) -> Into
{
std::mem::replace(&mut self.buf, Vec::with_capacity(self.cap)).into()
}
}
impl<S, T, Into> Stream for ChunkingStream<S,T, Into>
where S: Stream<Item=T>,
Into: From<Vec<T>>
{
type Item = Into;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Fill until: `push_now` is set with a non-empty buffer, or the buffer
// reaches `cap`, or the inner stream ends / is pending.
while !(self.push_now && !self.buf.is_empty()) && self.buf.len() < self.cap {
// Buffer isn't full, keep filling
let this = self.as_mut().project();
match this.stream.poll_next(cx) {
Poll::Ready(None) => {
// Stream is over
break;
},
Poll::Ready(Some(item)) => {
this.buf.push(item);
},
// NOTE(review): a partially filled buffer stays held across Pending
// until the inner stream yields again or `push_now()` is signalled.
_ => return Poll::Pending,
}
}
debug!("Sending buffer of {} (cap {})", self.buf.len(), self.cap);
// Buffer is full or we reach end of stream
Poll::Ready(if self.buf.len() == 0 {
// Nothing buffered and inner stream finished: end this stream too.
None
} else {
let this = self.project();
// A flush consumes any pending `push_now` request.
*this.push_now = false;
let output = std::mem::replace(this.buf, Vec::with_capacity(*this.cap));
Some(output.into())
})
}
}

@ -15,6 +15,7 @@ use tokio::{
time::Duration,
io::BufReader,
};
use ipfilt::IpFilter;
pub const DEFAULT_FILE_LOCATION: &'static str = "markov.toml";
@ -28,19 +29,67 @@ pub struct Config
pub save_interval_secs: Option<NonZeroU64>,
pub trust_x_forwarded_for: bool,
#[serde(default)]
pub feed_bounds: String,
#[serde(default)]
pub filter: FilterConfig,
#[serde(default)]
pub feed_bounds: String,
pub writer: WriterConfig,
#[serde(default)]
pub mask: IpFilter,
}
/// Character-filter settings (`[filter]` section of the config file).
///
/// Each field is a string of characters to strip; empty (the default) disables
/// that direction's filter.
#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Hash, Serialize, Deserialize)]
pub struct FilterConfig
{
// Characters stripped from text fed *into* the chain.
#[serde(default)]
inbound: String,
// Characters stripped from text generated *out of* the chain.
#[serde(default)]
outbound: String,
}
/// Chain-writer tuning (`[writer]` section of the config file).
///
/// Converted into `handle::Settings` by `create_settings`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash, Serialize, Deserialize)]
pub struct WriterConfig
{
// Size of the public input channel's backlog.
pub backlog: usize,
// Size of the internal worker channel's backlog.
pub internal_backlog: usize,
// Number of buffers batched before they are fed to the chain.
pub capacity: usize,
// Flush timeout in milliseconds; `None` uses `handle::DEFAULT_TIMEOUT`.
pub timeout_ms: Option<u64>,
// Optional throttle between feeds, in milliseconds.
pub throttle_ms: Option<u64>,
}
impl Default for WriterConfig
{
#[inline]
fn default() -> Self
{
Self {
backlog: 32,
internal_backlog: 8,
capacity: 4,
timeout_ms: None,
throttle_ms: None,
}
}
}
impl WriterConfig
{
/// Convert this (plain-data) config section into the handler's `Settings`,
/// attaching the parsed `feed_bounds` range.
fn create_settings(self, bounds: range::DynRange<usize>) -> handle::Settings
{
handle::Settings{
backlog: self.backlog,
internal_backlog: self.internal_backlog,
capacity: self.capacity,
// Millisecond fields become `Duration`s; missing timeout falls back to the default.
timeout: self.timeout_ms.map(tokio::time::Duration::from_millis).unwrap_or(handle::DEFAULT_TIMEOUT),
throttle: self.throttle_ms.map(tokio::time::Duration::from_millis),
bounds,
}
}
}
impl FilterConfig
{
fn get_inbound_filter(&self) -> sanitise::filter::Filter
@ -77,6 +126,8 @@ impl Default for Config
trust_x_forwarded_for: false,
filter: Default::default(),
feed_bounds: "2..".to_owned(),
writer: Default::default(),
mask: Default::default(),
}
}
}
@ -95,13 +146,15 @@ impl Config
}
}
use std::ops::RangeBounds;
Ok(Cache {
feed_bounds: section!("feed_bounds", self.parse_feed_bounds()).and_then(|bounds| if bounds.contains(&0) {
let feed_bounds = section!("feed_bounds", self.parse_feed_bounds()).and_then(|bounds| if bounds.contains(&0) {
Err(InvalidConfigError("feed_bounds", Box::new(opaque_error!("Bounds not allowed to contains 0 (they were `{}`)", bounds))))
} else {
Ok(bounds)
})?,
})?;
Ok(Cache {
handler_settings: self.writer.create_settings(feed_bounds.clone()),
feed_bounds,
inbound_filter: self.filter.get_inbound_filter(),
outbound_filter: self.filter.get_outbound_filter(),
})
@ -205,12 +258,13 @@ impl fmt::Display for InvalidConfigError
/// Caches some parsed config arguments
#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, PartialEq)]
pub struct Cache
{
pub feed_bounds: range::DynRange<usize>,
pub inbound_filter: sanitise::filter::Filter,
pub outbound_filter: sanitise::filter::Filter,
pub handler_settings: handle::Settings,
}
impl fmt::Debug for Cache
@ -221,6 +275,7 @@ impl fmt::Debug for Cache
.field("feed_bounds", &self.feed_bounds)
.field("inbound_filter", &self.inbound_filter.iter().collect::<String>())
.field("outbound_filter", &self.outbound_filter.iter().collect::<String>())
.field("handler_settings", &self.handler_settings)
.finish()
}
}

@ -1,4 +1,5 @@
//! Extensions
use super::*;
use std::{
iter,
ops::{
@ -162,3 +163,21 @@ impl<T> DerefMut for AssertNotSend<T>
&mut self.0
}
}
pub trait ChunkStreamExt<T>: Sized
{
fn chunk_into<I: From<Vec<T>>>(self, sz: usize) -> chunking::ChunkingStream<Self,T,I>;
fn chunk(self, sz: usize) -> chunking::ChunkingStream<Self, T>
{
self.chunk_into(sz)
}
}
impl<S, T> ChunkStreamExt<T> for S
where S: Stream<Item=T>
{
fn chunk_into<I: From<Vec<T>>>(self, sz: usize) -> chunking::ChunkingStream<Self,T,I>
{
chunking::ChunkingStream::new(self, sz)
}
}

@ -2,8 +2,11 @@
use super::*;
#[cfg(any(feature="feed-sentance", feature="split-sentance"))]
use sanitise::Sentance;
#[allow(unused_imports)]
use futures::stream;
pub const DEFAULT_FEED_BOUNDS: std::ops::RangeFrom<usize> = 2..; //TODO: Add to config somehow
pub const DEFAULT_FEED_BOUNDS: std::ops::RangeFrom<usize> = 2..;
/// Feed `what` into `chain`, at least `bounds` tokens.
///
@ -56,7 +59,7 @@ pub fn feed(chain: &mut Chain<String>, what: impl AsRef<str>, bounds: impl std::
}
debug_assert!(!bounds.contains(&0), "Cannot allow 0 size feeds");
if bounds.contains(&map.len()) {
debug!("Feeding chain {} items", map.len());
//debug!("Feeding chain {} items", map.len());
chain.feed(map);
}
else {
@ -73,12 +76,12 @@ pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream<Item = R
if_debug! {
let timer = std::time::Instant::now();
}
let bounds = &state.config_cache().feed_bounds;
//let bounds = &state.config_cache().feed_bounds;
macro_rules! feed {
($chain:expr, $buffer:ident) => {
($buffer:expr) => {
{
let buffer = $buffer;
feed($chain, &buffer, bounds)
state.chain_write(buffer).await.map_err(|_| FillBodyError)?;
}
}
}
@ -101,44 +104,42 @@ pub async fn full(who: &IpAddr, state: State, body: impl Unpin + Stream<Item = R
let buffer = std::str::from_utf8(&buffer[..]).map_err(|_| FillBodyError)?;
let buffer = state.inbound_filter().filter_cow(buffer);
info!("{} -> {:?}", who, buffer);
let mut chain = state.chain().write().await;
cfg_if! {
if #[cfg(feature="split-newlines")] {
for buffer in buffer.split('\n').filter(|line| !line.trim().is_empty()) {
feed!(&mut chain, buffer);
}
feed!(stream::iter(buffer.split('\n').filter(|line| !line.trim().is_empty())
.map(|x| x.to_owned())))
} else {
feed!(&mut chain, buffer);
feed!(stream::once(async move{buffer.into_owned()}));
}
}
} else {
use tokio::prelude::*;
let reader = chunking::StreamReader::new(body.filter_map(|x| x.map(|mut x| x.to_bytes()).ok()));
let mut lines = reader.lines();
#[cfg(feature="hog-buffer")]
let mut chain = state.chain().write().await;
while let Some(line) = lines.next_line().await.map_err(|_| FillBodyError)? {
let lines = reader.lines();
feed!(lines.filter_map(|x| x.ok().and_then(|line| {
let line = state.inbound_filter().filter_cow(&line);
let line = line.trim();
if !line.is_empty() {
#[cfg(not(feature="hog-buffer"))]
let mut chain = state.chain().write().await; // Acquire mutex once per line? Is this right?
feed!(&mut chain, line);
//#[cfg(not(feature="hog-buffer"))]
//let mut chain = state.chain().write().await; // Acquire mutex once per line? Is this right?
info!("{} -> {:?}", who, line);
written+=line.len();
Some(line.to_owned())
} else {
None
}
written+=line.len();
}
})));
}
}
if_debug!{
if_debug! {
trace!("Write took {}ms", timer.elapsed().as_millis());
}
state.notify_save();
Ok(written)
}

@ -1,34 +1,46 @@
//! Generating the strings
use super::*;
use tokio::sync::mpsc::error::SendError;
use futures::StreamExt;
#[derive(Debug)]
pub struct GenBodyError(pub String);
#[derive(Debug, Default)]
pub struct GenBodyError(Option<String>);
impl error::Error for GenBodyError{}
impl fmt::Display for GenBodyError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "failed to write {:?} to body", self.0)
if let Some(z) = &self.0 {
write!(f, "failed to write read string {:?} to body", z)
} else {
write!(f, "failed to read string from chain. it might be empty.")
}
}
}
pub async fn body(state: State, num: Option<usize>, mut output: mpsc::Sender<String>) -> Result<(), GenBodyError>
{
let chain = state.chain().read().await;
if !chain.is_empty() {
let filter = state.outbound_filter();
match num {
Some(num) if num < state.config().max_gen_size => {
//This could DoS `full_body` and writes, potentially.
for string in chain.str_iter_for(num) {
output.send(filter.filter_owned(string)).await.map_err(|e| GenBodyError(e.0))?;
}
},
_ => output.send(filter.filter_owned(chain.generate_str())).await.map_err(|e| GenBodyError(e.0))?,
}
let mut chain = state.chain_read();
let filter = state.outbound_filter();
match num {
Some(num) if num < state.config().max_gen_size => {
let mut chain = chain.take(num);
while let Some(string) = chain.next().await {
output.send(filter.filter_owned(string)).await?;
}
},
_ => output.send(filter.filter_owned(chain.next().await.ok_or_else(GenBodyError::default)?)).await?,
}
Ok(())
}
impl From<SendError<String>> for GenBodyError
{
#[inline] fn from(from: SendError<String>) -> Self
{
Self(Some(from.0))
}
}

@ -0,0 +1,392 @@
//! Chain handler.
use super::*;
use std::{
marker::Send,
sync::Weak,
num::NonZeroUsize,
task::{Poll, Context,},
pin::Pin,
};
use tokio::{
sync::{
RwLock,
RwLockReadGuard,
mpsc::{
self,
error::SendError,
},
watch,
Notify,
},
task::JoinHandle,
time::{
self,
Duration,
},
};
use futures::StreamExt;
/// Default flush timeout used when the config supplies none.
pub const DEFAULT_TIMEOUT: Duration= Duration::from_secs(5);
/// Settings for chain handler
#[derive(Debug, Clone, PartialEq)]
pub struct Settings
{
// Capacity of the public input channel.
pub backlog: usize,
// Capacity of the internal worker channel.
pub internal_backlog: usize,
// Number of buffers batched together before feeding the chain.
pub capacity: usize,
// How long to wait before force-flushing a partial batch.
pub timeout: Duration,
// Optional delay applied between feeds to the chain.
pub throttle: Option<Duration>,
// Accepted token-count range for a single feed.
pub bounds: range::DynRange<usize>,
}
impl Settings
{
/// Should we keep this string.
// NOTE(review): currently a stub that accepts everything; the argument is
// unused. Presumably a hook for future per-string filtering.
#[inline] fn matches(&self, _s: &str) -> bool
{
true
}
}
impl Default for Settings
{
#[inline]
fn default() -> Self
{
Self {
backlog: 32,
internal_backlog: 8,
capacity: 4,
timeout: Duration::from_secs(5),
throttle: Some(Duration::from_millis(200)),
bounds: feed::DEFAULT_FEED_BOUNDS.into(),
}
}
}
/// Receiving ends of the handle's channels, consumed by the worker task only.
#[derive(Debug)]
struct HostInner<T>
{
// Buffers of items submitted through `ChainHandle::write*`.
input: mpsc::Receiver<Vec<T>>,
// Becomes `true` when a hang/shutdown is requested.
shutdown: watch::Receiver<bool>,
}
/// Shared state behind a `ChainHandle`.
#[derive(Debug)]
struct Handle<T: Send+ chain::Chainable>
{
// The chain itself, guarded for concurrent readers / single writer.
chain: RwLock<chain::Chain<T>>,
// Sending side of the worker's input channel.
input: mpsc::Sender<Vec<T>>,
opt: Settings,
// Signalled after the worker writes to the chain.
notify_write: Arc<Notify>,
// Signalled to force the worker to flush pending buffers.
push_now: Arc<Notify>,
shutdown: watch::Sender<bool>,
/// Data used only for the worker task.
host: msg::Once<HostInner<T>>,
}
/// Cloneable handle to a hosted chain; all clones share one `Handle`.
#[derive(Clone, Debug)]
pub struct ChainHandle<T: Send + chain::Chainable>(Arc<Box<Handle<T>>>);
impl<T: Send+ chain::Chainable + 'static> ChainHandle<T>
{
/// Wrap `chain` in a new handle configured by `opt`.
///
/// The handle does nothing until passed to `host()` (see `spawn()`).
pub fn with_settings(chain: chain::Chain<T>, opt: Settings) -> Self
{
let (shutdown_tx, shutdown) = watch::channel(false);
let (itx, irx) = mpsc::channel(opt.backlog);
Self(Arc::new(Box::new(Handle{
chain: RwLock::new(chain),
input: itx,
opt,
push_now: Arc::new(Notify::new()),
notify_write: Arc::new(Notify::new()),
shutdown: shutdown_tx,
// Receiving halves are parked here until `host()` claims them.
host: msg::Once::new(HostInner{
input: irx,
shutdown,
})
})))
}
/// Acquire the chain read lock
async fn chain(&self) -> RwLockReadGuard<'_, chain::Chain<T>>
{
self.0.chain.read().await
}
/// A reference to the chain
pub fn chain_ref(&self) -> &RwLock<chain::Chain<T>>
{
&self.0.chain
}
/// Create a stream that reads generated values forever.
// Holds only a weak reference, so the stream ends once all handles drop.
pub fn read(&self) -> ChainStream<T>
{
ChainStream{
chain: Arc::downgrade(&self.0),
buffer: Vec::with_capacity(self.0.opt.backlog),
}
}
/// Send this buffer to the chain
// Returns a detached future; cloning the sender keeps it `'static`.
pub fn write(&self, buf: Vec<T>) -> impl futures::Future<Output = Result<(), SendError<Vec<T>>>> + 'static
{
let mut write = self.0.input.clone();
async move {
write.send(buf).await
}
}
/// Send this stream buffer to the chain
// Collects the stream into one buffer before sending.
pub fn write_stream<'a, I: Stream<Item=T>>(&self, buf: I) -> impl futures::Future<Output = Result<(), SendError<Vec<T>>>> + 'a
where I: 'a
{
let mut write = self.0.input.clone();
async move {
write.send(buf.collect().await).await
}
}
/// Send this buffer to the chain
pub async fn write_in_place(&self, buf: Vec<T>) -> Result<(), SendError<Vec<T>>>
{
self.0.input.clone().send(buf).await
}
/// A referencer for the notifier
// Fires after each batch the worker writes into the chain.
pub fn notify_when(&self) -> &Arc<Notify>
{
&self.0.notify_write
}
/// Force the pending buffers to be written to the chain now
pub fn push_now(&self)
{
self.0.push_now.notify();
}
/// Hang the worker thread, preventing it from taking any more inputs and also flushing it.
///
/// # Panics
/// If there was no worker thread.
pub fn hang(&self)
{
trace!("Communicating hang request");
self.0.shutdown.broadcast(true).expect("Failed to communicate hang");
}
}
impl ChainHandle<String>
{
/// Generate up to `num` strings (bounded by `max_gen_size`) from the chain
/// and send them, outbound-filtered, into `output`.
///
/// Does nothing when the chain is empty.
#[deprecated = "use read() pls"]
pub async fn generate_body(&self, state: &state::State, num: Option<usize>, mut output: mpsc::Sender<String>) -> Result<(), SendError<String>>
{
// Read lock held for the whole generation pass.
let chain = self.chain().await;
if !chain.is_empty() {
let filter = state.outbound_filter();
match num {
Some(num) if num < state.config().max_gen_size => {
//This could DoS writes, potentially.
for string in chain.str_iter_for(num) {
output.send(filter.filter_owned(string)).await?;
}
},
// No/oversized count: emit a single generated string.
_ => output.send(filter.filter_owned(chain.generate_str())).await?,
}
}
Ok(())
}
}
/// Host this handle on the current task.
///
/// Runs two cooperating loops: a spawned "child" task that drains batches
/// into the chain (write lock held per batch), and a local batcher that
/// chunks incoming buffers, handling flush timeouts, force-push signals and
/// hang requests. Returns once all other handles drop or a hang is requested.
///
/// # Panics
/// If `from` has already been hosted.
pub async fn host(from: ChainHandle<String>)
{
let opt = from.0.opt.clone();
// Claims the receive halves; panics if `host` was called before (msg::Once).
let mut data = from.0.host.unwrap().await;
let (mut tx, mut child) = {
// The `real` input channel.
let from = from.clone();
let opt = opt.clone();
let (tx, rx) = mpsc::channel::<Vec<Vec<_>>>(opt.internal_backlog);
(tx, tokio::spawn(async move {
// Optionally rate-limit batch consumption.
let mut rx = if let Some(thr) = opt.throttle {
time::throttle(thr, rx).boxed()
} else {
rx.boxed()
};
trace!("child: Begin waiting on parent");
while let Some(item) = rx.next().await {
if item.len() > 0 {
info!("Write lock acq");
// One write-lock acquisition per batch, not per buffer.
let mut lock = from.0.chain.write().await;
for item in item.into_iter()
{
use std::ops::DerefMut;
for item in item.into_iter() {
feed::feed(lock.deref_mut(), item, &from.0.opt.bounds);
}
}
trace!("Signalling write");
from.0.notify_write.notify();
}
}
trace!("child: exiting");
}))
};
trace!("Begin polling on child");
tokio::select!{
v = &mut child => {
match v {
/*#[cold]*/ Ok(_) => {warn!("Child exited before we have? This should probably never happen.")},//Should never happen.
Err(e) => {error!("Child exited abnormally. Aborting: {}", e)}, //Child panic or cancel.
}
},
_ = async move {
let mut rx = data.input.chunk(opt.capacity); //we don't even need this tbh, oh well.
if !data.shutdown.recv().await.unwrap_or(true) { //first shutdown we get for free
// Count of 2 = this task's clone + the child's clone; more means
// external handles still exist.
while Arc::strong_count(&from.0) > 2 {
if *data.shutdown.borrow() {
break;
}
tokio::select!{
Some(true) = data.shutdown.recv() => {
debug!("Got shutdown (hang) request. Sending now then breaking");
// Drain everything still queued, then flush it in one batch.
let mut rest = {
let irx = rx.get_mut();
irx.close(); //accept no more inputs
let mut output = Vec::with_capacity(opt.capacity);
while let Ok(item) = irx.try_recv() {
output.push(item);
}
output
};
rest.extend(rx.take_now());
if rest.len() > 0 {
if let Err(err) = tx.send(rest).await {
error!("Failed to force send buffer, exiting now: {}", err);
}
}
break;
}
_ = time::delay_for(opt.timeout) => {
// Idle too long: mark the partial chunk for early emission.
trace!("Setting push now");
rx.push_now();
}
_ = from.0.push_now.notified() => {
debug!("Got force push signal");
let take =rx.take_now();
rx.push_now();
if take.len() > 0 {
if let Err(err) = tx.send(take).await {
error!("Failed to force send buffer: {}", err);
break;
}
}
}
Some(buffer) = rx.next() => {
debug!("Sending {} (cap {})", buffer.len(), buffer.capacity());
if let Err(err) = tx.send(buffer).await {
// Receive closed?
//
// This probably shouldn't happen, as we `select!` for it up there and child never calls `close()` on `rx`.
// In any case, it means we should abort.
/*#[cold]*/ error!("Failed to send buffer: {}", err);
break;
}
}
}
}
}
// Flush whatever remains in the chunker before exiting.
let last = rx.into_buffer();
if last.len() > 0 {
if let Err(err) = tx.send(last).await {
error!("Failed to force send last part of buffer: {}", err);
} else {
trace!("Sent rest of buffer");
}
}
} => {
// Normal exit
trace!("Normal exit")
},
}
trace!("Waiting on child");
// No more handles except child, no more possible inputs.
child.await.expect("Child panic");
trace!("Returning");
}
/// Spawn a new chain handler for this chain.
///
/// Wraps `from` in a `ChainHandle`, spawns `host()` for it on the runtime,
/// and returns the worker's `JoinHandle` along with a handle for callers.
pub fn spawn(from: chain::Chain<String>, opt: Settings) -> (JoinHandle<()>, ChainHandle<String>)
{
debug!("Spawning with opt: {:?}", opt);
let handle = ChainHandle::with_settings(from, opt);
let worker = tokio::spawn(host(handle.clone()));
(worker, handle)
}
/// Stream of values generated from a hosted chain (see `ChainHandle::read`).
#[derive(Debug)]
pub struct ChainStream<T: Send + chain::Chainable>
{
// Weak so the stream does not keep the chain handle alive.
chain: Weak<Box<Handle<T>>>,
// Locally buffered generated values, drained front-first by `poll_next`.
buffer: Vec<T>,
}
impl ChainStream<String>
{
/// Pull up to `n` freshly generated strings from the chain into `self.buffer`.
///
/// Returns the number of strings generated, or `None` when `n` is 0, the
/// backing `ChainHandle` has been dropped, or the chain is empty.
async fn try_pull(&mut self, n: usize) -> Option<NonZeroUsize>
{
if n == 0 {
return None;
}
// If every strong handle is gone, the stream is finished.
let read = self.chain.upgrade()?;
let chain = read.chain.read().await;
if chain.is_empty() {
return None;
}
let n = if n == 1 {
self.buffer.push(chain.generate_str());
1
} else {
self.buffer.extend(chain.str_iter_for(n));
n //for now
};
// `n >= 1` here (0 was rejected above), so this is always `Some`;
// the safe constructor replaces the previous `new_unchecked` in `unsafe`.
NonZeroUsize::new(n)
}
}
impl Stream for ChainStream<String>
{
type Item = String;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use futures::Future;
let this = self.get_mut();
if this.buffer.len() == 0 {
// Refill using the buffer's capacity as the batch size.
// NOTE(review): the `try_pull` future is recreated on every poll, so a
// Pending result discards partial progress; it relies on the lock's
// waker to re-poll us. Verify this cannot stall under contention.
let pull = this.try_pull(this.buffer.capacity());
tokio::pin!(pull);
match pull.poll(cx) {
Poll::Ready(Some(_)) => {},
Poll::Pending => return Poll::Pending,
// `None` from try_pull: handle dropped or chain empty — end of stream.
_ => return Poll::Ready(None),
};
}
debug_assert!(this.buffer.len()>0);
// NOTE(review): front removal from a Vec is O(len); a VecDeque would
// avoid the shift if these buffers grow large.
Poll::Ready(Some(this.buffer.remove(0)))
}
}

@ -0,0 +1,181 @@
//! Filter accepts and denies based on cidr masks.
use super::*;
use cidr::{
Cidr,
IpCidr,
};
use std::{
net::{
IpAddr,
},
error,
fmt,
};
/// Rejection raised when `IpFilter::check` denies an address.
///
/// Holds the denied address and, when the denial came from an explicit
/// deny rule, the CIDR block that matched (`None` = fell through to the
/// default/accept rules without matching).
#[derive(Debug)]
pub struct IpFilterDeniedError(IpAddr, Option<IpCidr>);
// Allows this error to be used as a warp rejection.
impl warp::reject::Reject for IpFilterDeniedError{}
impl error::Error for IpFilterDeniedError{}
impl fmt::Display for IpFilterDeniedError
{
/// Human-readable denial reason: either the deny rule that matched, or a
/// note that no accept rule matched.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
write!(f, "Denied {} due to ", self.0)?;
if let Some(rule) = self.1.as_ref() {
write!(f, "matching rule {}", rule)
} else {
write!(f, "non-matching accept rule")
}
}
}
/// Verdict a filter can produce for an address.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Rule
{
Accept,
Deny,
}
impl Default for Rule
{
/// Fail closed: deny unless configured otherwise.
#[inline]
fn default() -> Self
{
Self::Deny
}
}
impl Rule
{
/// Convert this verdict into a `Result`: `Accept` carries the borrowed
/// matching network through as `Ok`, `Deny` clones it into the `Err` side.
fn into_result<'a>(self, net: Option<&'a IpCidr>) -> Result<Option<&'a IpCidr>, Option<IpCidr>>
{
match self {
Self::Accept => Ok(net),
Self::Deny => Err(net.cloned()),
}
}
}
/// CIDR-based allow/deny list for inbound addresses (`[mask]` config section).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct IpFilter
{
/// The default fallback rule
pub default: Rule,
// CIDR blocks that explicitly accept.
#[serde(default)]
accept: Vec<IpCidr>,
// CIDR blocks that explicitly deny.
#[serde(default)]
deny: Vec<IpCidr>,
}
/// Find the first CIDR network in `haystack` that contains `needle`, if any.
#[inline] fn find_in<'a>(needle: &IpAddr, haystack: &'a [IpCidr]) -> Option<&'a IpCidr>
{
    // Iterator form of the original manual scan; first match wins.
    haystack.iter().find(|net| net.contains(needle))
}
impl Default for IpFilter
{
#[inline]
fn default() -> Self
{
Self {
default: Rule::Deny,
accept: vec![cidr::Cidr::new_host([127,0,0,1].into())],
deny: Vec::default(),
}
}
}
impl IpFilter
{
    /// Create a new CIDR filter with this default rule.
    ///
    /// Use `default()` to use with default rule.
    pub fn new(fallback: Rule) -> Self
    {
	Self {
	    default: fallback,
	    accept: Vec::new(),
	    deny: Vec::new(),
	}
    }
    /// Checks the rule for this IP, returns a result if it should accept or not.
    ///
    /// If acceptance rule is met, return the CIDR match that caused the acceptance if applicable.
    ///
    /// If acceptance rule is not met, return in the error which CIDR match caused the deny if applicable.
    ///
    /// When the address matches both an accept and a deny network, the more
    /// specific (longer prefix) network wins; on an equal-length tie the deny
    /// rule wins. When the exact same network appears in both lists, the
    /// fallback `default` rule applies.
    pub fn check(&self, ip: &IpAddr) -> Result<Option<&'_ IpCidr>, IpFilterDeniedError>
    {
	let accept = find_in(ip, &self.accept[..]);
	let deny = find_in(ip, &self.deny[..]);
	let (rule, cidr) = match (accept, deny) {
	    (None, Some(net)) => (Rule::Deny, Some(net)),
	    (Some(net), None) => (Rule::Accept, Some(net)),
	    (Some(ac), Some(den)) if ac != den => {
		if ac.network_length() > den.network_length() {
		    (Rule::Accept, Some(ac))
		} else {
		    (Rule::Deny, Some(den))
		}
	    },
	    // No match at all, or the identical network in both lists: fall back.
	    _ => (self.default, None)
	};
	rule.into_result(cidr)
	    .map_err(|cidr| IpFilterDeniedError(*ip, cidr))
    }
    /// The networks explicitly accepted by this filter.
    pub fn accept_mask(&self) -> &[IpCidr]
    {
	&self.accept[..]
    }
    /// The networks explicitly denied by this filter.
    pub fn deny_mask(&self) -> &[IpCidr]
    {
	&self.deny[..]
    }
    /// Add many networks to the accept list.
    pub fn accept_range(&mut self, items: impl IntoIterator<Item = IpCidr>)
    {
	self.accept.extend(items)
    }
    /// Add many networks to the deny list.
    pub fn deny_range(&mut self, items: impl IntoIterator<Item = IpCidr>)
    {
	self.deny.extend(items)
    }
    /// Add a single network to the accept list.
    pub fn accept_one(&mut self, item: IpCidr)
    {
	self.accept.push(item)
    }
    /// Add a single network to the deny list.
    pub fn deny_one(&mut self, item: IpCidr)
    {
	self.deny.push(item)
    }
    /// Can any connection ever be accepted?
    pub fn possible(&self) -> bool
    {
	//TODO: Test this
	// Impossible when the fallback denies with no accept rules present,
	// or a catch-all (prefix length 0) deny exists with no accept rules.
	!(self.default == Rule::Deny && self.accept.is_empty()) &&
	    !(self.deny.iter().any(|net| net.network_length() == 0) && self.accept.is_empty())
    }
}
/// Rejection-recovery filter: turn an `IpFilterDeniedError` into a 403 response.
///
/// Any other rejection is passed through untouched for later handlers.
pub async fn recover(err: warp::Rejection) -> Result<impl warp::Reply, warp::Rejection>
{
    match err.find::<IpFilterDeniedError>() {
	Some(t) => {
	    error!("Denying access to {} because of {:?} (403)", t.0, t.1);
	    Ok(warp::http::Response::builder()
	       .status(status!(403))
	       .body(format!("Access denied: {}", t)))
	},
	None => Err(err),
    }
}

@ -1,5 +1,3 @@
#![feature(split_inclusive)]
#![allow(dead_code)]
#[macro_use] extern crate log;
@ -76,8 +74,10 @@ mod msg;
mod state;
use state::State;
mod save;
mod ipfilt;
mod forwarded_list;
use forwarded_list::XForwardedFor;
mod handle;
mod feed;
mod gen;
@ -134,7 +134,7 @@ async fn main() {
debug!("Using config {:?}", config);
trace!("With config cached: {:?}", ccache);
let chain = Arc::new(RwLock::new(match save::load(&config.file).await {
let (chain_handle, chain) = handle::spawn(match save::load(&config.file).await {
Ok(chain) => {
info!("Loaded chain from {:?}", config.file);
chain
@ -144,16 +144,15 @@ async fn main() {
trace!("Error: {}", e);
Chain::new()
},
}));
}, ccache.handler_settings.clone());
{
let mut tasks = Vec::<BoxFuture<'static, ()>>::new();
tasks.push(chain_handle.map(|res| res.expect("Chain handle panicked")).boxed());
let (state, chain) = {
let save_when = Arc::new(Notify::new());
let state = State::new(config,
ccache,
Arc::clone(&chain),
Arc::clone(&save_when));
chain);
let state2 = state.clone();
let saver = tokio::spawn(save::host(Box::new(state.clone())));
let chain = warp::any().map(move || state.clone());
@ -172,11 +171,25 @@ async fn main() {
} else {
warp::filters::addr::remote().and_then(|x: Option<SocketAddr>| async move {x.map(|x| x.ip()).ok_or_else(|| warp::reject::not_found())}).boxed()
};
let ipfilter = warp::any()
.and(chain)
.and(client_ip)
.and_then(|state: State, host: IpAddr| {
async move {
state.config().mask.check(&host)
.map(|ci| {
trace!("Accepting from rule {:?}", ci);
host
})
.map(move |host| (state, host))
.map_err(warp::reject::custom)
}
}).untuple_one();
let push = warp::put()
.and(warp::path("put"))
.and(chain.clone())
.and(client_ip.clone())
.and(ipfilter.clone())
.and(warp::body::content_length_limit(state.config().max_content_length))
.and(warp::body::stream())
.and_then(|state: State, host: IpAddr, buf| {
@ -186,6 +199,8 @@ async fn main() {
.map_err(|_| warp::reject::not_found()) //(warp::reject::custom) //TODO: Recover rejection filter down below for custom error return
}
})
.recover(ipfilt::recover)
.with(warp::log("markov::put"));
@ -195,8 +210,8 @@ async fn main() {
let single = {
let msz = state.config().max_gen_size;
warp::post()
.and(ipfilter.clone())
.and(warp::path("single"))
.and(client_ip.clone())
.and(warp::path::param()
.map(move |sz: usize| {
if sz == 0 || (2..=msz).contains(&sz) {
@ -209,11 +224,13 @@ async fn main() {
.unify())
.and(warp::body::content_length_limit(state.config().max_content_length))
.and(warp::body::aggregate())
.map(|_, x, y, z| (x,y,z)).untuple_one()
.and_then(api::single)
.with(warp::log("markov::api::single"))
};
warp::path("api")
.and(single)
.recover(ipfilt::recover)
.recover(api::error::rejection)
};
}
@ -221,8 +238,7 @@ async fn main() {
let read = warp::get()
.and(chain.clone())
.and(client_ip.clone())
.and(ipfilter.clone())
.and(warp::path::param().map(|opt: usize| Some(opt))
.or(warp::path::end().map(|| Option::<usize>::None)).unify())
.and_then(|state: State, host: IpAddr, num: Option<usize>| {
@ -240,12 +256,12 @@ async fn main() {
}))))
}
})
.recover(ipfilt::recover)
.with(warp::log("markov::read"));
let sentance = warp::get()
.and(warp::path("sentance")) //TODO: sanitise::Sentance::new_iter the body line
.and(chain.clone())
.and(client_ip.clone())
.and(ipfilter.clone())
.and(warp::path::param().map(|opt: usize| Some(opt))
.or(warp::path::end().map(|| Option::<usize>::None)).unify())
.and_then(|state: State, host: IpAddr, num: Option<usize>| {
@ -263,6 +279,7 @@ async fn main() {
}))))
}
})
.recover(ipfilt::recover)
.with(warp::log("markov::read::sentance"));
let read = warp::path("get").and(read.or(sentance));

@ -3,6 +3,7 @@ use super::*;
use tokio::{
sync::{
watch,
Mutex,
},
};
use std::{
@ -12,7 +13,9 @@ use std::{
error,
};
use futures::{
future::Future,
future::{
Future,
},
};
#[derive(Debug)]
@ -160,3 +163,48 @@ impl Future for Initialiser
uhh.poll(cx)
}
}
/// A value that can be consumed once.
///
/// Wraps the value in an async mutex so consumption can be coordinated
/// between shared handles.
#[derive(Debug)]
pub struct Once<T>(Mutex<Option<T>>);
impl<T> Once<T>
{
    /// Create a new instance
    pub fn new(value: T) -> Self
    {
	Self(Mutex::new(Some(value)))
    }
    /// Consume into the instance from behind a potentially shared reference.
    pub async fn consume_shared(self: Arc<Self>) -> Option<T>
    {
	match Arc::try_unwrap(self) {
	    // We held the only reference: take the value without locking.
	    Ok(sole) => sole.0.into_inner(),
	    // Still shared: take it out from under the lock.
	    Err(shared) => shared.0.lock().await.take(),
	}
    }
    /// Consume from a shared reference and panic if the value has already been consumed.
    pub async fn unwrap_shared(self: Arc<Self>) -> T
    {
	let taken = self.consume_shared().await;
	taken.unwrap()
    }
    /// Consume into the instance.
    pub async fn consume(&self) -> Option<T>
    {
	let mut guard = self.0.lock().await;
	guard.take()
    }
    /// Consume and panic if the value has already been consumed.
    pub async fn unwrap(&self) -> T
    {
	let taken = self.consume().await;
	taken.unwrap()
    }
    /// Consume into the inner value
    pub fn into_inner(self) -> Option<T>
    {
	self.0.into_inner()
    }
}

@ -113,7 +113,11 @@ use std::any::{
impl<T: 'static> DynRange<T>
{
#[inline]
pub fn into_boxed(self) -> Box<dyn Any /*TODO: + Send + Sync */+ 'static>
{
self.into_inner()
}
fn into_inner(self) -> Box<dyn Any + 'static>
{
match self {
@ -157,7 +161,12 @@ impl<T: 'static> DynRange<T>
}
pub fn downcast<R: RangeBounds<T> + 'static>(self) -> Result<R, Self>
{
Box::new(self).downcast()
self.into_inner().downcast::<R>()
.map(|x| *x)
.map_err(|b| {
todo!("make this bullshit properly unboxable ehh...")
})
//Box::<(dyn std::any::Any + 'static)>::downcast(Box::new(self)).map_ok(|ok| *ok)
}
}

@ -272,6 +272,6 @@ mod tests
let string = "abcdef ghi jk1\nhian";
assert_eq!(filter.filter_str(&string).to_string(), filter.filter_cow(&string).to_string());
assert_eq!(filter.filter_cow(&string).to_string(), filter.filter(string.chars()).collect::<String>());
assert_eq!(filter.filter_cow(&string).to_string(), filter.filter_iter(string.chars()).collect::<String>());
}
}

@ -13,7 +13,7 @@ use std::{
#[derive(Debug)]
pub struct SentanceError;
/// A sentance
/// A sentence
#[derive(Debug, PartialEq, Eq)]
#[repr(transparent)]
pub struct Sentance(str);
@ -25,7 +25,7 @@ macro_rules! new {
};
}
const DEFAULT_BOUNDARIES: &[char] = &['\n', '.', ':', '!', '?'];
const DEFAULT_BOUNDARIES: &[char] = &['\n', '.', ':', '!', '?', '~'];
lazy_static! {
static ref BOUNDARIES: smallmap::Map<char, ()> = {

@ -25,7 +25,7 @@ macro_rules! new {
};
}
const DEFAULT_BOUNDARIES: &[char] = &['!', '.', ','];
const DEFAULT_BOUNDARIES: &[char] = &['!', '.', ',', '*'];
lazy_static! {
static ref BOUNDARIES: smallmap::Map<char, ()> = {
@ -68,7 +68,8 @@ impl Word
}
/// Create a new iterator over words from this sentance.
pub fn new_iter<'a>(from: &'a (impl AsRef<Sentance> +?Sized+'a)) -> impl Iterator<Item = &'a Self>
pub fn new_iter<'a, 'b>(from: &'a (impl AsRef<Sentance> +?Sized+'b)) -> impl Iterator<Item = &'a Self>
where 'b: 'a
{
let from = from.as_ref();
from.split_inclusive(is_word_boundary)

@ -43,7 +43,7 @@ type Decompressor<T> = BzDecoder<T>;
pub async fn save_now(state: &State) -> io::Result<()>
{
let chain = state.chain().read().await;
let chain = state.chain_ref().read().await;
use std::ops::Deref;
let to = &state.config().file;
save_now_to(chain.deref(),to).await
@ -77,12 +77,13 @@ pub async fn host(mut state: Box<State>)
{
let to = state.config().file.to_owned();
let interval = state.config().save_interval();
let when = Arc::clone(state.when_ref());
trace!("Setup oke. Waiting on init");
if state.on_init().await.is_ok() {
debug!("Begin save handler");
while Arc::strong_count(state.when()) > 1 {
while Arc::strong_count(&when) > 1 {
{
let chain = state.chain().read().await;
let chain = state.chain_ref().read().await;
use std::ops::Deref;
if let Err(e) = save_now_to(chain.deref(), &to).await {
error!("Failed to save chain: {}", e);
@ -97,7 +98,7 @@ pub async fn host(mut state: Box<State>)
break;
}
}
state.when().notified().await;
when.notified().await;
if state.has_shutdown() {
break;
}

@ -1,17 +1,19 @@
//! /sentance/
use super::*;
use futures::StreamExt;
pub async fn body(state: State, num: Option<usize>, mut output: mpsc::Sender<String>) -> Result<(), gen::GenBodyError>
{
let string = {
let chain = state.chain().read().await;
if chain.is_empty() {
return Ok(());
}
let mut chain = state.chain_read();
match num {
None => chain.generate_str(),
Some(num) => (0..num).map(|_| chain.generate_str()).join("\n"),
None => chain.next().await.ok_or_else(gen::GenBodyError::default)?,
Some(num) if num < state.config().max_gen_size => {//(0..num).map(|_| chain.generate_str()).join("\n"),
let chain = chain.take(num);
chain.collect::<Vec<_>>().await.join("\n")//TODO: Stream version of JoinStrExt
},
_ => return Err(Default::default()),
}
};
@ -20,14 +22,14 @@ pub async fn body(state: State, num: Option<usize>, mut output: mpsc::Sender<Str
if let Some(num) = num {
for sen in sanitise::Sentance::new_iter(&string).take(num)
{
output.send(filter.filter_owned(sen.to_owned())).await.map_err(|e| gen::GenBodyError(e.0))?;
output.send(filter.filter_owned(sen.to_owned())).await?;
}
} else {
output.send(filter.filter_owned(match sanitise::Sentance::new_iter(&string)
.max_by_key(|x| x.len()) {
Some(x) => x,
#[cold] None => return Ok(()),
}.to_owned())).await.map_err(|e| gen::GenBodyError(e.0))?;
/*#[cold]*/ None => return Ok(()),
}.to_owned())).await?;
}
Ok(())
}

@ -12,8 +12,9 @@ use tokio::{
pub async fn handle(mut state: State)
{
let mut usr1 = unix::signal(SignalKind::user_defined1()).expect("Failed to hook SIGUSR1");
let mut usr2 = unix::signal(SignalKind::user_defined2()).expect("Failed to hook SIGUSR2");
let mut usr2 = unix::signal(SignalKind::user_defined2()).expect("Failed to hook SIGUSR2");
let mut quit = unix::signal(SignalKind::quit()).expect("Failed to hook SIGQUIT");
let mut io = unix::signal(SignalKind::io()).expect("Failed to hook IO");
trace!("Setup oke. Waiting on init");
if state.on_init().await.is_ok() {
@ -24,19 +25,15 @@ pub async fn handle(mut state: State)
break;
}
_ = usr1.recv() => {
info!("Got SIGUSR1. Saving chain immediately.");
if let Err(e) = save::save_now(&state).await {
error!("Failed to save chain: {}", e);
} else{
trace!("Saved chain okay");
}
info!("Got SIGUSR1. Causing chain write.");
state.push_now();
},
_ = usr2.recv() => {
info!("Got SIGUSR1. Loading chain immediately.");
info!("Got SIGUSR2. Loading chain immediately.");
match save::load(&state.config().file).await {
Ok(new) => {
{
let mut chain = state.chain().write().await;
let mut chain = state.chain_ref().write().await;
*chain = new;
}
trace!("Replaced with read chain");
@ -46,6 +43,15 @@ pub async fn handle(mut state: State)
},
}
},
_ = io.recv() => {
info!("Got SIGIO. Saving chain immediately.");
if let Err(e) = save::save_now(&state).await {
error!("Failed to save chain: {}", e);
} else{
trace!("Saved chain okay");
}
},
_ = quit.recv() => {
warn!("Got SIGQUIT. Saving chain then aborting.");
if let Err(e) = save::save_now(&state).await {

@ -3,6 +3,7 @@ use super::*;
use tokio::{
sync::{
watch,
mpsc::error::SendError,
},
};
use config::Config;
@ -25,8 +26,8 @@ impl fmt::Display for ShutdownError
pub struct State
{
config: Arc<Box<(Config, config::Cache)>>, //to avoid cloning config
chain: Arc<RwLock<Chain<String>>>,
save: Arc<Notify>,
chain: handle::ChainHandle<String>,
//save: Arc<Notify>,
begin: Initialiser,
shutdown: Arc<watch::Sender<bool>>,
@ -78,13 +79,12 @@ impl State
&self.config_cache().outbound_filter
}
pub fn new(config: Config, cache: config::Cache, chain: Arc<RwLock<Chain<String>>>, save: Arc<Notify>) -> Self
pub fn new(config: Config, cache: config::Cache, chain: handle::ChainHandle<String>) -> Self
{
let (shutdown, shutdown_recv) = watch::channel(false);
Self {
config: Arc::new(Box::new((config, cache))),
chain,
save,
begin: Initialiser::new(),
shutdown: Arc::new(shutdown),
shutdown_recv,
@ -101,25 +101,48 @@ impl State
&self.config.as_ref().1
}
pub fn notify_save(&self)
/*pub fn notify_save(&self)
{
self.save.notify();
self.save.notify();
}*/
/*pub fn chain(&self) -> &RwLock<Chain<String>>
{
&self.chain.as_ref()
}*/
pub fn chain_ref(&self) -> &RwLock<Chain<String>>
{
&self.chain.chain_ref()
}
pub fn chain_read(&self) -> handle::ChainStream<String>
{
self.chain.read()
}
pub fn chain(&self) -> &RwLock<Chain<String>>
/// Write to this chain
pub async fn chain_write<'a, T: Stream<Item = String>>(&'a self, buffer: T) -> Result<(), SendError<Vec<String>>>
{
self.chain.write_stream(buffer).await
}
pub fn when_ref(&self) -> &Arc<Notify>
{
&self.chain.as_ref()
&self.chain.notify_when()
}
pub fn when(&self) -> &Arc<Notify>
/// Force the chain to push through now
pub fn push_now(&self)
{
&self.save
self.chain.push_now()
}
pub fn shutdown(self)
{
self.shutdown.broadcast(true).expect("Failed to communicate shutdown");
self.save.notify();
self.chain.hang();
self.when_ref().notify();
}
pub fn has_shutdown(&self) -> bool

Loading…
Cancel
Save