Possible source of bug is polling a local stack future in poll_read method. Fucks sake.

Fortune for rsh's current commit: Curse − 凶
sock-buffering
Avril 3 years ago
parent c96d098441
commit 56306fae83
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -154,47 +154,85 @@ fn try_release_buf<const SIZE: usize>(buffer: &mut SmallVec<[u8; SIZE]>, into: &
(!buffer.is_empty(), sz)
}
impl<T: AsyncRead + Unpin + ?Sized, const SIZE: usize> Buffered<T, SIZE>
{
async fn fill_buffer(&mut self) -> io::Result<bool>
{
let sz = self.buffer.len();
Ok(if sz != SIZE { // < SIZE
use tokio::prelude::*;
self.buffer.resize(SIZE, 0);
let done = {
let mut r=0;
let mut done =sz;
while done < SIZE && {r = self.stream.read(&mut self.buffer[done..]).await?; r > 0} {
done += r;
}
done
};
println!("Filling buffer to {}", done);
if done == SIZE {
true
} else {
self.buffer.resize(done, 0);
false
}
} else { // == SIZE
debug_assert!(sz == SIZE);
true
})
}
fn try_take_buffer<B: ?Sized + BufMut>(&mut self, to: &mut B) -> usize
{
if self.buffer.is_empty() {
println!("Buffer empty, skipping take");
return 0;
}
let copy = std::cmp::min(self.buffer.len(), to.remaining_mut());
println!("Draining {} bytes into output", copy);
to.put_slice(&self.buffer[..copy]);
self.buffer.drain(..copy);
copy
}
}
// XXX: I don't think writing futures like this is safe. Expand the inline `async{}`s into actual polling.
impl<T: ?Sized + Unpin, const SIZE: usize> AsyncRead for Buffered<T, SIZE>
where T: AsyncRead
{
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
let this = self.get_mut();
//XXX: Why TF doesn't this work!??!?!??!?!?
//XXX: Pinning then polling this future is causing BUG... Ffs...
let fut = async {
use tokio::prelude::*;
let mut w = 0;
let mut temp = [0u8; SIZE];
for buf in buf.chunks_mut(SIZE) {
let mut done=0;
let mut r;
if this.buffer.len() == 0
{
while done < SIZE {
// Fill the buffer
r = this.stream.read(&mut temp[done..]).await?;
done += r;
if r == 0 {
/*if this.buffer.is_empty()
{
this.fill_buffer().await?;
}*/
while w < buf.len() {
match this.try_take_buffer(&mut &mut buf[w..]) {
0 => {
if !this.fill_buffer().await?
&& this.buffer.is_empty()
{
println!("Buffer empty");
break;
} else {
println!("Buffer filled");
}
}
this.buffer.extend_from_slice(&temp[..done]);
}
// Drain buffer into `buf` (chunk)
let sz = std::cmp::min(this.buffer.len(), buf.len());
println!("Buffer size: {}, {} -> {} ({})", this.buffer.len(), sz, buf.len(), w);
(&mut buf[..sz]).copy_from_slice(&this.buffer[..sz]);
drop(this.buffer.drain(0..sz));
w+= sz;
// Wtf?
if done == 0 {
break;
},
x => w+=x,
}
}
println!("Done: {}", w);
Result::<usize, io::Error>::Ok(w)
};
tokio::pin!(fut);

Loading…
Cancel
Save