documented, published

master
Avril 4 years ago
parent 1371a4b753
commit 991993b8ef
Signed by: flanchan
GPG Key ID: 284488987C31F630

1
.gitignore vendored

@ -1 +1,2 @@
node_modules/ node_modules/
*~

@ -0,0 +1,6 @@
# stage-js
Async collection / stream for JavaScript.
# License
GPL'd with <3

@ -1,11 +1,11 @@
{ {
"name": "stage", "name": "@notflan/stage-js",
"version": "1.0.0", "version": "0.1.0",
"description": "Async collection", "description": "Async collection / stream",
"main": "stage.js", "main": "stage.js",
"scripts": { "scripts": {
"test": "echo \"Error: no test specified\" && exit 1" "test": "echo \"Error: no test specified\" && exit 1"
}, },
"author": "Avril", "author": "Avril",
"license": "ISC" "license": "GPL-3.0-or-later"
} }

@ -1,97 +1,127 @@
/**
* An async collection / item stream.
* @param {Array} from Optional array to begin reading from.
*/
function Stage(from) { function Stage(from) {
var base = this; var base = this;
this.array = from || []; this.array = from || [];
this.over = false; this.over = false;
this.acceptInvalid = false; this.acceptInvalid = false;
var next = { var next = {
waiters: [], waiters: [],
wait: function() { wait: function() {
return new Promise(resolve => next.waiters.push(function() { resolve(); })); return new Promise(resolve => next.waiters.push(function() { resolve(); }));
}, },
signal: function() { signal: function() {
if(next.waiters.length>0) { if(next.waiters.length>0) {
next.waiters.shift()(); next.waiters.shift()();
} }
}, },
flush: function() { flush: function() {
while(next.waiters.length>0) { while(next.waiters.length>0) {
next.waiters.shift()(); next.waiters.shift()();
} }
} }
}; };
this.take = async function() { /**
if(base.over && base.array.length<=0) * Take one item off then end. Function will yield if none available to produce yet.
return undefined; * @return {} undefined if there is no more items to produce (`commit` was called), otherwise the next item in the stream.
*/
if(base.array.length<=0) this.take = async function() {
await next.wait(); if(base.over && base.array.length<=0)
return undefined;
if(base.array.length<=0)
await next.wait();
if(base.over && base.array.length<=0) if(base.over && base.array.length<=0)
return undefined; return undefined;
return base.array.shift(); return base.array.shift();
};
};
this.poll = function() { /**
return base.array; Get all current items not yet produced.
}; */
this.poll = function() {
return base.array;
};
this.takeNB = function() { /**
if(base.array.length<=0) * Take without blocking. Returns `null` if there are no items to take.
return null; */
return base.array.shift(); this.takeNB = function() {
}; if(base.array.length<=0)
return null;
return base.array.shift();
};
this.swallow0 = async function() { /**
var ar = []; * Await and take the rest of the items in this stream until `commit` is called.
var token; */
while(token = await base.take()) { this.swallow0 = async function() {
ar.push(token); var ar = [];
} var token;
return ar; while((token = await base.take())) {
}; ar.push(token);
}
return ar;
};
this.swallow = function(timeout) { /**
if(!timeout) return base.swallow0(); * Returns a promise that resolves to `swallow0`. Optionally takes a timeout that will reject the promise if it does not complete before the stream is closed with `commit`.
else { */
return new Promise(function(_resolve,_reject) { this.swallow = function(timeout) {
var running=true; if(!timeout) return base.swallow0();
var resolve = function(v) { else {
if(running) _resolve(v); return new Promise(function(_resolve,_reject) {
running=false; var running=true;
}; var resolve = function(v) {
var reject= function(v) { if(running) _resolve(v);
if(running) _reject(v); running=false;
running=false; };
}; var reject= function(v) {
base.swallow0().then(resolve); if(running) _reject(v);
if(timeout>0) running=false;
setTimeout(()=> reject(new Error("swallow timeout reached ("+timeout+")")), timeout); };
}); base.swallow0().then(resolve);
} if(timeout>0)
}; setTimeout(()=> reject(new Error("swallow timeout reached ("+timeout+")")), timeout);
});
}
};
this.giveMany= function(values) { /**
for(let value of values) * Send a collection of values to the stream.
base.give(value); */
}; this.giveMany= function(values) {
for(let value of values)
base.give(value);
};
this.give = function(value) { /**
if(!value && !base.acceptInvalid) * Send a single value to the stream
return; */
base.array.push(value); this.give = function(value) {
next.signal(); if(!value && !base.acceptInvalid)
}; return;
base.array.push(value);
next.signal();
};
this.commit = function() { /**
base.over = true; * Close the stream. After this is called, once all elements already in the stream have been consumed, `take` will resolve immediately to `undefined` for any more consumers. It cannot be opened again.
next.flush(); */
this.commit = function() {
base.over = true;
next.flush();
return base; return base;
}; };
}; };
module.exports = Stage; if(module)
module.exports = Stage;

Loading…
Cancel
Save