function Stage(from) { var base = this; this.array = from || []; this.over = false; this.acceptInvalid = false; var next = { waiters: [], wait: function() { return new Promise(resolve => next.waiters.push(function() { resolve(); })); }, signal: function() { if(next.waiters.length>0) { next.waiters.shift()(); } }, flush: function() { while(next.waiters.length>0) { next.waiters.shift()(); } } }; this.take = async function() { if(base.over && base.array.length<=0) return undefined; if(base.array.length<=0) await next.wait(); if(base.over && base.array.length<=0) return undefined; return base.array.shift(); }; this.poll = function() { return base.array; }; this.takeNB = function() { if(base.array.length<=0) return null; return base.array.shift(); }; this.swallow0 = async function() { var ar = []; var token; while(token = await base.take()) { ar.push(token); } return ar; }; this.swallow = function(timeout) { if(!timeout) return base.swallow0(); else { return new Promise(function(_resolve,_reject) { var running=true; var resolve = function(v) { if(running) _resolve(v); running=false; }; var reject= function(v) { if(running) _reject(v); running=false; }; base.swallow0().then(resolve); if(timeout>0) setTimeout(()=> reject(new Error("swallow timeout reached ("+timeout+")")), timeout); }); } }; this.giveMany= function(values) { for(let value of values) base.give(value); }; this.give = function(value) { if(!value && !base.acceptInvalid) return; base.array.push(value); next.signal(); }; this.commit = function() { base.over = true; next.flush(); return base; }; this.forceClose = function() { if(base.over) return base; else return base.commit(); }; }; Stage.commitAll = (stages) => { for(const st of stages) st.commit(); };