From f9f85f36e118ff66ef96179437802ec19860408b Mon Sep 17 00:00:00 2001 From: Avril Date: Fri, 11 Feb 2022 21:07:48 +0000 Subject: [PATCH 1/7] Failed attempt at allowing async functions to be passed as `thing`: This will need a complete redesign to support this and I don"t care enough right now to do it. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for async-timeout-js's current commit: Middle blessing − 中吉 --- asynctimeout.js | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/asynctimeout.js b/asynctimeout.js index bfd5df1..401e42b 100644 --- a/asynctimeout.js +++ b/asynctimeout.js @@ -15,10 +15,21 @@ /// See `examples` below. function AsyncTimeout(thing, interval) { + function isPromise(obj) { + return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function'; + } + + async function _call(t) + { + if(isPromise(t)) return await t(arguments); + else await (async () => {}); + return t(arguments); + } + this.timeout = () => new Promise((resolve, reject) => { setTimeout(() => { try { - resolve(thing()); + _call(thing).then(resolve); //XXX: This design doesn't work, we'll need to go back to the drawing board with this function for this to work... The exception won't propagate here, so... } catch(e) { reject(e); } }, interval); }); @@ -26,7 +37,7 @@ function AsyncTimeout(thing, interval) while(true) { yield await new Promise((resolve, reject) => { setTimeout(() => { - try { resolve(thing()); } catch(e) { reject(e); } + try { _call(thing).then(resolve); } catch(e) { reject(e); } }, interval); }); } From e01d0d853470c150e35427c2b857117db18923f5 Mon Sep 17 00:00:00 2001 From: Avril Date: Fri, 11 Feb 2022 21:21:16 +0000 Subject: [PATCH 2/7] Update README MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for async-timeout-js's current commit: Blessing − 吉 --- README.md | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 759fe18..31eb3d7 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,27 @@ -# AsyncTimeout - A promisified wrapper for `setTimeout()` and `setInterval()` +# AsyncTimeout - `setTimeout()` and `setInterval()` for async code. +The class `AsyncTimeout(function, timeout)`. +An async wrapper around `setTimeout()`, and an async iterator wrapper around `setInterval()`, which resolves to/yields the return value of the supplied function for both `.timeout()` and `.interval()` respectively. + +## Methods + +There are two methods on the class `AsyncTimeout(func, time)`: +* `timeout()`: Creates an `await`able promise that will resolve to the value of `func()` when the timeout `time` is reached. +* `interval()`: Creates a `for await`able async iterator that will resolve to the value of `func()` each `time` it is called. To cancel the interval, simply break from the loop or stop calling `.next()` on the returned async iterator. + +## Parameters +* `thing` - The function to call after the timeout/interval. The result of the promise (or the yield for `interval()`) is the result of this function call. If the funtion throws, then the rejection of the promise will be that error thrown. +* `interval` - The time to wait for the timeout or interval + +## Example usages +See `example()` in the file for detail. + +### `setTimeout()` promisified +`await (new AsyncTimeout(() => "value", 100)).timeout() /* === "value" */` is `setTimeout(()=> "value", 100)` promisified. + +### `setInterval()` async iterator-ifyed. +`for await (const value of new AsyncTimeout(() => "value", 100)).interval()) { /* value === "value" */ }` is `setInterval(()=> "value", 100)` with each interval promisified. +`value` will be the result of each interval call, so the function passed can capture, interact with, and return whatever it wants during the interval and each `value` will be the result of it being called again at each interval (as is expected.) + +This is an infinite iterator. To cancel the interval, simply break out of the `for await` loop or otherwise stop using the iterator returned from `.interval()`. From e58db0b0b99d038094c8087c2457c30dcf2c3b6c Mon Sep 17 00:00:00 2001 From: Avril Date: Sat, 12 Feb 2022 15:53:32 +0000 Subject: [PATCH 3/7] Fix syntax error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for async-timeout-js's current commit: Middle blessing − 中吉 --- asynctimeout.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asynctimeout.js b/asynctimeout.js index 401e42b..ced4eea 100644 --- a/asynctimeout.js +++ b/asynctimeout.js @@ -33,7 +33,7 @@ function AsyncTimeout(thing, interval) } catch(e) { reject(e); } }, interval); }); - this.interval = async function*() => { + this.interval = async function*() { while(true) { yield await new Promise((resolve, reject) => { setTimeout(() => { From 4f9c5417157e6626c17310699ae5dc13248bfbf2 Mon Sep 17 00:00:00 2001 From: Avril Date: Sat, 12 Feb 2022 15:56:13 +0000 Subject: [PATCH 4/7] Changed `exmaple()` to `console.log()` instead of `alert()`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for async-timeout-js's current commit: Blessing − 吉 --- asynctimeout.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asynctimeout.js b/asynctimeout.js index ced4eea..dab865c 100644 --- a/asynctimeout.js +++ b/asynctimeout.js @@ -48,10 +48,10 @@ const example = async (_timeout) => { _timeout = _timeout || 100; // Wait 100 then alert "hi" - alert(await (new AsyncTimeout(() => "hi", _timeout)).timeout()); + console.log(await (new AsyncTimeout(() => "hi", _timeout)).timeout()); // Continuously wait 100 then alert "hi" - for await (const value of new AsyncTimeout(() => "hi forever", _timeout).interval()) { alert(value); } + for await (const value of new AsyncTimeout(() => "hi forever", _timeout).interval()) { console.log(value); } }; From caad3ce5b657d71ffc28ae37d255f77c49970feb Mon Sep 17 00:00:00 2001 From: Avril Date: Sat, 12 Feb 2022 21:08:06 +0000 Subject: [PATCH 5/7] Merged `Semaphore` from master MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for async-timeout-js's current commit: Small curse − 小凶 --- semaphore.js | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 semaphore.js diff --git a/semaphore.js b/semaphore.js new file mode 100644 index 0000000..ff640d3 --- /dev/null +++ b/semaphore.js @@ -0,0 +1,111 @@ + +function SemaphoreDisposed(message) { this.message = message || "Semaphore was disposed"; Object.freeze(this); } + +function isPromise(obj) { + return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function'; +} + +/// An async semaphore with up to `cap` locks held at once. +/// +/// To use as a mutex, make `cap` 1. +/// +/// # Throws +/// If `cap` is 0 or lower. +function Semaphore(cap) +{ + /*function Handle(next, id) + { + this.state = next; + this.id = id || new_id(); + } + + Handle.create = function(resolve, reject, id) { + let handle = new Handle(this, id); + handle.state[handle.id] = (error) => error ? reject(error) : resolve(); + return handle; + };*/ + + if(!cap || cap <= 0) throw `Semaphore capacity cannot be '${cap}', must be a number greater than 0`; + + this.capacity = cap; + this.length = 0; + + let self = this; + + // Waiters for the seamphore + var next = { + waiters: [], + // wait for a slot to be available, then take it. If there are slots, the promise resolves immediately, if not, it is added to the release queue and will resolve when there is one available for it. + acquire: () => { + return new Promise((resolve, reject) => { // next.waiters.push(error => error ? reject(error) : resolve())) + if(self.length < self.capacity) { + // Slot available, return now. + self.length += 1; + resolve(); //TODO: Add IDs? pass a Handle to resolve? + } else { + // Slot unavailable, add to queue. + next.waiters.push(resolve);//error => error ? reject(error) : resolve()); + } + }); + }, + // release a raw lock on the semaphore. if there are pending `acquire()` promises, the length is not decremented, but the next queued up acquire is resolved insetad. + release: () => { + if(next.waiters.length>0) // There are pending waiters + next.waiters.shift()(); // No need to decrement, there is a pending waiter and that slot goes directly to them. (asyn) + else self.length -= 1; // Deremenet the length (imm) + } + //TODO: ,dispose: (error = new SemaphoreDisposed()) => { } // reject all `waiters`, and cause all new `acquire()`s to immediately reject. + }; + + this.acquire_raw = async () => { + await next.acquire(); + }; + + this.release_raw = () => { + next.release(); + }; + + /// Acquire a semaphore lock (if one is not available, wait until one is) and then run `func()` with the lock held, afterwards, release the lock and return the result of `func()`. + /// If `func` is an async function, it is awaited and the lock is released after it has resolved (or rejected), and the result of this function is the result of that promise. + this.using = async (func) => { + //if(!func || (typeof obj !== "function" && !isPromise(func))) throw `Parameter '{func}' is not a function or Promise.`; + + await self.acquire_raw(); + try { + const rv = func(); + if(isPromise(rv)) return await rv; + else return rv; + } finally { + self.release_raw(); + } + }; +} + +/// Create a Mutex +Semaphore.Mutex = () => new Semaphore(1); + + +const sem_example = async () => { + let semaphore = Semaphore.Mutex(); + + return await Promise.all([ + semaphore.using(async () => { + for(let i=0;i<10;i++) { + await new Promise(resolve => setTimeout(resolve, 100)); + console.log(`first: ${i}`); + } + return 10; + }), + semaphore.using(async () => { + for(let i=0;i<15;i++) { + await new Promise(resolve => setTimeout(resolve, 75)); + console.log(`second: ${i}`); + } + return 15; + }), + semaphore.using(() => { + for(let i=0;i<500;i++) console.log(`third ${i}`); + return 500; + }), + ]); +}; From 0d9f2adb045408c658593ca3e1ff2fb50eba36b0 Mon Sep 17 00:00:00 2001 From: Avril Date: Sun, 13 Feb 2022 17:32:12 +0000 Subject: [PATCH 6/7] Semaphore: Added `try_acquire()`, `try_using()` (see docs). Added `bind_using(func)` to returned a bound function of `func` to `this.using`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fortune for async-timeout-js's current commit: Middle blessing − 中吉 --- semaphore.js | 124 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 97 insertions(+), 27 deletions(-) diff --git a/semaphore.js b/semaphore.js index ff640d3..4be8bc8 100644 --- a/semaphore.js +++ b/semaphore.js @@ -1,9 +1,8 @@ function SemaphoreDisposed(message) { this.message = message || "Semaphore was disposed"; Object.freeze(this); } -function isPromise(obj) { - return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function'; -} +// despite any similarity to the well-known and famous `is-promise` npm library, this is all my own code typed up completely so fuck your licenses. +const is_promise = obj => !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function'; /// An async semaphore with up to `cap` locks held at once. /// @@ -61,6 +60,16 @@ function Semaphore(cap) await next.acquire(); }; + /// Attempt to acquire the semaphore lock if there is one available. + /// + /// # Returns + /// `true` if the lock was acquired, `false` if the lock must be waited on to be acquired. + this.try_acquire_raw = () => { + if(self.length < self.capacity) { + self.length+=1; return true; + } return false; + }; + this.release_raw = () => { next.release(); }; @@ -68,44 +77,105 @@ function Semaphore(cap) /// Acquire a semaphore lock (if one is not available, wait until one is) and then run `func()` with the lock held, afterwards, release the lock and return the result of `func()`. /// If `func` is an async function, it is awaited and the lock is released after it has resolved (or rejected), and the result of this function is the result of that promise. this.using = async (func) => { - //if(!func || (typeof obj !== "function" && !isPromise(func))) throw `Parameter '{func}' is not a function or Promise.`; + //if(!func || (typeof obj !== "function" && !is_promise(func))) throw `Parameter '{func}' is not a function or Promise.`; await self.acquire_raw(); try { const rv = func(); - if(isPromise(rv)) return await rv; + if(is_promise(rv)) return await rv; else return rv; } finally { self.release_raw(); } }; + + /// Attempt to acquire a semaphore lock and execute `func()` with that lock held. If there is no available lock, immediately return and do not attempt to run `func()`. + /// + /// # Async `func()` + /// If `func` is an async function, the return value of this function will be an awaitable promise that will release the lock upon awaiting (resolved or rejected), and yield the value of the awaited promise. + /// + /// # Returns + /// You can use the second parameter to the function, the object `opt`, to choose how success/failure is represented in the returned value from the function call/returned promise (in the case of `func()` returning a promise (see above)) + /// + /// ## On successfully acquireing the lock: + /// * If `opt.wrap === true`: An object in the form of `{success: true, value: }`. If `opt.keep_func === "always"`, the object will include the field `"func": func` as well. (See below) + /// * Otherwise, just the returned value itself. (*default*) + /// + /// ## On failing to acquire the lock: + /// * If `opt.wrap === true`: An object in the form of `{success: false, value: opt.or, func: }` + /// * Otherwise, just `opt.or`. (*default*) + /// + /// ### Notes + /// * If `opt` has no field called `or`, `undefined` is used instead. + /// * If `opt` has a field called `keep_func` and that field is `=== true` or `=== "always"`, *and* `opt.wrap === true`: On a failure to acquire the lock, the wrapped object will be in the form `{ value: (opt.or || undefined), success: false, "func": func }`. + this.try_using = (func, opt) => { + opt = opt || { wrap: false, or: undefined, keep_func: false }; + + const wrap_success = (value, success) => { + if(opt.wrap === true) return { "value": value, + "success": success, + "func": (((!success && opt.keep_func === true) + || (opt.keep_func === "always")) + ? func + : undefined) }; + else return value; + }; + if(self.try_acquire_raw()) { + const try_eval = () => { + try { + return func(); + } catch(e) { self.release_raw(); throw(e); } + }; + const rv = try_eval(); // If an exception is thrown, the lock is released and then the exception is propagated from here. + if(is_promise(rv)) return (async () => { + // `rv` is a promise, wrap it's awaiting inside a new promise that release the lock regardless of if awaiting `rv` throws. + try { + return wrap_success(await rv, true); + } finally { self.release_raw(); } + })(); + else { + // `rv` is not a promise, we can release the lock and return the value + self.release_raw(); + return wrap_success(rv, true); + } + } else return wrap_success(opt.or, false); + }; + /// Returns a lambda that, when executed, runs `using(func)` and returns the awaitable promise. + this.bind_using = (func) => { return () => self.using(func); }; + + } -/// Create a Mutex -Semaphore.Mutex = () => new Semaphore(1); +/// Create a mutex (single cap semaphore) +Semaphore.mutex = () => new Semaphore(1); -const sem_example = async () => { - let semaphore = Semaphore.Mutex(); +const sem_example = async (locks) => { + let semaphore = new Semaphore(locks || 1); //Semaphore.Mutex(); - return await Promise.all([ - semaphore.using(async () => { - for(let i=0;i<10;i++) { - await new Promise(resolve => setTimeout(resolve, 100)); - console.log(`first: ${i}`); - } - return 10; - }), - semaphore.using(async () => { - for(let i=0;i<15;i++) { - await new Promise(resolve => setTimeout(resolve, 75)); - console.log(`second: ${i}`); + const gen_runner = (name, num, time) => { + if(!time || time < 0) return semaphore.bind_using(() => { + for(let i=0;i { + for(let i=0;i setTimeout(resolve, time)); + console.log(`${name}: ${i}`); } - return 15; - }), - semaphore.using(() => { - for(let i=0;i<500;i++) console.log(`third ${i}`); - return 500; - }), + return num; + }); + }; + + return await Promise.all([ + gen_runner("first", 10, 100)(), + gen_runner("second", 15, 75)(), + gen_runner("third", 500)(), + semaphore.try_using(() => { + for(let i=0;i<1000;i++) console.log(`!FOURTH: ${i}`); + return 1000; + }, {wrap: true, or: false, keep_func: true}), ]); }; + +//sem_example(); From d6f9ff78177752396b11ceb31437a23e8af855e1 Mon Sep 17 00:00:00 2001 From: Avril Date: Mon, 14 Feb 2022 18:36:46 +0000 Subject: [PATCH 7/7] AsyncTimeout: Now handles async functions for `thing` correctly. Improved `timeout_example(_timeout, cancel)` (+ `Cancellation` class for async cancellations). Added `try_example(time)` which runs the default example and cancels after `time`, returning the value or error in an object containing either `{ value: true }` or `{ error: }` when awaited. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added AsyncTimeout.await_timeout(time): Returns a promise that resolves when `time` has passed. No value is returned from this promise. Added AsyncTimeout.await_interval(time): Returns an async iterator that yields after `time` has passed. No value is yielded from this async iterator. (These helper functions are essentially the same as `new AsyncTimeout(()=>{}, time).[timeout/interval]()`.) Fortune for async-timeout-js's current commit: Middle blessing − 中吉 --- asynctimeout.js | 133 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 114 insertions(+), 19 deletions(-) diff --git a/asynctimeout.js b/asynctimeout.js index dab865c..1024d29 100644 --- a/asynctimeout.js +++ b/asynctimeout.js @@ -1,3 +1,13 @@ + +const is_promise = obj => !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function'; + +const fire_and_forget = (a, on_error) => { + (async () => { + try { + await a(); + } catch(e) { if(on_error) on_error(e); else throw(e); } + })(); +}; /// An async wrapper around `setTimeout()` and `setInterval()`. /// /// # `setTimeout()` @@ -15,43 +25,128 @@ /// See `examples` below. function AsyncTimeout(thing, interval) { - function isPromise(obj) { - return !!obj && (typeof obj === 'object' || typeof obj === 'function') && typeof obj.then === 'function'; - } - - async function _call(t) - { - if(isPromise(t)) return await t(arguments); - else await (async () => {}); - return t(arguments); - } - this.timeout = () => new Promise((resolve, reject) => { setTimeout(() => { - try { - _call(thing).then(resolve); //XXX: This design doesn't work, we'll need to go back to the drawing board with this function for this to work... The exception won't propagate here, so... - } catch(e) { reject(e); } + const eval_thing = () => { + try { + return thing(); + } catch(e) { reject(e); } + }; + const rv = eval_thing(); + if(is_promise(rv)) fire_and_forget(async () => resolve(await rv), reject); + else resolve(rv); }, interval); }); this.interval = async function*() { while(true) { yield await new Promise((resolve, reject) => { + const eval_thing = () => { + try { + return thing(); + } catch(e) { reject(e); } + }; setTimeout(() => { - try { _call(thing).then(resolve); } catch(e) { reject(e); } + const irv = eval_thing(); + if(is_promise(irv)) fire_and_forget(async() => resolve(await irv), reject); + else resolve(irv); }, interval); }); } }; } -const example = async (_timeout) => { +/// Return a promise that will resolve when the timeout `i` has completed. There is no value for this resolution. +AsyncTimeout.await_timeout = (i) => new AsyncTimeout(() => {}, i).timeout(); +/// Return an async iterator that will resolve the next iteration every `i`. There is no value for these yielded resolutions. +AsyncTimeout.await_interval = (i) => new AsyncTimeout(() => {}, i).interval(); + +/// Cancellation token source for `timeout_example` +function Cancellation() { + let tokens = []; + + this.cancel_after_interrupt = (sig) => new Promise(resolve => { + sig = sig || 'SIGINT'; + process.on(sig, () => { + //console.log(`>>> Received signal ${sig}`); + resolve(sig || ""); + }); + }); + + + this.cancel_after_invoke = () => new Promise(resolve => { + tokens.push(resolve); + }); + + this.cancel_invoke = (value) => tokens.shift()(value || ""); +} + +/// Run the timeout example +const timeout_example = async (_timeout, cancelAfter) => { _timeout = _timeout || 100; + let cancel = false; + + const canceller = new Promise(resolve => { + if(is_promise(cancelAfter)) (async () => { + resolve(cancel = ((await cancelAfter) || true)); + })(); + else if(cancelAfter && cancelAfter > 0) setTimeout(() => resolve(cancel = true), cancelAfter); + else resolve(false); + }); - // Wait 100 then alert "hi" + // Wait _timeout then alert "hi" console.log(await (new AsyncTimeout(() => "hi", _timeout)).timeout()); - // Continuously wait 100 then alert "hi" - for await (const value of new AsyncTimeout(() => "hi forever", _timeout).interval()) { console.log(value); } + // Wait _timeout then call then await this async lambda, then print its result. + console.log(await (new AsyncTimeout(async () => { + console.log("> Hi starting promise..."); + await AsyncTimeout.await_timeout(_timeout); + console.log(">> Hi inside promise!"); + await AsyncTimeout.await_timeout(_timeout); + console.log("> Hi ending promise..."); + return "hi, from async lambda!"; + }, _timeout)).timeout()); + + // Wait _timeout as with this async lambda which uses `.interval()` producing the result `(_timeout / 10) || 10` times. + console.log(await (new AsyncTimeout(async () => { + let i=0; + const len = (_timeout / 10) || 10; + for await (const value of AsyncTimeout.await_interval(_timeout)) { + console.log(`[al]: iteration ${i}`); + i+=1; + if(cancel) throw "Operation cancelled."; + else if(i == len) return `Hi from async lambda containing iterator!, Completed ${i} iterations`; + } + }, _timeout)).timeout()); + + // Wait _timeout as interval running at most `(_interval / 10) || 10` times + console.log(await (async () => { + let i=0; + const len = (_timeout / 10) || 10; + for await (const value of new AsyncTimeout(async () => { + console.log(`i > Waiting ${_timeout * 2}`); + await AsyncTimeout.await_timeout(_timeout * 2); + console.log(`i > Returning ${i} as iteration value.`); + return `${i}`; + }, _timeout).interval()) { + console.log(`!i >>> Iteration ${i}`); + i += 1; + if(cancel) throw "Operation cancelled."; + else if(i == len) return value; + } + })()); + + // Continuously wait _timeout then alert "hi forever", until `cancelAfter` is reached (if it is) + for await (const value of new AsyncTimeout(() => "hi forever", _timeout).interval()) { if(cancel) break; console.log(value); } + + // Return `true` if cancelled (or, if `cancelAfter` was a promise that returned a truthy value, that resolved value); `false` if not (should never happen) + return await canceller; }; +/// Run the timeout example, catching an error if there is one thrown (usually on cancellation.) +const try_example = async (time) => { + try { return { value: await timeout_example(null, time) }; } + catch(e) { console.log(`Error! ${e}`); return {error: e}; } +}; +//try_example(2500); // Error! Operation cancelled. -> `{ error: "Operation cancelled." }` +//try_example(5000); // -> `{ value: true }`