From ec26e0e9ed82962facee135e4bb25ce2750a4ce0 Mon Sep 17 00:00:00 2001 From: Avril Date: Mon, 17 Oct 2022 05:23:53 +0100 Subject: [PATCH] Added basic (tx, rx) fd-overlap mapping with `MappedFile::try_new_buffer()`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for mapped-file's current commit: Small blessing − 小吉 --- src/lib.rs | 132 +++++++++++++++++++++++++++++++++------------ src/ring/buffer.rs | 105 ++++++++++++++++++++++++++++++++++-- 2 files changed, 199 insertions(+), 38 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4971a2a..48c6a7e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -121,37 +121,53 @@ impl MappedFile { /// Returns a dual mapping `(tx, rx)`, into the same file. /// + /// This essentially creates s "sender" `tx`, and "receiver" `rx` mapping over the same data. + /// The sender is *write only*, and the receiver is *read only*. + /// + /// # Sharing modes + /// `B` is used for the counter over the file handle `T`. Currently it can be + /// * `buffer::Shared` - A `Send`able mapping, use this for concurrent processing. + /// * `buffer::Private` - A `!Send` mapping, use this for when both returned maps are only used on the same thread that this function was called from. + /// /// # Note /// `len` **must** be a multiple of the used page size (or hugepage size, if `flags` is set to use one) for this to work. - pub fn try_new_buffer>(file: T, len: usize, rings: impl Into>, allow_unsafe_writes: bool, flags: impl flags::MapFlags) -> Result<(MappedFile, MappedFile), TryNewError> + pub fn try_new_buffer>(file: T, len: usize, flags: impl flags::MapFlags) -> Result<(MappedFile, MappedFile), TryNewError> + { + Self::try_new_buffer_raw::(file, len, None, false, flags) + } + #[inline] + pub(crate) fn try_new_buffer_raw>(file: T, len: usize, rings: impl Into>, allow_unsafe_writes: bool, flags: impl flags::MapFlags) -> Result<(MappedFile, MappedFile), TryNewError> { const NULL: *mut libc::c_void = ptr::null_mut(); - use std::{ - //rc::Rc, - cell::RefCell, - }; - let mut defer_set = { - let mut defer_set: Vec<(*mut u8, usize)> = Vec::new(); - defer!(ref defer_set => |set: Vec<(*mut u8, usize)>| { - for (ptr, len) in set { - unsafe { - libc::munmap(ptr as *mut _, len); - } - } - }); - defer_set - }; - - macro_rules! try_map { + macro_rules! try_map_or { ($($tt:tt)*) => { match unsafe { mmap($($tt)*) + } { + MAP_FAILED => Err(io::Error::last_os_error()), + NULL => _panic_invalid_address(), + ptr => Ok(unsafe { + + UniqueSlice { + mem: NonNull::new_unchecked(ptr as *mut u8), + end: match NonNull::new((ptr as *mut u8).add(len)) { + Some(n) => n, + _ => _panic_invalid_address(), + } + } + }) + }.map(MappedSlice) + }; + } + macro_rules! try_map { + ($($tt:tt)*) => { + MappedSlice(match unsafe { + mmap($($tt)*) } { MAP_FAILED => return Err(TryNewError::wrap_last_error(file)), NULL => _panic_invalid_address(), ptr => unsafe { - defer_set.push((ptr as *mut u8, len)); UniqueSlice { mem: NonNull::new_unchecked(ptr as *mut u8), @@ -161,34 +177,75 @@ impl MappedFile { } } } + }) + }; + } + + macro_rules! unwrap { + ($err:expr) => { + match $err { + Ok(v) => v, + Err(e) => return Err(TryNewError::wrap((e, file))), } }; } - let (prot_r, prot_w) = if allow_unsafe_writes { + + let (prot_w, prot_r) = if allow_unsafe_writes { let p = Perm::ReadWrite.get_prot(); (p, p) } else { (Perm::Writeonly.get_prot(), Perm::Readonly.get_prot()) }; - match rings.into() { + // Move into dual buffer + + let (tx, rx) = match rings.into() { None => { // No rings, just create two mappings at same addr. let flags = flags.get_mmap_flags(); - let tm = try_map!(NULL, len, prot_w, flags, file.as_raw_fd(), 0); - let wm = try_map!(tm.mem.as_ptr() as *mut _, len, prot_r, flags | libc::MAP_FIXED, file.as_raw_fd(), 0); - //TODO... How to create the first `A?rc` over `file`? + let mut root = try_map!(NULL, len * 2, libc::PROT_NONE, (flags & !libc::MAP_SHARED) | libc::MAP_PRIVATE | libc::MAP_ANONYMOUS, -1, 0); + let rawfd = file.as_raw_fd(); + + let rm = try_map!(root.0.as_mut_ptr().add(len) as *mut _, len, prot_r, flags | libc::MAP_FIXED, rawfd, 0); // Map reader at offset `len` from `root`. + let tm = try_map!(root.0.as_mut_ptr() as *mut _, len, prot_w, flags | libc::MAP_FIXED, rawfd, 0); // Map writer at `root`, unmapping the anonymous map used to reserve the pages. + + let tf = B::from_value(file); + let rf = B::from_wrapper(tf.as_wrapper()); + (MappedFile { + file: tf, + map: tm + }, MappedFile { + file: rf, + map: rm, + }) }, Some(pages) => { - // Create anon mapping - try_map!(NULL, len, libc::PROT_NONE, flags.get_mmap_flags() | libc::MAP_ANONYMOUS, file.as_raw_fd(), 0); + // Create anon mapping at full length + let full_len = unwrap!((len * 2) + .checked_mul(pages.get()) + .ok_or_else(|| io::Error::new(io::ErrorKind::OutOfMemory, + format!("Could not map {} pages of size {len}. Value would overflow", pages.get())))); + let flags = flags.get_mmap_flags(); + let mut root = try_map!(NULL, full_len, libc::PROT_NONE, libc::MAP_PRIVATE | libc::MAP_ANONYMOUS, -1, 0); + let pivots = { + let rawfd = file.as_raw_fd(); + let pivots: io::Result> = std::iter::successors(unsafe { Some(root.0.as_mut_ptr().add(full_len)) }, |&x| unsafe { Some(x.sub(len * 2)) }) // Map in reverse, from end of `root`, and overwrite the `root` mapping last. + .take(pages.get()) + .map(|base| { + let tm = try_map_or!(base as *mut _, len, prot_w, flags | libc::MAP_FIXED, rawfd, 0)?; + let rm = try_map_or!(base.add(len) as *mut _, len, prot_r, flags | libc::MAP_FIXED,rawfd, 0 )?; + + Ok((tm, rm)) + }) + .collect(); + unwrap!(pivots) + }; + + todo!("We can't carry `pivots` over to return from this function; the data is needed for unmapping the ring..."); + todo!("The mapping we'd be using is `root`. But we need to unmap `pivots` in reverse order when the returned `MappedFile` is dropped...") } - - } - - // Must happen at the end, so that maps dont get defer free'd - defer_set.clear(); - } - + }; + Ok((tx, rx)) + } /// Map the file `file` to `len` bytes with memory protection as provided by `perm`, and /// mapping flags provided by `flags`. /// @@ -449,6 +506,15 @@ impl TryNewError value, } } + + #[inline] + fn wrap((error, value): (impl Into, T)) -> Self + { + Self { + error: Box::new(error.into()), + value + } + } /// Consume into the contained value #[inline] pub fn into_inner(self) -> T diff --git a/src/ring/buffer.rs b/src/ring/buffer.rs index 15ff6b1..4cb53f5 100644 --- a/src/ring/buffer.rs +++ b/src/ring/buffer.rs @@ -14,6 +14,13 @@ pub trait TwoBufferProvider type ControlWrapper: Borrow; fn as_wrapper(&self) -> &Self::ControlWrapper; + fn from_wrapper_boxed(r: &Self::ControlWrapper) -> Box; + + #[inline(always)] + fn from_wrapper(r: &Self::ControlWrapper) -> Self + where Self: Sized { + *Self::from_wrapper_boxed(r) + } #[inline(always)] fn inner(&self) -> &T @@ -22,15 +29,22 @@ pub trait TwoBufferProvider } fn from_boxed(value: Box) -> Box; -//TODO: How do we give enough info to caller to create this? + + #[inline(always)] + fn from_value(value: T) -> Self + where T: Sized, + Self: Sized + { + *Self::from_boxed(Box::new(value)) + } } /// For thread-sharable buffer holds -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Shared(sync::Arc); /// For non thread-sharable buffer holds -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Private(rc::Rc); impl TwoBufferProvider for Shared { @@ -43,7 +57,24 @@ impl TwoBufferProvider for Shared { #[inline] fn from_boxed(value: Box) ->Box { - Box::new(Self(value.into())) + Box::new(Self(From::from(value))) + } + + #[inline(always)] + fn from_value(value: T) -> Self + where T: Sized, + Self: Sized { + Self(sync::Arc::new(value)) + } + + #[inline] + fn from_wrapper_boxed(r: &Self::ControlWrapper) -> Box { + Box::new(Self(r.clone())) + } + #[inline(always)] + fn from_wrapper(r: &Self::ControlWrapper) -> Self + where Self: Sized { + Self(r.clone()) } } @@ -65,7 +96,24 @@ impl TwoBufferProvider for Private { #[inline] fn from_boxed(value: Box) ->Box { - Box::new(Self(value.into())) + Box::new(Self(From::from(value))) + } + + #[inline(always)] + fn from_value(value: T) -> Self + where T: Sized, + Self: Sized { + Self(rc::Rc::new(value)) + } + + #[inline] + fn from_wrapper_boxed(r: &Self::ControlWrapper) -> Box { + Box::new(Self(r.clone())) + } + #[inline(always)] + fn from_wrapper(r: &Self::ControlWrapper) -> Self + where Self: Sized { + Self(r.clone()) } } @@ -76,3 +124,50 @@ impl AsRawFd for Private self.as_wrapper().as_raw_fd() } } + +impl Shared +{ + /// Check if the connected mapping has not been dropped. + #[inline] + pub fn is_connected(&self) -> bool + { + sync::Arc::strong_count(&self.0) > 1 + } + + /// Consume into an `Arc` instance over the file handle. + #[inline] + pub fn into_arc(self) -> sync::Arc + { + self.0 + } + + /// Get a reference of the file handle. + #[inline] + pub fn inner(&self) -> &T + { + &self.0 + } +} +impl Private +{ + /// Check if the connected mapping has not been dropped. + #[inline] + pub fn is_connected(&self) -> bool + { + rc::Rc::strong_count(&self.0) > 1 + } + + /// Consume into an `Rc` instance over the file handle. + #[inline] + pub fn into_rc(self) -> rc::Rc + { + self.0 + } + + /// Get a reference of the file handle. + #[inline] + pub fn inner(&self) -> &T + { + &self.0 + } +}