Buffered reader doesn"t work,no fucking clue why?

Fortune for rsh's current commit: Small curse − 小凶
sock-buffering
Avril 3 years ago
parent 9d927c548a
commit c96d098441
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -146,13 +146,59 @@ where T: AsyncWrite
}
}
fn try_release_buf<const SIZE: usize>(buffer: &mut SmallVec<[u8; SIZE]>, into: &mut [u8]) -> (bool, usize)
{
let sz = std::cmp::min(buffer.len(), into.len());
(&mut into[..sz]).copy_from_slice(&buffer[..sz]);
drop(buffer.drain(..sz));
(!buffer.is_empty(), sz)
}
// XXX: I don't think writing futures like this is safe. Expand the inline `async{}`s into actual polling.
impl<T: ?Sized + Unpin, const SIZE: usize> AsyncRead for Buffered<T, SIZE>
where T: AsyncRead
{
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
todo!("How to properly do this?")
let this = self.get_mut();
//XXX: Why TF doesn't this work!??!?!??!?!?
let fut = async {
use tokio::prelude::*;
let mut w = 0;
let mut temp = [0u8; SIZE];
for buf in buf.chunks_mut(SIZE) {
let mut done=0;
let mut r;
if this.buffer.len() == 0
{
while done < SIZE {
// Fill the buffer
r = this.stream.read(&mut temp[done..]).await?;
done += r;
if r == 0 {
break;
}
}
this.buffer.extend_from_slice(&temp[..done]);
}
// Drain buffer into `buf` (chunk)
let sz = std::cmp::min(this.buffer.len(), buf.len());
println!("Buffer size: {}, {} -> {} ({})", this.buffer.len(), sz, buf.len(), w);
(&mut buf[..sz]).copy_from_slice(&this.buffer[..sz]);
drop(this.buffer.drain(0..sz));
w+= sz;
// Wtf?
if done == 0 {
break;
}
}
Result::<usize, io::Error>::Ok(w)
};
tokio::pin!(fut);
fut.poll(cx)
}
}
@ -172,6 +218,7 @@ mod tests
println!("Writing bytes");
ttx.write_all(b"Hello world").await?;
println!("Waiting 1 second...");
tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await;
println!("Flushing stream");
@ -191,5 +238,36 @@ mod tests
Ok(())
}
#[tokio::test]
async fn reader() -> eyre::Result<()>
{
use tokio::prelude::*;
const DATA: &'static [u8] = b"Hello world";
let (mut tx, rx) = tokio::io::duplex(11);
let mut rx = Buffered::<_, 4>::new(rx);
let back = tokio::spawn(async move {
tx.write_all(DATA).await?;
tx.write_all(DATA).await?;
tx.flush().await?;
tx.shutdown().await?;
Result::<_, std::io::Error>::Ok(())
});
let mut output =Vec::new();
tokio::io::copy(&mut rx, &mut output).await?;
back.await.expect("Back panick")?;
eprintln!("String: {}", String::from_utf8_lossy(&output[..]));
assert_eq!(&output[..DATA.len()], &DATA[..]);
assert_eq!(&output[DATA.len()..], &DATA[..]);
Ok(())
}
}

Loading…
Cancel
Save