Added basic (tx, rx) fd-overlap mapping with `MappedFile::try_new_buffer()`.

Fortune for mapped-file's current commit: Small blessing − 小吉
master
Avril 2 years ago
parent 4f427e88e4
commit ec26e0e9ed
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -121,37 +121,53 @@ impl<T: AsRawFd> MappedFile<T> {
/// Returns a dual mapping `(tx, rx)`, into the same file.
///
/// This essentially creates s "sender" `tx`, and "receiver" `rx` mapping over the same data.
/// The sender is *write only*, and the receiver is *read only*.
///
/// # Sharing modes
/// `B` is used for the counter over the file handle `T`. Currently it can be
/// * `buffer::Shared` - A `Send`able mapping, use this for concurrent processing.
/// * `buffer::Private` - A `!Send` mapping, use this for when both returned maps are only used on the same thread that this function was called from.
///
/// # Note
/// `len` **must** be a multiple of the used page size (or hugepage size, if `flags` is set to use one) for this to work.
pub fn try_new_buffer<B: buffer::TwoBufferProvider<T>>(file: T, len: usize, rings: impl Into<Option<std::num::NonZeroUsize>>, allow_unsafe_writes: bool, flags: impl flags::MapFlags) -> Result<(MappedFile<B>, MappedFile<B>), TryNewError<T>>
pub fn try_new_buffer<B: buffer::TwoBufferProvider<T>>(file: T, len: usize, flags: impl flags::MapFlags) -> Result<(MappedFile<B>, MappedFile<B>), TryNewError<T>>
{
Self::try_new_buffer_raw::<B>(file, len, None, false, flags)
}
#[inline]
pub(crate) fn try_new_buffer_raw<B: buffer::TwoBufferProvider<T>>(file: T, len: usize, rings: impl Into<Option<std::num::NonZeroUsize>>, allow_unsafe_writes: bool, flags: impl flags::MapFlags) -> Result<(MappedFile<B>, MappedFile<B>), TryNewError<T>>
{
const NULL: *mut libc::c_void = ptr::null_mut();
use std::{
//rc::Rc,
cell::RefCell,
};
let mut defer_set = {
let mut defer_set: Vec<(*mut u8, usize)> = Vec::new();
defer!(ref defer_set => |set: Vec<(*mut u8, usize)>| {
for (ptr, len) in set {
unsafe {
libc::munmap(ptr as *mut _, len);
}
}
});
defer_set
};
macro_rules! try_map {
macro_rules! try_map_or {
($($tt:tt)*) => {
match unsafe {
mmap($($tt)*)
} {
MAP_FAILED => Err(io::Error::last_os_error()),
NULL => _panic_invalid_address(),
ptr => Ok(unsafe {
UniqueSlice {
mem: NonNull::new_unchecked(ptr as *mut u8),
end: match NonNull::new((ptr as *mut u8).add(len)) {
Some(n) => n,
_ => _panic_invalid_address(),
}
}
})
}.map(MappedSlice)
};
}
macro_rules! try_map {
($($tt:tt)*) => {
MappedSlice(match unsafe {
mmap($($tt)*)
} {
MAP_FAILED => return Err(TryNewError::wrap_last_error(file)),
NULL => _panic_invalid_address(),
ptr => unsafe {
defer_set.push((ptr as *mut u8, len));
UniqueSlice {
mem: NonNull::new_unchecked(ptr as *mut u8),
@ -161,34 +177,75 @@ impl<T: AsRawFd> MappedFile<T> {
}
}
}
})
};
}
macro_rules! unwrap {
($err:expr) => {
match $err {
Ok(v) => v,
Err(e) => return Err(TryNewError::wrap((e, file))),
}
};
}
let (prot_r, prot_w) = if allow_unsafe_writes {
let (prot_w, prot_r) = if allow_unsafe_writes {
let p = Perm::ReadWrite.get_prot();
(p, p)
} else {
(Perm::Writeonly.get_prot(), Perm::Readonly.get_prot())
};
match rings.into() {
// Move into dual buffer
let (tx, rx) = match rings.into() {
None => {
// No rings, just create two mappings at same addr.
let flags = flags.get_mmap_flags();
let tm = try_map!(NULL, len, prot_w, flags, file.as_raw_fd(), 0);
let wm = try_map!(tm.mem.as_ptr() as *mut _, len, prot_r, flags | libc::MAP_FIXED, file.as_raw_fd(), 0);
//TODO... How to create the first `A?rc` over `file`?
let mut root = try_map!(NULL, len * 2, libc::PROT_NONE, (flags & !libc::MAP_SHARED) | libc::MAP_PRIVATE | libc::MAP_ANONYMOUS, -1, 0);
let rawfd = file.as_raw_fd();
let rm = try_map!(root.0.as_mut_ptr().add(len) as *mut _, len, prot_r, flags | libc::MAP_FIXED, rawfd, 0); // Map reader at offset `len` from `root`.
let tm = try_map!(root.0.as_mut_ptr() as *mut _, len, prot_w, flags | libc::MAP_FIXED, rawfd, 0); // Map writer at `root`, unmapping the anonymous map used to reserve the pages.
let tf = B::from_value(file);
let rf = B::from_wrapper(tf.as_wrapper());
(MappedFile {
file: tf,
map: tm
}, MappedFile {
file: rf,
map: rm,
})
},
Some(pages) => {
// Create anon mapping
try_map!(NULL, len, libc::PROT_NONE, flags.get_mmap_flags() | libc::MAP_ANONYMOUS, file.as_raw_fd(), 0);
// Create anon mapping at full length
let full_len = unwrap!((len * 2)
.checked_mul(pages.get())
.ok_or_else(|| io::Error::new(io::ErrorKind::OutOfMemory,
format!("Could not map {} pages of size {len}. Value would overflow", pages.get()))));
let flags = flags.get_mmap_flags();
let mut root = try_map!(NULL, full_len, libc::PROT_NONE, libc::MAP_PRIVATE | libc::MAP_ANONYMOUS, -1, 0);
let pivots = {
let rawfd = file.as_raw_fd();
let pivots: io::Result<Vec<_>> = std::iter::successors(unsafe { Some(root.0.as_mut_ptr().add(full_len)) }, |&x| unsafe { Some(x.sub(len * 2)) }) // Map in reverse, from end of `root`, and overwrite the `root` mapping last.
.take(pages.get())
.map(|base| {
let tm = try_map_or!(base as *mut _, len, prot_w, flags | libc::MAP_FIXED, rawfd, 0)?;
let rm = try_map_or!(base.add(len) as *mut _, len, prot_r, flags | libc::MAP_FIXED,rawfd, 0 )?;
Ok((tm, rm))
})
.collect();
unwrap!(pivots)
};
todo!("We can't carry `pivots` over to return from this function; the data is needed for unmapping the ring...");
todo!("The mapping we'd be using is `root`. But we need to unmap `pivots` in reverse order when the returned `MappedFile` is dropped...")
}
}
// Must happen at the end, so that maps dont get defer free'd
defer_set.clear();
}
};
Ok((tx, rx))
}
/// Map the file `file` to `len` bytes with memory protection as provided by `perm`, and
/// mapping flags provided by `flags`.
///
@ -449,6 +506,15 @@ impl<T> TryNewError<T>
value,
}
}
#[inline]
fn wrap((error, value): (impl Into<io::Error>, T)) -> Self
{
Self {
error: Box::new(error.into()),
value
}
}
/// Consume into the contained value
#[inline]
pub fn into_inner(self) -> T

