Compare commits

...

16 Commits

Author SHA1 Message Date
Avril c3f678a81e
I FUCKING AM SO CLOSE TO GIVING UP.
3 years ago
Avril d82c46b12d
FUCKING FAILURE: For FUCKs sake. NO MATTER HOW MUCH STATE I ADD TO THE DAMN THING, IT STILL FAILS TO READ PROPERLY AT SOME ARBITRARY POINT IN THE BUFFER.
3 years ago
Avril ed69a8f187
Failure: Keeping buffer read/size state between pendings STILL doesn"t work...
3 years ago
Avril 019bdee5c1
Failure: Converted `read_test` to a poll-based function, it still loses state required to maintain the buffer location. There"s no way to make this code keep the state. We need to re-design the function to something completely different.
3 years ago
Avril fc2b10a306
Found bug. poll_read needs rewrite. Dunno how to write it tho.
3 years ago
Avril 5d5748b5ea
Found bug! It is **NOT** safe to pin and poll a stack future in a `poll*` Future method.
3 years ago
Avril 7cad244c16
FUCK THIS BULLSHIT. I LITERALLY JUST WANT A BUFFERED STREAM. WHY IS THERE NO LIBRARY FOR THIS, WHY IS THIS NOT THERE BY DEFAULT. FUCK EVERYTHING. I HATE THIS STUPID BOILERPLATE
3 years ago
Avril 56306fae83
Possible source of bug is polling a local stack future in poll_read method. Fucks sake.
3 years ago
Avril c96d098441
Buffered reader doesn"t work,no fucking clue why?
3 years ago
Avril 9d927c548a TODO: impl `AsyncRead` for `Buffered<impl AsyncRead, SIZE>`
3 years ago
Avril a6a25259b8
Buffered: Fix buffer not being cleared on flush. Added internal info messages when `cfg!(test)` is enabled.
3 years ago
Avril 818659b83c
Buffered writer seems to work.
3 years ago
Avril 19d1db35d6 Naive, basic impl of `AsyncWrite` for `Buffered<T,SIZE>` (untested, probably incorrect poll impls).
3 years ago
Avril 90c9fce20c
FIrst attempt at buffering stream... Doesn"t seem to be working well...
3 years ago
Avril 6f8d367080
Documented currently non-functioning `Buffered<T>` fields
3 years ago
Avril 69d546d2d1
Started `Buffered<T, const SIZE: usize>`: Static buffering stream wrapper for `ESock` syncing.
3 years ago

@ -17,7 +17,7 @@ mopa = "0.2.2"
pin-project = "1.0.8"
serde = { version = "1.0.126", features = ["derive"] }
serde_cbor = "0.11.1"
smallvec = { version = "1.6.1", features = ["union", "serde", "write"] }
smallvec = { version = "1.6.1", features = ["union", "serde", "write", "const_generics"] }
stackalloc = "1.1.1"
tokio = { version = "0.2", features = ["full"] }
tokio-uring = "0.1.0"

@ -26,6 +26,12 @@ pub fn vec_uninit<T>(sz: usize) -> Vec<MaybeUninit<T>>
}
}
pub trait IsTrue{}
#[derive(Debug)]
pub struct Assert<const VALUE: bool>;
impl IsTrue for Assert<true>{}
/// Create a blanket-implementing trait that is a subtrait of any number of traits.
///
/// # Usage

@ -18,8 +18,11 @@ use bytes::Bytes;
use cancel::*;
pub mod buffered;
pub mod enc;
pub use buffered::Buffered;
/// Details of a newly accepted raw socket peer.
///
/// This connected will have been "accepted", but not yet trusted

