You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rsh/src/sock/pipe.rs

128 lines
3.7 KiB

//! Piping buffered data from a raw socket to `ESock`
//!
//! This exists because I'm too dumb to implement a functional buffered `AsyncRead`/`AsyncWrite` wrapper stream :/
use super::*;
use std::{
io,
marker::{
Send, Sync,
Unpin,
PhantomData,
},
};
use futures::{
Future,
};
use tokio::sync::{
mpsc,
};
use enc::{
ESock,
ESockReadHalf,
ESockWriteHalf,
};
/// The default buffer size for `BufferedESock`.
///
/// `BufferedESock::new` passes this value as the `bufsz` argument of `BufferedESock::with_size`.
// NOTE(review): presumably a byte count — confirm against the code that consumes `BufferedESock::bufsz`.
pub const DEFAULT_BUFFER_SIZE: usize = 32;
/// Task-based buffered piping to/from encrypted sockets.
///
/// `W` and `R` are the type parameters handed to `ESock<W, R>`. No socket value is stored in this struct; the association with `ESock<W, R>` is recorded at the type level only, through `PhantomData`.
#[derive(Debug)]
pub struct BufferedESock<W, R>
{
/// Buffer size this pipe was created with (the `bufsz` argument of `with_size`).
bufsz: usize,
/// Zero-sized marker tying this type to `ESock<W, R>` without holding an instance of it.
_backing: PhantomData<ESock<W, R>>,
}
/// Info for bsock tasks
///
/// Passed by value as the `info` argument of `bsock_writer` and `bsock_reader`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct BSockTaskInfo
{
/// Requested size, in bytes, of the staging buffer that `bsock_reader` fills before forwarding data.
bufsz: usize,
}
/// Writer-side bsock task (currently a stub).
///
/// `tx`: ESock-wrapped network output socket.
/// `rx`: Reading half of the user's fake stream
///
/// NOTE: Not implemented yet. The `async` block below is empty, so the returned future resolves to `()` as soon as it is polled, without reading from `rx` or writing to `tx`; `info` is unused as well.
// NOTE(review): presumably this is meant to mirror `bsock_reader` in the opposite direction (fake stream -> encrypted socket) — confirm the intended design before implementing.
//TODO: Maybe we can get away with using `DuplexStream` instead of a generic for `Fake`. But we don't want to give any illusions that that stream can be written to, so I think making it opaque with a generic is fine.
fn bsock_writer<'a, Raw, Fake>(info: BSockTaskInfo, mut tx: ESockWriteHalf<Raw>, mut rx: Fake) -> impl Future<Output = ()> + 'a
where Raw: AsyncWrite + Unpin + 'a,
Fake: AsyncRead + Unpin + 'a,
{
async move {
// TODO: Unimplemented — nothing is piped yet.
}
}
/// Reader-side bsock task: pumps bytes from the real socket into the fake stream in buffered chunks.
///
/// `rx`: Raw, unencrypted network input socket
/// `tx`: Writing half of the user's `ESockReadHalf`'s rx stream, which wraps the receiver from this fake sender stream.
/// `info`: Task settings; `info.bufsz` is the size of the staging buffer (a value of `0` is treated as `1`).
///
/// The returned future repeatedly fills the staging buffer from `rx` and forwards whatever was collected to `tx` with `write_all`. It completes when one of the following happens:
///
/// * `rx` reaches end-of-stream (a read returns `0` bytes): any pending bytes are forwarded, then `tx` is shut down.
/// * `rx` yields a read error: any pending bytes are forwarded, the error is printed to stderr, then `tx` is shut down.
/// * Writing to `tx` fails: the error is printed to stderr and the task exits without shutting `tx` down.
///
/// Errors are never returned to the caller; the future's output is always `()`.
//TODO: Maybe we can get away with using `DuplexStream` instead of a generic for `Fake`. But we don't want to give any illusions that that stream can be read from, so I think making it opaque with a generic is fine.
fn bsock_reader<'a, Raw, Fake>(info: BSockTaskInfo, mut rx: Raw, mut tx: Fake) -> impl Future<Output = ()> + 'a
where Raw: AsyncRead + Unpin + 'a,
      Fake: AsyncWrite + Unpin + 'a
{
    use tokio::prelude::*;
    async move {
        // A read into a zero-length buffer always yields `Ok(0)`, which is indistinguishable from end-of-stream.
        // Keep at least one byte of capacity so that a `bufsz` of 0 can still make progress.
        let mut buffer = vec![0u8; info.bufsz.max(1)];
        loop {
            let mut done = 0;
            let mut eof = false;
            let mut read_error = None;

            // Read until the buffer is full, the stream ends, or a read fails.
            while done < buffer.len() {
                match rx.read(&mut buffer[done..]).await {
                    // The buffer slice is never empty here, so `Ok(0)` can only mean end-of-stream.
                    Ok(0) => { eof = true; break; },
                    Ok(n) => done += n,
                    Err(err) => { read_error = Some(err); break; },
                }
            }

            // Write the buffer (possibly only partially filled).
            if done > 0 {
                if let Err(write_err) = tx.write_all(&buffer[..done]).await {
                    //We don't want to propagate this error. If writing to the fake socket fails, then it has been dropped or closed some other way, and that is fine. Just report then exit the task.
                    eprintln!("bsock_reader: Failed to write to fake stream: {}", write_err);
                    break;
                }
            }

            // Check if there was an error mid-way through reading the buffer
            // XXX: Should we instead propagate read errors immediately, ignoring the partially-filled buffer? I think not, as we may get a valid partial buffer which the remote socket then disconnects (exits process) from afterwards.
            if let Some(read_err) = read_error {
                //TODO: How to propagate errors to `tx`?
                eprintln!("bsock_reader: Failed to read from real stream: {}", read_err);
                let _ = tx.shutdown().await;
                break;
            }

            // End of stream: without this exit the task would spin forever, re-reading EOF.
            // Shut the fake stream down so the consumer observes the end of input too.
            if eof {
                let _ = tx.shutdown().await;
                break;
            }
        }
    }
}
impl<W, R> BufferedESock<W, R>
where
    W: AsyncWrite + Unpin + Send + 'static,
    R: AsyncRead + Unpin + Send + 'static,
{
    /// Build a buffered ESock pipe that uses a buffer of `bufsz`.
    ///
    /// NOTE: `tx` and `rx` are accepted but not used yet; only `bufsz` is recorded in the returned value, and both sockets are dropped when this function returns.
    pub fn with_size(tx: W, rx: R, bufsz: usize) -> Self
    {
        let _backing = PhantomData;
        Self { bufsz, _backing }
    }

    /// Build a buffered ESock pipe that uses `DEFAULT_BUFFER_SIZE` as its buffer size.
    ///
    /// Shorthand for `with_size(tx, rx, DEFAULT_BUFFER_SIZE)`.
    #[inline]
    pub fn new(tx: W, rx: R) -> Self
    {
        let bufsz = DEFAULT_BUFFER_SIZE;
        Self::with_size(tx, rx, bufsz)
    }
}