You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
132 lines
3.1 KiB
132 lines
3.1 KiB
use std::sync::{
|
|
mpsc,
|
|
Mutex,
|
|
Arc,
|
|
|
|
PoisonError,
|
|
};
|
|
use std::fmt;
|
|
use std::error;
|
|
use std::cell::UnsafeCell;
|
|
use std::{slice::SliceIndex, ops::RangeBounds};
|
|
|
|
#[derive(Debug)]
|
|
struct StateInner
|
|
{
|
|
map: UnsafeCell<super::map::MemoryMapMut>,
|
|
|
|
completion: Mutex<mpsc::Sender<(usize, usize)>>,
|
|
}
|
|
|
|
// SAFETY: The whole point of this is internal mutablility across thread boundaries.
|
|
unsafe impl Sync for StateInner{}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct State(Arc<StateInner>);
|
|
|
|
impl State
|
|
{
|
|
/// Create a new state from this map
|
|
#[inline] pub fn new(map: super::map::MemoryMapMut, completion: mpsc::Sender<(usize, usize)>) -> Self
|
|
{
|
|
Self(Arc::new(StateInner{map: UnsafeCell::new(map), completion: Mutex::new(completion)}))
|
|
}
|
|
|
|
/// Send a completion signal for the file of this index and this size.
|
|
pub fn send_complete(&self, idx: usize, size: usize) -> Result<(), mpsc::SendError<(usize, usize)>>
|
|
{
|
|
self.0.completion.lock().unwrap().send((idx, size))
|
|
}
|
|
|
|
/// Try to consume this instance into its map. This will only succeed if there are no more references to the state than this one.
|
|
#[inline] pub fn try_into_inner(self) -> Result<super::map::MemoryMapMut, Self>
|
|
{
|
|
match Arc::try_unwrap(self.0) {
|
|
Ok(v) => Ok(v.map.into_inner()),
|
|
Err(e) => Err(Self(e)),
|
|
}
|
|
}
|
|
|
|
/// Slice the map directly.
|
|
///
|
|
/// # Safety
|
|
/// The caller must make sure *no* slices of this map overlap with eachother.
|
|
// SAFETY: The map structure itself is never mutated, only its backing memory is accessed. This is fine, I think. If not, we can switch to using raw pointers and volatile writes. The backing memory itself is flushed to file when the map is dropped.
|
|
pub unsafe fn slice<R: RangeBounds<usize> + SliceIndex<[u8], Output = [u8]>>(&self, range: R) -> &mut [u8]
|
|
{
|
|
let slice = (*(self.0.map.get())).as_slice_mut();
|
|
&mut slice[range]
|
|
}
|
|
}
|
|
|
|
/// A multi-consumer message receiver.
///
/// Holds one `mpsc::Receiver` behind `Arc<Mutex<..>>`, so the same receiver can be
/// reached from several owners, one lock holder at a time.
#[derive(Debug)]
pub struct PendingReceiver<T> {
    /// The underlying receiver, shared and mutex-guarded.
    recv: Arc<Mutex<mpsc::Receiver<T>>>,
}
|
|
|
|
impl<T> Clone for PendingReceiver<T>
|
|
{
|
|
fn clone(&self) -> Self
|
|
{
|
|
Self{
|
|
recv: Arc::clone(&self.recv),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> PendingReceiver<T>
|
|
{
|
|
/// Try to receive a message.
|
|
pub fn recv(&self) -> Result<Option<T>, PendingReceiverError>
|
|
{
|
|
Ok(self.recv.lock()?.recv().ok())
|
|
}
|
|
}
|
|
|
|
/// Create an `mpmc` channel.
|
|
pub fn channel<T>(cap: usize) -> (mpsc::SyncSender<T>, PendingReceiver<T>)
|
|
{
|
|
let (tx, rx) = mpsc::sync_channel(cap);
|
|
|
|
(tx, PendingReceiver{
|
|
recv: Arc::new(Mutex::new(rx))
|
|
})
|
|
}
|
|
|
|
/// Errors reported by `PendingReceiver`.
///
/// The type carries no payload, so it is cheap to copy and can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PendingReceiverError {
    /// The mutex guarding the shared receiver was poisoned.
    Poisoned,
    /// The channel was closed.
    Closed,
}
|
|
|
|
impl error::Error for PendingReceiverError{}
|
|
impl fmt::Display for PendingReceiverError
|
|
{
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
|
|
{
|
|
match self {
|
|
Self::Poisoned => write!(f, "poisoned"),
|
|
Self::Closed => write!(f, "receiver closed"),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> From<PoisonError<T>> for PendingReceiverError
|
|
{
|
|
#[inline] fn from(_from: PoisonError<T>) -> Self
|
|
{
|
|
Self::Poisoned
|
|
}
|
|
}
|
|
|
|
impl From<mpsc::RecvError> for PendingReceiverError
|
|
{
|
|
fn from(_from: mpsc::RecvError) -> Self
|
|
{
|
|
Self::Closed
|
|
}
|
|
}
|