@ -0,0 +1,371 @@
//! Stream buffering for sync of encrypted socked.
use super::*;
use smallvec::SmallVec;
use std::io;
use std::{
task::{
Poll, Context,
},
pin::Pin,
};
use bytes::{
Buf, BufMut,
};
/// A wrapping buffer over a writer and/or reader.
#[pin_project]
#[derive(Debug)]
pub struct Buffered<T: ?Sized, const SIZE: usize>
{
/// Current internal buffer
/// When it's full to `SIZE`, it should be written to `stream` at once then cleared when it's been written.
buffer: SmallVec<[u8; SIZE]>, //TODO: Can we have a non-spilling stack vec?
pending: usize, w: usize,
#[pin] stream: T
}
impl<T, const SIZE: usize> Buffered<T, SIZE>
where [(); SIZE]: Sized, // This isn't checked?
{
/// Create a new staticly sized buffer wrapper around this stream
pub fn new(stream: T) -> Self
{
assert!(SIZE > 0, "Size of buffer cannot be 0");
Self {
buffer: SmallVec::new(),
pending: 0, w: 0,
stream,
}
}
/// Consume into the wrapped stream
pub fn into_inner(self) -> T
{
self.stream
}
/// The inner stream
pub fn inner(&self) -> &T
{
&self.stream
}
/// A mutable reference to the backing stream
pub fn inner_mut(&mut self) -> &mut T
{
&mut self.stream
}
/// The current buffer bytes.
pub fn current_buffer(&self) -> &[u8]
{
&self.buffer[..]
}
/// Is the current internal buffer empty?
///
/// You can flush a partially-filled buffer to the backing stream of a writer with `.flush().await`.
pub fn is_empty(&self) -> bool
{
self.buffer.is_empty()
}
}
#[inline] fn div_mod<V>(a: V, b: V) -> (V, <V as std::ops::Div>::Output, <V as std::ops::Rem>::Output)
where V: std::ops::Div + std::ops::Rem + Clone
{
(a.clone(), a.clone() / b.clone(), a % b)
}
// XXX: I don't think writing futures like this is safe. Expand the inline `async{}`s into actual polling.
impl<T: ?Sized + Unpin, const SIZE: usize> AsyncWrite for Buffered<T, SIZE>
where T: AsyncWrite
{
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
// TODO: Don't write poll methods like this ffs... Write it properly.
let this = self.get_mut();
let fut = async {
use tokio::prelude::*;
let mut written=0;
let mut err = None;
this.buffer.extend_from_slice(buf);
for chunk in this.buffer.chunks_exact(SIZE)
{
if cfg!(test) {
eprintln!("Pushing chunk: {:?}", chunk);
}
match this.stream.write_all(&chunk).await {
Ok(()) => {
written += chunk.len();
},
Err(e) => {
err = Some(e);
break;
},
}
}
this.buffer.drain(0..written);
if let Some(err) = err {
Err(err)
} else {
Ok(buf.len())
}
};
tokio::pin!(fut);
fut.poll(cx)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let this = self.get_mut();
let fut = async {
use tokio::prelude::*;
let wres = if this.buffer.len() > 0 {
if cfg!(test) {
eprintln!("Pushing rest: {:?}", &this.buffer[..]);
}
let res = this.stream.write_all(&this.buffer[..]).await;
this.buffer.clear();
res
} else {
Ok(())
};
this.stream.flush().await?;
wres
};
tokio::pin!(fut);
fut.poll(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let this = self.get_mut();
let fut = async {
use tokio::prelude::*;
this.flush().await?;
this.stream.shutdown().await
};
tokio::pin!(fut);
fut.poll(cx)
}
}
fn try_release_buf<const SIZE: usize>(buffer: &mut SmallVec<[u8; SIZE]>, into: &mut [u8]) -> (bool, usize)
{
let sz = std::cmp::min(buffer.len(), into.len());
(&mut into[..sz]).copy_from_slice(&buffer[..sz]);
drop(buffer.drain(..sz));
(!buffer.is_empty(), sz)
}
impl<T: AsyncRead + Unpin + ?Sized, const SIZE: usize> Buffered<T, SIZE>
{
async fn fill_buffer(&mut self) -> io::Result<bool>
{
let sz = self.buffer.len();
Ok(if sz != SIZE { // < SIZE
use tokio::prelude::*;
// XXXX::: I think the issue is, this function comes before the await point, meaning it is ran twice after the first poll? I have no fucking idea. I hate this... I just want a god damn buffered stream. WHY IS THIS SO CANCEROUS.
self.buffer.resize(SIZE, 0);
let done = {
let mut r=0;
let mut done =sz;
while done < SIZE && {r = self.stream.read(&mut self.buffer[done..]).await?; r > 0} {
done += r;
}
done
};
println!("Filling buffer to {}", done);
if done == SIZE {
true
} else {
self.buffer.resize(done, 0);
false
}
} else { // == SIZE
debug_assert!(sz == SIZE);
true
})
}
fn try_take_buffer<B: ?Sized + BufMut>(&mut self, to: &mut B) -> usize
{
if self.buffer.is_empty() {
println!("Buffer empty, skipping take");
return 0;
}
let copy = std::cmp::min(self.buffer.len(), to.remaining_mut());
println!("Draining {} bytes into output", copy);
to.put_slice(&self.buffer[..copy]);
self.buffer.drain(..copy);
copy
}
// async-based impl of `read`. there as a reference for when we find out how to write `poll_read`. Sigh...
async fn read_test(&mut self, buf: &mut [u8]) -> io::Result<usize>
{
use tokio::prelude::*;
let mut w = 0;
while w < buf.len() {
match self.try_take_buffer(&mut &mut buf[w..]) {
0 => {
if !self.fill_buffer().await?
&& self.buffer.is_empty()
{
println!("Buffer empty");
break;
} else {
println!("Buffer filled");
}
},
x => w+=x,
}
}
println!("Done: {}", w);
Result::<usize, io::Error>::Ok(w)
}
}
// XXX: I don't think writing futures like this is safe. Expand the inline `async{}`s into actual polling.
impl<T: ?Sized + Unpin, const SIZE: usize> AsyncRead for Buffered<T, SIZE>
where T: AsyncRead
{
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
let this = self.get_mut();
let res = loop {
let read = if this.buffer.len() < SIZE || this.pending > 0
{
let st = if this.pending > 0 {this.pending-1} else { this.buffer.len() };
this.buffer.resize(SIZE, 0);
let mut done=st;
let mut r=0;
//XXX: Same issue even trying to save buffer length state over Pendings... Wtf is going on here?
macro_rules! ready {
(try $poll:expr) => {
match $poll {
Poll::Pending => {
this.pending = st+1;
//this.buffer.resize(done, 0);
return Poll::Pending;
},
Poll::Ready(Ok(x)) => x,
err => {
//this.buffer.resize(done, 0);
return err;
}
}
}
}
// XXX: V Same issue, runs the above code twice when re-polling after Pending. We need to make sure we jump back to this point in the code following a Pending poll to `stream.poll_read`, but I have no fucking clue how to do this? Eh...... We'll probably need to design the code differently. There is a lot of state that gets lost here and idk how to preserve it.... I hate this.
while done < SIZE && {r = ready!(try Pin::new(&mut this.stream).poll_read(cx, &mut this.buffer[done..])); r > 0}
{
done +=r;
}
this.pending = 0;
// This causes early eof (0)
//println!("Done: {}", done);
//this.buffer.resize(done, 0);
done
} else {
this.buffer.len()
};
match this.try_take_buffer(&mut &mut buf[this.w..]) {
0 => break Ok(this.w),
x => this.w+=x,
}
};
this.w = 0;
Poll::Ready(res)
}
}
#[cfg(test)]
mod tests
{
use super::*;
#[tokio::test]
async fn writer() -> eyre::Result<()>
{
use tokio::prelude::*;
let (tx, mut rx) = tokio::io::duplex(11);
let mut ttx = Buffered::<_, 4>::new(tx);
let back = tokio::spawn(async move {
println!("Writing bytes");
ttx.write_all(b"Hello world").await?;
println!("Waiting 1 second...");
tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await;
println!("Flushing stream");
ttx.flush().await?;
ttx.shutdown().await?;
Result::<_, std::io::Error>::Ok(())
});
let mut output = Vec::new();
println!("Reading full stream...");
tokio::io::copy(&mut rx, &mut output).await?;
println!("Waiting for background...");
back.await.expect("Back panick")?;
println!("Expected {:?}, got {:?}", b"Hello world", &output);
assert_eq!(&output[..], b"Hello world");
Ok(())
}
#[tokio::test]
async fn reader() -> eyre::Result<()>
{
use tokio::prelude::*;
const DATA: &'static [u8] = b"Hello world";
let (mut tx, rx) = tokio::io::duplex(11);
let mut rx = Buffered::<_, 4>::new(rx);
let back = tokio::spawn(async move {
tx.write_all(DATA).await?;
tx.write_all(DATA).await?;
tx.flush().await?;
tx.shutdown().await?;
Result::<_, std::io::Error>::Ok(())
});
let mut output = vec![0u8; DATA.len()*2];
// Bug found! Pinning and polling that stack future in `poll_read` does NOT work!
// (we unrolled the async function to a poll based one and we're STILL losing state.... FFS!)
// The exact same works as a real async function.
/*
rx.read(&mut output[..DATA.len()]).await?;
rx.read(&mut output[DATA.len()..]).await?;
*/
/* THIS SHIT HANGS???????????????
tokio::io::copy(&mut rx, &mut output).await?;
*/
assert_eq!(rx.read(&mut output[..DATA.len()]).await?, DATA.len());
assert_eq!(rx.read(&mut output[DATA.len()..]).await?, DATA.len());
back.await.expect("Back panick")?;
eprintln!("String: {}", String::from_utf8_lossy(&output[..]));
assert_eq!(&output[..DATA.len()], &DATA[..]);
assert_eq!(&output[DATA.len()..], &DATA[..]);
Ok(())
}
}
Loading…
Cancel
Save