diff --git a/semaphore.js b/semaphore.js index 2094d23..ff640d3 100644 --- a/semaphore.js +++ b/semaphore.js @@ -37,29 +37,22 @@ function Semaphore(cap) waiters: [], // wait for a slot to be available, then take it. If there are slots, the promise resolves immediately, if not, it is added to the release queue and will resolve when there is one available for it. acquire: () => { - new Promise((resolve, reject) => { // next.waiters.push(error => error ? reject(error) : resolve())) + return new Promise((resolve, reject) => { // next.waiters.push(error => error ? reject(error) : resolve())) if(self.length < self.capacity) { // Slot available, return now. self.length += 1; - console.log(`imm acq: ${self.length} / ${self.capacity}: RESOLVING`); resolve(); //TODO: Add IDs? pass a Handle to resolve? } else { // Slot unavailable, add to queue. next.waiters.push(resolve);//error => error ? reject(error) : resolve()); - console.log(`asn acq: ${self.length} / ${self.capacity} (current queue: ${next.waiters})`); } }); }, // release a raw lock on the semaphore. if there are pending `acquire()` promises, the length is not decremented, but the next queued up acquire is resolved insetad. release: () => { - if(next.waiters.length>0) { // There are pending waiters - console.log(`rel trans: queue: ${next.waiters} RESOLVING`); - next.waiters.shift()(); // No need to decrement, there is a pending waiter and that slot goes directly to them. - } - else { - self.length -= 1; // Decremt the length - console.log(`rel imm: ${self.length} / ${self.capacity}`); - } + if(next.waiters.length>0) // There are pending waiters + next.waiters.shift()(); // No need to decrement, there is a pending waiter and that slot goes directly to them. (asyn) + else self.length -= 1; // Deremenet the length (imm) } //TODO: ,dispose: (error = new SemaphoreDisposed()) => { } // reject all `waiters`, and cause all new `acquire()`s to immediately reject. }; @@ -77,16 +70,13 @@ function Semaphore(cap) this.using = async (func) => { //if(!func || (typeof obj !== "function" && !isPromise(func))) throw `Parameter '{func}' is not a function or Promise.`; - console.log("waiting lock..."); - await next.acquire();//self.acquire_raw(); - console.log("running thunk..."); + await self.acquire_raw(); try { const rv = func(); if(isPromise(rv)) return await rv; else return rv; } finally { - console.log("releasing lock..."); - next.release();//self.release_raw(); + self.release_raw(); } }; } @@ -96,9 +86,9 @@ Semaphore.Mutex = () => new Semaphore(1); const sem_example = async () => { - var semaphore = Semaphore.Mutex(); //XXX: WHY doesn't this work? acquire just reutrns immediately, even when `resolve` is proven to NOT be called in the promise body??? - - await Promise.all([ + let semaphore = Semaphore.Mutex(); + + return await Promise.all([ semaphore.using(async () => { for(let i=0;i<10;i++) { await new Promise(resolve => setTimeout(resolve, 100));