You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

132 lines
3.1 KiB

use std::sync::{
mpsc,
Mutex,
Arc,
PoisonError,
};
use std::fmt;
use std::error;
use std::cell::UnsafeCell;
use std::{slice::SliceIndex, ops::RangeBounds};
#[derive(Debug)]
struct StateInner
{
map: UnsafeCell<super::map::MemoryMapMut>,
completion: Mutex<mpsc::Sender<(usize, usize)>>,
}
// SAFETY: The whole point of this is internal mutablility across thread boundaries.
unsafe impl Sync for StateInner{}
#[derive(Debug, Clone)]
pub struct State(Arc<StateInner>);
impl State
{
/// Create a new state from this map
#[inline] pub fn new(map: super::map::MemoryMapMut, completion: mpsc::Sender<(usize, usize)>) -> Self
{
Self(Arc::new(StateInner{map: UnsafeCell::new(map), completion: Mutex::new(completion)}))
}
/// Send a completion signal for the file of this index and this size.
pub fn send_complete(&self, idx: usize, size: usize) -> Result<(), mpsc::SendError<(usize, usize)>>
{
self.0.completion.lock().unwrap().send((idx, size))
}
/// Try to consume this instance into its map. This will only succeed if there are no more references to the state than this one.
#[inline] pub fn try_into_inner(self) -> Result<super::map::MemoryMapMut, Self>
{
match Arc::try_unwrap(self.0) {
Ok(v) => Ok(v.map.into_inner()),
Err(e) => Err(Self(e)),
}
}
/// Slice the map directly.
///
/// # Safety
/// The caller must make sure *no* slices of this map overlap with eachother.
// SAFETY: The map structure itself is never mutated, only its backing memory is accessed. This is fine, I think. If not, we can switch to using raw pointers and volatile writes. The backing memory itself is flushed to file when the map is dropped.
pub unsafe fn slice<R: RangeBounds<usize> + SliceIndex<[u8], Output = [u8]>>(&self, range: R) -> &mut [u8]
{
let slice = (*(self.0.map.get())).as_slice_mut();
&mut slice[range]
}
}
/// A multi-consumer message receiver
#[derive(Debug)]
pub struct PendingReceiver<T>
{
recv: Arc<Mutex<mpsc::Receiver<T>>>,
}
impl<T> Clone for PendingReceiver<T>
{
fn clone(&self) -> Self
{
Self{
recv: Arc::clone(&self.recv),
}
}
}
impl<T> PendingReceiver<T>
{
/// Try to receive a message.
pub fn recv(&self) -> Result<Option<T>, PendingReceiverError>
{
Ok(self.recv.lock()?.recv().ok())
}
}
/// Create an `mpmc` channel.
pub fn channel<T>(cap: usize) -> (mpsc::SyncSender<T>, PendingReceiver<T>)
{
let (tx, rx) = mpsc::sync_channel(cap);
(tx, PendingReceiver{
recv: Arc::new(Mutex::new(rx))
})
}
#[derive(Debug)]
pub enum PendingReceiverError
{
Poisoned,
Closed,
}
impl error::Error for PendingReceiverError{}
impl fmt::Display for PendingReceiverError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self {
Self::Poisoned => write!(f, "poisoned"),
Self::Closed => write!(f, "receiver closed"),
}
}
}
impl<T> From<PoisonError<T>> for PendingReceiverError
{
#[inline] fn from(_from: PoisonError<T>) -> Self
{
Self::Poisoned
}
}
impl From<mpsc::RecvError> for PendingReceiverError
{
fn from(_from: mpsc::RecvError) -> Self
{
Self::Closed
}
}