//! Stream related things use super::*; use std::{ task::{ Poll, Context, }, pin::Pin, }; use tokio::{ io::{ AsyncBufRead, AsyncRead, }, prelude::*, }; use futures::{ stream::{ Stream, StreamExt, Fuse, }, }; use pin_project::pin_project; /// Converts a stream of byte-containing objects into an `AsyncRead` and `AsyncBufRead`er. #[pin_project] pub struct StreamReader where I: Stream { #[pin] source: Fuse, buffer: Vec, } impl StreamReader where I: Stream, T: AsRef<[u8]> { /// The current buffer pub fn buffer(&self) -> &[u8] { &self.buffer[..] } /// Consume into the original stream pub fn into_inner(self) -> I { self.source.into_inner() } /// Create a new instance with a buffer capacity pub fn with_capacity(source: I, cap: usize) -> Self { Self { source: source.fuse(), buffer: Vec::with_capacity(cap) } } /// Create a new instance from this stream pub fn new(source: I) -> Self { Self { source: source.fuse(), buffer: Vec::new(), } } /// Attempt to add to this buffer #[cold] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); match this.source.poll_next(cx) { Poll::Ready(None) => Poll::Ready(0), Poll::Ready(Some(buf)) if buf.as_ref().len() > 0 => { let buf = buf.as_ref(); this.buffer.extend_from_slice(buf); Poll::Ready(buf.len()) }, _ => Poll::Pending, } } } impl, I: Stream> AsyncRead for StreamReader { fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { let this = self.project(); if this.buffer.len() != 0 { // We can fill the whole buffer, do it. Poll::Ready(Ok(bytes::copy_slice(buf, this.buffer.drain(..buf.len()).as_slice()))) } else { // Buffer is empty, try to fill it match match this.source.poll_next(cx) { Poll::Ready(None) => Poll::Ready(0), Poll::Ready(Some(buf)) if buf.as_ref().len() > 0 => { let buf = buf.as_ref(); this.buffer.extend_from_slice(buf); Poll::Ready(buf.len()) }, _ => Poll::Pending, } { Poll::Ready(0) => Poll::Ready(Ok(0)), Poll::Ready(x) => { // x has been written Poll::Ready(Ok(bytes::copy_slice(buf, this.buffer.drain(..x).as_slice()))) }, _ => Poll::Pending, } } } } impl, I: Stream> AsyncBufRead for StreamReader { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); if this.buffer.len() < 1 { // Fetch more into buffer match match this.source.poll_next(cx) { Poll::Ready(None) => Poll::Ready(0), Poll::Ready(Some(buf)) if buf.as_ref().len() > 0 => { let buf = buf.as_ref(); this.buffer.extend_from_slice(buf); Poll::Ready(buf.len()) }, _ => Poll::Pending, } { Poll::Ready(0) => Poll::Ready(Ok(&[])), // should we return EOF error here? Poll::Ready(x) => Poll::Ready(Ok(&this.buffer[..x])), _ => Poll::Pending } } else { Poll::Ready(Ok(&this.buffer[..])) } } fn consume(self: Pin<&mut Self>, amt: usize) { self.project().buffer.drain(..amt); } } #[cfg(test)] mod tests { use super::*; use tokio::{ sync::{ mpsc, }, }; #[tokio::test] async fn stream_of_vec() { let (mut tx, rx) = mpsc::channel(16); let sender = tokio::spawn(async move { tx.send("Hello ").await.unwrap(); tx.send("world").await.unwrap(); tx.send("\n").await.unwrap(); tx.send("How ").await.unwrap(); tx.send("are ").await.unwrap(); tx.send("you").await.unwrap(); }); let mut reader = StreamReader::new(rx); let mut output = String::new(); let mut read; while {read = reader.read_line(&mut output).await.expect("Failed to read"); read!=0} { println!("Read: {}", read); } println!("Done: {:?}", output); sender.await.expect("Child panic"); assert_eq!(&output[..], "Hello world\nHow are you"); } }