Naive, basic impl of `AsyncWrite` for `Buffered<T,SIZE>` (untested, probably incorrect poll impls).

Fortune for rsh's current commit: Future curse − 末凶
sock-buffering
Avril 3 years ago
parent 90c9fce20c
commit 19d1db35d6

@ -17,87 +17,133 @@ use bytes::{
#[derive(Debug)]
pub struct Buffered<T: ?Sized, const SIZE: usize>
{
/// When an I/O error is hit when reading /writing into/from the internal buffer, store it here if there is data to dump from it.
/// When there is no more data to return, return this error and then set `poisoned` to true.
///
/// After that, all calls to `poll_read()`/`poll_write()` will return a new error indicating the stream has faulted.
error: Option<io::Error>,
/// If an error has been set above and/or released from the above slot, set this to true.
/// Then all subsequent reads or writes will return a new error.
poisoned: bool,
/// Current internal buffer
/// When it's full to `SIZE`, it should be written to `stream` at once then cleared when it's been written.
buffer: SmallVec<[u8; SIZE]>, //TODO: Can we have a non-spilling stack vec?
#[pin] stream: T
}
impl<T, const SIZE: usize> Buffered<T, SIZE>
{
/// Create a new staticly sized buffer wrapper around this stream
pub fn new(stream: T) -> Self
{
Self {
buffer: SmallVec::new(),
stream,
}
}
/// Consume into the wrapped stream
pub fn into_inner(self) -> T
{
self.stream
}
/// The inner stream
pub fn inner(&self) -> &T
{
&self.stream
}
/// A mutable reference to the backing stream
pub fn inner_mut(&mut self) -> &mut T
{
&mut self.stream
}
/// The current buffer bytes.
pub fn current_buffer(&self) -> &[u8]
{
&self.buffer[..]
}
/// Is the current internal buffer empty?
///
/// You can flush a partially-filled buffer to the backing stream of a writer with `.flush().await`.
pub fn is_empty(&self) -> bool
{
self.buffer.is_empty()
}
}
#[inline] fn div_mod<V>(a: V, b: V) -> (V, <V as std::ops::Div>::Output, <V as std::ops::Rem>::Output)
where V: std::ops::Div + std::ops::Rem + Clone
{
(a.clone(), a.clone() / b.clone(), a % b)
}
impl<T: ?Sized, const SIZE: usize> AsyncWrite for Buffered<T, SIZE>
// XXX: I don't think writing futures like this is safe. Expand the inline `async{}`s into actual polling.
impl<T: ?Sized + Unpin, const SIZE: usize> AsyncWrite for Buffered<T, SIZE>
where T: AsyncWrite
{
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
let this = self.project();
let w = if buf.len() + this.buffer.len() > SIZE {
// `buf` cannot fit into buffer, push the bytes that can fit.
let buf = &buf[..(this.buffer.len()-SIZE)];
// TODO: Don't write poll methods like this ffs... Write it properly.
let this = self.get_mut();
let fut = async {
use tokio::prelude::*;
let mut written=0;
let mut err = None;
this.buffer.extend_from_slice(buf);
buf.len()
} else {
// `buf` can fit into buffer.
this.buffer.extend_from_slice(&buf[..]);
buf.len()
};
// Push `SIZE` bytes from buffer,
loop {
if this.buffer.len() >= SIZE {
use futures::ready;
match ready!(this.stream.poll_write(cx, &this.buffer[..SIZE])) {
Ok(n) => {
this.buffer.drain(0..n);
for chunk in this.buffer.chunks_exact(SIZE)
{
match this.stream.write_all(&chunk).await {
Ok(()) => {
written += chunk.len();
},
Err(e) => {
err = Some(e);
break;
},
x => return Poll::Ready(x),
//TODO: How to break after this?
}
}
this.buffer.drain(0..written);
if let Some(err) = err {
Err(err)
} else {
// TODO: How to re-queue for polling?
todo!()
Ok(written)
}
}
Poll::Ready(Ok(w))
/*
match div_mod(buf.len(), SIZE) {
// Buffer is equal to `SIZE`
(sz, _, _) if sz == SIZE => (),
// Buffer is smaller than `SIZE`
(sz, 0, x) => (),
// Buffer is a multiple of SIZE,
(sz, x, 0) => (),
}*/
// TODO: Push `buf` into `self.buffer`. When `self.buffer` is full, write to `self.stream` and clear what's been written.
};
tokio::pin!(fut);
fut.poll(cx)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
todo!("Flush any remaining bytes in the buffer to `stream`")
let this = self.get_mut();
let fut = async {
use tokio::prelude::*;
let wres = if this.buffer.len() > 0 {
this.stream.write_all(&this.buffer[..]).await
} else {
Ok(())
};
this.stream.flush().await?;
wres
};
tokio::pin!(fut);
fut.poll(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
// I think we're supposed to imply flush here?
/*
(&mut self).poll_flush(cx)?;
let this = self.project();
this.stream.poll_flush(cx)?;
this.stream.poll_shutdown(cx)
*/
todo!("How to do the above?")
let this = self.get_mut();
let fut = async {
use tokio::prelude::*;
this.flush().await?;
this.stream.shutdown().await
};
tokio::pin!(fut);
fut.poll(cx)
}
}
// XXX: I don't think writing futures like this is safe. Expand the inline `async{}`s into actual polling.
impl<T: ?Sized + Unpin, const SIZE: usize> AsyncRead for Buffered<T, SIZE>
where T: AsyncRead
{
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
todo!("How to properly do this?")
}
}
//TODO: Write tests for writer

Loading…
Cancel
Save