@ -14,6 +14,13 @@ pub trait TwoBufferProvider<T: ?Sized>
type ControlWrapper: Borrow<T>;
fn as_wrapper(&self) -> &Self::ControlWrapper;
fn from_wrapper_boxed(r: &Self::ControlWrapper) -> Box<Self>;
#[inline(always)]
fn from_wrapper(r: &Self::ControlWrapper) -> Self
where Self: Sized {
*Self::from_wrapper_boxed(r)
}
#[inline(always)]
fn inner(&self) -> &T
@ -22,15 +29,22 @@ pub trait TwoBufferProvider<T: ?Sized>
}
fn from_boxed(value: Box<T>) -> Box<Self>;
//TODO: How do we give enough info to caller to create this?
#[inline(always)]
fn from_value(value: T) -> Self
where T: Sized,
Self: Sized
{
*Self::from_boxed(Box::new(value))
}
}
/// For thread-sharable buffer holds
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Shared<T: ?Sized>(sync::Arc<T>);
/// For non thread-sharable buffer holds
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Private<T: ?Sized>(rc::Rc<T>);
impl<T: ?Sized> TwoBufferProvider<T> for Shared<T> {
@ -43,7 +57,24 @@ impl<T: ?Sized> TwoBufferProvider<T> for Shared<T> {
#[inline]
fn from_boxed(value: Box<T>) ->Box<Self> {
Box::new(Self(value.into()))
Box::new(Self(From::from(value)))
}
#[inline(always)]
fn from_value(value: T) -> Self
where T: Sized,
Self: Sized {
Self(sync::Arc::new(value))
}
#[inline]
fn from_wrapper_boxed(r: &Self::ControlWrapper) -> Box<Self> {
Box::new(Self(r.clone()))
}
#[inline(always)]
fn from_wrapper(r: &Self::ControlWrapper) -> Self
where Self: Sized {
Self(r.clone())
}
}
@ -65,7 +96,24 @@ impl<T: ?Sized> TwoBufferProvider<T> for Private<T> {
#[inline]
fn from_boxed(value: Box<T>) ->Box<Self> {
Box::new(Self(value.into()))
Box::new(Self(From::from(value)))
}
#[inline(always)]
fn from_value(value: T) -> Self
where T: Sized,
Self: Sized {
Self(rc::Rc::new(value))
}
#[inline]
fn from_wrapper_boxed(r: &Self::ControlWrapper) -> Box<Self> {
Box::new(Self(r.clone()))
}
#[inline(always)]
fn from_wrapper(r: &Self::ControlWrapper) -> Self
where Self: Sized {
Self(r.clone())
}
}
@ -76,3 +124,50 @@ impl<T: ?Sized + AsRawFd> AsRawFd for Private<T>
self.as_wrapper().as_raw_fd()
}
}
impl<T: ?Sized> Shared<T>
{
/// Check if the connected mapping has not been dropped.
#[inline]
pub fn is_connected(&self) -> bool
{
sync::Arc::strong_count(&self.0) > 1
}
/// Consume into an `Arc` instance over the file handle.
#[inline]
pub fn into_arc(self) -> sync::Arc<T>
{
self.0
}
/// Get a reference of the file handle.
#[inline]
pub fn inner(&self) -> &T
{
&self.0
}
}
impl<T: ?Sized> Private<T>
{
/// Check if the connected mapping has not been dropped.
#[inline]
pub fn is_connected(&self) -> bool
{
rc::Rc::strong_count(&self.0) > 1
}
/// Consume into an `Rc` instance over the file handle.
#[inline]
pub fn into_rc(self) -> rc::Rc<T>
{
self.0
}
/// Get a reference of the file handle.
#[inline]
pub fn inner(&self) -> &T
{
&self.0
}
}

Loading…
Cancel
Save