Buffered: Fix buffer not being cleared on flush. Added internal info messages when `cfg!(test)` is enabled.

Fortune for rsh's current commit: Middle blessing − 中吉
sock-buffering
Avril 4 years ago
parent 818659b83c
commit a6a25259b8
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -34,6 +34,6 @@ mod sock;
#[tokio::main] #[tokio::main]
async fn main() -> eyre::Result<()> async fn main() -> eyre::Result<()>
{ {
Ok(()) Ok(())
} }

@ -18,9 +18,11 @@ use bytes::Bytes;
use cancel::*; use cancel::*;
mod buffered; pub mod buffered;
pub mod enc; pub mod enc;
pub use buffered::Buffered;
/// Details of a newly accepted raw socket peer. /// Details of a newly accepted raw socket peer.
/// ///
/// This connected will have been "accepted", but not yet trusted /// This connected will have been "accepted", but not yet trusted

@ -87,6 +87,9 @@ where T: AsyncWrite
for chunk in this.buffer.chunks_exact(SIZE) for chunk in this.buffer.chunks_exact(SIZE)
{ {
if cfg!(test) {
eprintln!("Pushing chunk: {:?}", chunk);
}
match this.stream.write_all(&chunk).await { match this.stream.write_all(&chunk).await {
Ok(()) => { Ok(()) => {
written += chunk.len(); written += chunk.len();
@ -114,7 +117,12 @@ where T: AsyncWrite
use tokio::prelude::*; use tokio::prelude::*;
let wres = if this.buffer.len() > 0 { let wres = if this.buffer.len() > 0 {
this.stream.write_all(&this.buffer[..]).await if cfg!(test) {
eprintln!("Pushing rest: {:?}", &this.buffer[..]);
}
let res = this.stream.write_all(&this.buffer[..]).await;
this.buffer.clear();
res
} else { } else {
Ok(()) Ok(())
}; };
@ -157,7 +165,7 @@ mod tests
use tokio::prelude::*; use tokio::prelude::*;
let (tx, mut rx) = tokio::io::duplex(11); let (tx, mut rx) = tokio::io::duplex(11);
let mut ttx = Buffered::<_, 16>::new(tx); let mut ttx = Buffered::<_, 4>::new(tx);
let back = tokio::spawn(async move { let back = tokio::spawn(async move {
@ -167,6 +175,7 @@ mod tests
tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await; tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await;
println!("Flushing stream"); println!("Flushing stream");
ttx.flush().await?; ttx.flush().await?;
ttx.shutdown().await?;
Result::<_, std::io::Error>::Ok(()) Result::<_, std::io::Error>::Ok(())
}); });

Loading…
Cancel
Save