diff --git a/.gitignore b/.gitignore index c2658d7..a33a71f 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ node_modules/ +*~ diff --git a/README.md b/README.md new file mode 100644 index 0000000..641de21 --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +# stage-js + +Async collection / stream for JavaScript. + +# License +GPL'd with <3 diff --git a/package.json b/package.json index 281537e..8b710f6 100644 --- a/package.json +++ b/package.json @@ -1,11 +1,11 @@ { - "name": "stage", - "version": "1.0.0", - "description": "Async collection", - "main": "stage.js", - "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" - }, - "author": "Avril", - "license": "ISC" + "name": "@notflan/stage-js", + "version": "0.1.0", + "description": "Async collection / stream", + "main": "stage.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "Avril", + "license": "GPL-3.0-or-later" } diff --git a/stage.js b/stage.js index 4ba4b9a..2df2ebe 100644 --- a/stage.js +++ b/stage.js @@ -1,97 +1,127 @@ + +/** + * An async collection / item stream. + * @param {Array} from Optional array to begin reading from. + */ function Stage(from) { - var base = this; - this.array = from || []; - this.over = false; - this.acceptInvalid = false; - var next = { + var base = this; + this.array = from || []; + this.over = false; + this.acceptInvalid = false; + var next = { waiters: [], wait: function() { - return new Promise(resolve => next.waiters.push(function() { resolve(); })); - }, - signal: function() { - if(next.waiters.length>0) { - next.waiters.shift()(); - } - }, - flush: function() { - while(next.waiters.length>0) { - next.waiters.shift()(); - } - } - }; + return new Promise(resolve => next.waiters.push(function() { resolve(); })); + }, + signal: function() { + if(next.waiters.length>0) { + next.waiters.shift()(); + } + }, + flush: function() { + while(next.waiters.length>0) { + next.waiters.shift()(); + } + } + }; - this.take = async function() { - if(base.over && base.array.length<=0) - return undefined; - - if(base.array.length<=0) - await next.wait(); + /** + * Take one item off then end. Function will yield if none available to produce yet. + * @return {} undefined if there is no more items to produce (`commit` was called), otherwise the next item in the stream. + */ + this.take = async function() { + if(base.over && base.array.length<=0) + return undefined; + + if(base.array.length<=0) + await next.wait(); - if(base.over && base.array.length<=0) - return undefined; + if(base.over && base.array.length<=0) + return undefined; - return base.array.shift(); - - }; + return base.array.shift(); + }; - this.poll = function() { - return base.array; - }; + /** + Get all current items not yet produced. + */ + this.poll = function() { + return base.array; + }; - this.takeNB = function() { - if(base.array.length<=0) - return null; - return base.array.shift(); - }; + /** + * Take without blocking. Returns `null` if there are no items to take. + */ + this.takeNB = function() { + if(base.array.length<=0) + return null; + return base.array.shift(); + }; - this.swallow0 = async function() { - var ar = []; - var token; - while(token = await base.take()) { - ar.push(token); - } - return ar; - }; + /** + * Await and take the rest of the items in this stream until `commit` is called. + */ + this.swallow0 = async function() { + var ar = []; + var token; + while((token = await base.take())) { + ar.push(token); + } + return ar; + }; - this.swallow = function(timeout) { - if(!timeout) return base.swallow0(); - else { - return new Promise(function(_resolve,_reject) { - var running=true; - var resolve = function(v) { - if(running) _resolve(v); - running=false; - }; - var reject= function(v) { - if(running) _reject(v); - running=false; - }; - base.swallow0().then(resolve); - if(timeout>0) - setTimeout(()=> reject(new Error("swallow timeout reached ("+timeout+")")), timeout); - }); - } - }; + /** + * Returns a promise that resolves to `swallow0`. Optionally takes a timeout that will reject the promise if it does not complete before the stream is closed with `commit`. + */ + this.swallow = function(timeout) { + if(!timeout) return base.swallow0(); + else { + return new Promise(function(_resolve,_reject) { + var running=true; + var resolve = function(v) { + if(running) _resolve(v); + running=false; + }; + var reject= function(v) { + if(running) _reject(v); + running=false; + }; + base.swallow0().then(resolve); + if(timeout>0) + setTimeout(()=> reject(new Error("swallow timeout reached ("+timeout+")")), timeout); + }); + } + }; - this.giveMany= function(values) { - for(let value of values) - base.give(value); - }; + /** + * Send a collection of values to the stream. + */ + this.giveMany= function(values) { + for(let value of values) + base.give(value); + }; - this.give = function(value) { - if(!value && !base.acceptInvalid) - return; - base.array.push(value); - next.signal(); - }; + /** + * Send a single value to the stream + */ + this.give = function(value) { + if(!value && !base.acceptInvalid) + return; + base.array.push(value); + next.signal(); + }; - this.commit = function() { - base.over = true; - next.flush(); + /** + * Close the stream. After this is called, once all elements already in the stream have been consumed, `take` will resolve immediately to `undefined` for any more consumers. It cannot be opened again. + */ + this.commit = function() { + base.over = true; + next.flush(); - return base; - }; + return base; + }; }; -module.exports = Stage; +if(module) + module.exports = Stage;