commit 1371a4b75331366c13cf6881058538c59bc97fe8 Author: Avril Date: Thu Apr 2 21:59:49 2020 +0100 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c2658d7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules/ diff --git a/package.json b/package.json new file mode 100644 index 0000000..281537e --- /dev/null +++ b/package.json @@ -0,0 +1,11 @@ +{ + "name": "stage", + "version": "1.0.0", + "description": "Async collection", + "main": "stage.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "Avril", + "license": "ISC" +} diff --git a/stage.js b/stage.js new file mode 100644 index 0000000..4ba4b9a --- /dev/null +++ b/stage.js @@ -0,0 +1,97 @@ +function Stage(from) { + var base = this; + this.array = from || []; + this.over = false; + this.acceptInvalid = false; + var next = { + waiters: [], + wait: function() { + return new Promise(resolve => next.waiters.push(function() { resolve(); })); + }, + signal: function() { + if(next.waiters.length>0) { + next.waiters.shift()(); + } + }, + flush: function() { + while(next.waiters.length>0) { + next.waiters.shift()(); + } + } + }; + + this.take = async function() { + if(base.over && base.array.length<=0) + return undefined; + + if(base.array.length<=0) + await next.wait(); + + + if(base.over && base.array.length<=0) + return undefined; + + return base.array.shift(); + + }; + + this.poll = function() { + return base.array; + }; + + this.takeNB = function() { + if(base.array.length<=0) + return null; + return base.array.shift(); + }; + + this.swallow0 = async function() { + var ar = []; + var token; + while(token = await base.take()) { + ar.push(token); + } + return ar; + }; + + this.swallow = function(timeout) { + if(!timeout) return base.swallow0(); + else { + return new Promise(function(_resolve,_reject) { + var running=true; + var resolve = function(v) { + if(running) _resolve(v); + running=false; + }; + var reject= function(v) { + if(running) _reject(v); + running=false; + }; + base.swallow0().then(resolve); + if(timeout>0) + setTimeout(()=> reject(new Error("swallow timeout reached ("+timeout+")")), timeout); + }); + } + }; + + this.giveMany= function(values) { + for(let value of values) + base.give(value); + }; + + this.give = function(value) { + if(!value && !base.acceptInvalid) + return; + base.array.push(value); + next.signal(); + }; + + this.commit = function() { + base.over = true; + next.flush(); + + return base; + }; +}; + +module.exports = Stage;