WHY DOESN"T THIS FUKCINGA I STUPID THING WORK????? RESOLVE IS ***PROVEN*** TO ____NOT____ BE CALLED IN THE PROMISE CTOR SO WHY THE FUCK IS THAT PROMISE RESOLVING IMMEDIATELY!??!?!?!??!???!??!??!?!?!?!?

Fortune for async-timeout-js's current commit: Future blessing − 末吉
master
Avril 3 years ago
parent bc64ae9232
commit d151412576
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -1,15 +1,19 @@
function SemaphoreDisposed(message) { this.message = message || "Semaphore was disposed"; Object.freeze(this); }
const new_id = () => {
};
apsdajod XXX: eh, this isn't working...
function isPromise(obj) {
return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function';
}
/// An async semaphore with up to `cap` locks held at once.
///
/// To use as a mutex, make `cap` 1.
///
/// # Throws
/// If `cap` is 0 or lower.
function Semaphore(cap)
{
function Handle(next, id)
/*function Handle(next, id)
{
this.state = next;
this.id = id || new_id();
@ -19,35 +23,99 @@ function Semaphore(cap)
let handle = new Handle(this, id);
handle.state[handle.id] = (error) => error ? reject(error) : resolve();
return handle;
};
};*/
if(!cap || cap <= 0) throw `Semaphore capacity cannot be '${cap}', must be a number greater than 0`;
this.capacity = cap;
this.length = 0;
let self = this;
// Waiters for the seamphore
var next = {
waiters: {},
// Push a waiter into the queue and return a promise for when it completes
wait: () => {
waiters: [],
// wait for a slot to be available, then take it. If there are slots, the promise resolves immediately, if not, it is added to the release queue and will resolve when there is one available for it.
acquire: () => {
new Promise((resolve, reject) => { // next.waiters.push(error => error ? reject(error) : resolve()))
return Handle.create(resolve, reject).bind(next);
if(self.length < self.capacity) {
// Slot available, return now.
self.length += 1;
console.log(`imm acq: ${self.length} / ${self.capacity}: RESOLVING`);
resolve(); //TODO: Add IDs? pass a Handle to resolve?
} else {
// Slot unavailable, add to queue.
next.waiters.push(resolve);//error => error ? reject(error) : resolve());
console.log(`asn acq: ${self.length} / ${self.capacity} (current queue: ${next.waiters})`);
}
});
},
// Pop a waiter from the queue and complete its promise (with an optional truthy error)
add: (error) => { if(next.waiters.length>0) next.waiters.shift()(error); },
// Complete all semaphores, with an optional truthy error for rejection
flush: (reject) => {
while(next.waiters.length>0) next.waiters.shift()(reject);
},
// Dispose all semaphores in the waiter
dispose: (message) => next.flush(new SemaphoreDisposed(message))
// release a raw lock on the semaphore. if there are pending `acquire()` promises, the length is not decremented, but the next queued up acquire is resolved insetad.
release: () => {
if(next.waiters.length>0) { // There are pending waiters
console.log(`rel trans: queue: ${next.waiters} RESOLVING`);
next.waiters.shift()(); // No need to decrement, there is a pending waiter and that slot goes directly to them.
}
else {
self.length -= 1; // Decremt the length
console.log(`rel imm: ${self.length} / ${self.capacity}`);
}
}
//TODO: ,dispose: (error = new SemaphoreDisposed()) => { } // reject all `waiters`, and cause all new `acquire()`s to immediately reject.
};
this.capacity = cap;
this.length = 0;
this.acquire_raw = async () => {
await next.acquire();
};
this.acquire = async () => {
this.release_raw = () => {
next.release();
};
this.release = () => {
/// Acquire a semaphore lock (if one is not available, wait until one is) and then run `func()` with the lock held, afterwards, release the lock and return the result of `func()`.
/// If `func` is an async function, it is awaited and the lock is released after it has resolved (or rejected), and the result of this function is the result of that promise.
this.using = async (func) => {
//if(!func || (typeof obj !== "function" && !isPromise(func))) throw `Parameter '{func}' is not a function or Promise.`;
console.log("waiting lock...");
await next.acquire();//self.acquire_raw();
console.log("running thunk...");
try {
const rv = func();
if(isPromise(rv)) return await rv;
else return rv;
} finally {
console.log("releasing lock...");
next.release();//self.release_raw();
}
};
}
/// Create a Mutex
Semaphore.Mutex = () => new Semaphore(1);
const sem_example = async () => {
var semaphore = Semaphore.Mutex(); //XXX: WHY doesn't this work? acquire just reutrns immediately, even when `resolve` is proven to NOT be called in the promise body???
await Promise.all([
semaphore.using(async () => {
for(let i=0;i<10;i++) {
await new Promise(resolve => setTimeout(resolve, 100));
console.log(`first: ${i}`);
}
return 10;
}),
semaphore.using(async () => {
for(let i=0;i<15;i++) {
await new Promise(resolve => setTimeout(resolve, 75));
console.log(`second: ${i}`);
}
return 15;
}),
semaphore.using(() => {
for(let i=0;i<500;i++) console.log(`third ${i}`);
return 500;
}),
]);
};

Loading…
Cancel
Save