/// Constructor for a value describing a disposed semaphore.
///
/// Uses `this`, so call it with `new`. The instance gets a single `message`
/// property and is then frozen, so it cannot be modified afterwards.
///
/// `message` - text to store; any falsy value (including an empty string)
///             falls back to "Semaphore was disposed".
function SemaphoreDisposed(message) {
  let text = "Semaphore was disposed";
  if (message) {
    text = message;
  }
  this.message = text;
  Object.freeze(this);
}
/// Duck-typed thenable check.
///
/// Returns `true` only when `obj` is truthy, is an object or a function, and
/// has a `then` property that is itself a function. Everything else
/// (primitives, `null`, `undefined`, objects without a callable `then`)
/// yields `false`.
function isPromise(obj) {
  if (!obj) {
    return false;
  }
  const kind = typeof obj;
  if (kind !== 'object' && kind !== 'function') {
    return false;
  }
  return typeof obj.then === 'function';
}
/// An async semaphore with up to `cap` locks held at once.
///
/// To use as a mutex, make `cap` 1.
///
/// Instance members:
/// - `capacity`: the `cap` value as passed in (read on every acquire).
/// - `length`: number of locks currently held.
/// - `acquire_raw()`: promise that resolves once a lock has been taken.
/// - `release_raw()`: give a lock back (or hand it to the oldest waiter).
/// - `using(func)`: run `func` with a lock held, always releasing afterwards.
/// - `bind_using(func)`: a zero-argument function that calls `using(func)`.
///
/// # Throws
/// A string message if `cap` does not coerce to a number greater than 0.
/// NOTE(review): the thrown value is a plain string, kept so existing callers
/// that catch and compare it keep working; consider migrating to `RangeError`.
function Semaphore(cap) {
  // `Number(cap) > 0` is false for 0, negatives, NaN and anything that does
  // not coerce to a positive number. The earlier `!cap || cap <= 0` check let
  // values such as `{}` or "abc" through; `length < capacity` was then never
  // true and every acquire() waited forever.
  if (!(Number(cap) > 0)) {
    throw ` Semaphore capacity cannot be ' ${cap} ', must be a number greater than 0 `;
  }

  this.capacity = cap;
  this.length = 0;
  const self = this;

  // Internal state: the queue of pending acquires plus the raw operations.
  const next = {
    // Resolve callbacks of acquires that are waiting for a slot, oldest first.
    waiters: [],

    // Wait for a slot to be available, then take it. If there is a free slot
    // the promise resolves immediately; otherwise its resolver is queued and
    // called by a later release().
    acquire: () =>
      new Promise((resolve) => {
        if (self.length < self.capacity) {
          // Slot available, take it now.
          self.length += 1;
          resolve(); //TODO: Add IDs? pass a Handle to resolve?
        } else {
          // Slot unavailable, add to queue.
          next.waiters.push(resolve);
        }
      }),

    // Release a raw lock. If an acquire is pending, the slot goes directly to
    // the oldest waiter and `length` is left unchanged; otherwise `length` is
    // decremented.
    release: () => {
      const wake = next.waiters.shift();
      if (wake) {
        wake();
      } else if (self.length > 0) {
        // Never go below zero: a release with no lock held would otherwise
        // leave a negative count and later admit more than `capacity` holders.
        self.length -= 1;
      }
    },
    //TODO: dispose: (error = new SemaphoreDisposed()) => { } // reject all `waiters`, and cause all new `acquire()`s to immediately reject.
  };

  /// Take a lock, waiting for one if necessary. Pair with `release_raw()`.
  this.acquire_raw = () => next.acquire();

  /// Give back a lock taken with `acquire_raw()`.
  this.release_raw = () => {
    next.release();
  };

  /// Acquire a lock (waiting until one is available), run `func()` with the
  /// lock held, then release the lock and return the result of `func()`.
  /// If `func` returns a promise/thenable it is awaited first, so the lock is
  /// released only after it has resolved or rejected, and that outcome becomes
  /// the outcome of this call. A synchronous throw from `func` also releases
  /// the lock before the returned promise rejects.
  this.using = async (func) => {
    await self.acquire_raw();
    try {
      // `await` passes plain values through and unwraps thenables, so no
      // separate promise check is needed.
      return await func();
    } finally {
      self.release_raw();
    }
  };

  /// Returns a lambda that, when executed, runs `using(func)` and returns the
  /// awaitable promise.
  this.bind_using = (func) => () => self.using(func);
}

/// Create a mutex (single cap semaphore)
Semaphore.mutex = () => new Semaphore(1);
/// Demo: runs three logging tasks through one shared semaphore.
///
/// `locks` - capacity of the semaphore; a falsy value falls back to 1.
///
/// Two tasks pause `delay` milliseconds before each log line ("first": 10
/// lines at 100 ms, "second": 15 lines at 75 ms); "third" logs 500 lines
/// synchronously. Resolves to an array with each task's line count, in the
/// order first, second, third.
const sem_example = async (locks) => {
  const semaphore = new Semaphore(locks ? locks : 1);

  // Promise that resolves after `ms` milliseconds.
  const sleep = (ms) => new Promise((done) => setTimeout(done, ms));

  // Builds a semaphore-guarded task that logs `count` lines tagged `label`.
  const makeTask = (label, count, delay) => {
    const report = (index) => console.log(` ${label} : ${index} `);
    const timed = Boolean(delay) && !(delay < 0);

    if (timed) {
      return semaphore.bind_using(async () => {
        let index = 0;
        while (index < count) {
          await sleep(delay);
          report(index);
          index += 1;
        }
        return count;
      });
    }

    // No usable delay: log everything in one synchronous burst.
    return semaphore.bind_using(() => {
      let index = 0;
      while (index < count) {
        report(index);
        index += 1;
      }
      return count;
    });
  };

  const plan = [
    ["first", 10, 100],
    ["second", 15, 75],
    ["third", 500],
  ];
  // Each task is built and started immediately, in plan order.
  const running = plan.map(([label, count, delay]) => makeTask(label, count, delay)());
  const results = await Promise.all(running);
  return results;
};