/**
 * An async collection / item stream.
 *
 * Producers push values with `give` / `giveMany` and close the stream with
 * `commit`. Consumers pull values in FIFO order with `take`, `takeNB`,
 * `swallow0` or `swallow`.
 *
 * @param {Array} from Optional array to begin reading from. It is used
 *   directly as the backing buffer (not copied), so consuming the stream
 *   removes items from that array.
 */
function Stage(from) {
  const base = this;

  this.array = from || [];
  this.over = false;
  // When false (the default), `give` silently drops falsy values.
  this.acceptInvalid = false;

  // Queue of parked consumers. Each entry is the resolver of a promise
  // that a blocked `take` is awaiting.
  const next = {
    waiters: [],
    wait: function () {
      return new Promise(function (resolve) {
        next.waiters.push(resolve);
      });
    },
    // Wake the longest-waiting consumer, if any.
    signal: function () {
      if (next.waiters.length > 0) {
        next.waiters.shift()();
      }
    },
    // Wake every parked consumer (used when the stream closes).
    flush: function () {
      while (next.waiters.length > 0) {
        next.waiters.shift()();
      }
    }
  };

  /**
   * Wait for the next item and remove it from the buffer.
   *
   * Reports end-of-stream separately from the value, so that falsy or
   * `undefined` items can be told apart from "stream closed and drained".
   *
   * @return {Promise<{done: boolean, value: *}>}
   */
  const pull = async function () {
    // Loop rather than wait once: between being signalled and resuming,
    // another consumer may already have removed the item that woke us.
    while (base.array.length === 0 && !base.over) {
      await next.wait();
    }
    // The emptiness check and the shift happen in the same synchronous
    // step, so no other consumer can interleave between them.
    if (base.array.length === 0) {
      return { done: true, value: undefined };
    }
    return { done: false, value: base.array.shift() };
  };

  /**
   * Returns true if there are any values left in the stream, or if the
   * stream is still open.
   */
  this.hasValues = function () {
    return !base.over || base.array.length > 0;
  };

  /**
   * Take the oldest item off the stream. Waits if no item is available yet.
   * @return {Promise<*>} Resolves to `undefined` if there are no more items
   *   to produce (`commit` was called and the buffer is drained), otherwise
   *   to the next item in the stream.
   */
  this.take = async function () {
    const entry = await pull();
    return entry.value;
  };

  /**
   * Get all current items not yet produced. This is the live backing array,
   * not a copy.
   */
  this.poll = function () {
    return base.array;
  };

  /**
   * Take without blocking. Returns `null` if there are no items to take.
   */
  this.takeNB = function () {
    if (base.array.length === 0) {
      return null;
    }
    return base.array.shift();
  };

  /**
   * Await and take the rest of the items in this stream until `commit` is
   * called and the buffer is drained.
   * @return {Promise<Array>} Every item consumed, in stream order. Falsy
   *   items are included; only end-of-stream stops the collection.
   */
  this.swallow0 = async function () {
    const collected = [];
    for (let entry = await pull(); !entry.done; entry = await pull()) {
      collected.push(entry.value);
    }
    return collected;
  };

  /**
   * Returns a promise that resolves to the result of `swallow0`.
   *
   * @param {number} [timeout] Optional time limit in milliseconds. If it is
   *   greater than zero and the stream has not been closed with `commit`
   *   (and drained) before it elapses, the promise rejects with an Error.
   *   A falsy timeout means "no limit".
   *
   * NOTE: a timeout only rejects the returned promise. The underlying
   * `swallow0` is not cancelled and keeps consuming items from the stream.
   */
  this.swallow = function (timeout) {
    if (!timeout) {
      return base.swallow0();
    }
    return new Promise(function (resolve, reject) {
      const pending = base.swallow0();
      let timer = null;
      if (timeout > 0) {
        timer = setTimeout(function () {
          reject(new Error("swallow timeout reached (" + timeout + ")"));
        }, timeout);
      }
      // Clear the timer once the stream completes so it does not keep the
      // event loop alive. Settling an already-settled promise is a no-op,
      // so no extra "already finished" flag is needed.
      pending.then(
        function (items) {
          clearTimeout(timer);
          resolve(items);
        },
        function (err) {
          clearTimeout(timer);
          reject(err);
        }
      );
    });
  };

  /**
   * Send a collection of values to the stream.
   */
  this.giveMany = function (values) {
    for (const value of values) {
      base.give(value);
    }
  };

  /**
   * Send a single value to the stream. Falsy values are silently dropped
   * unless `acceptInvalid` is set.
   */
  this.give = function (value) {
    if (!value && !base.acceptInvalid) {
      return;
    }
    base.array.push(value);
    next.signal();
  };

  /**
   * Close the stream. After this is called, once all elements already in
   * the stream have been consumed, `take` will resolve immediately to
   * `undefined` for any more consumers. It cannot be opened again.
   * @return {Stage} This stream.
   */
  this.commit = function () {
    base.over = true;
    next.flush();
    return base;
  };
}

// `module` is not declared in ES modules or browsers; a bare `if (module)`
// would throw a ReferenceError there.
if (typeof module !== "undefined" && module) module.exports = Stage;