master
Avril 4 years ago
parent 33ebd7f4e1
commit 5ec77cbda9
Signed by: flanchan
GPG Key ID: 284488987C31F630

@ -1 +0,0 @@
avril@flan-laptop.2546:1595960333

1
.gitignore vendored

@ -1,2 +1,3 @@
*~ *~
node_modules/ node_modules/
build/*

@ -1,2 +0,0 @@
var MessageQueue=function(){"use strict";var t=function(t,s){return t(s={exports:{}},s.exports),s.exports}((function(t){t&&(t.exports=function(t){var s=this;this.array=t||[],this.over=!1,this.acceptInvalid=!1;var e={waiters:[],wait:function(){return new Promise(t=>e.waiters.push((function(){t()})))},signal:function(){e.waiters.length>0&&e.waiters.shift()()},flush:function(){for(;e.waiters.length>0;)e.waiters.shift()()}};this.take=async function(){if(!(s.over&&s.array.length<=0||(s.array.length<=0&&await e.wait(),s.over&&s.array.length<=0)))return s.array.shift()},this.poll=function(){return s.array},this.takeNB=function(){return s.array.length<=0?null:s.array.shift()},this.swallow0=async function(){for(var t,e=[];t=await s.take();)e.push(t);return e},this.swallow=function(t){return t?new Promise((function(e,o){var n=!0;s.swallow0().then((function(t){n&&e(t),n=!1})),t>0&&setTimeout(()=>{return s=new Error("swallow timeout reached ("+t+")"),n&&o(s),void(n=!1);var s},t)})):s.swallow0()},this.giveMany=function(t){for(let e of t)s.give(e)},this.give=function(t){(t||s.acceptInvalid)&&(s.array.push(t),e.signal())},this.commit=function(){return s.over=!0,e.flush(),s}})}));function s(){this.hooks=[]}s.prototype.hook=function(t){return this.hooks.push(t),this.hooks.length-1},s.prototype.unhook=function(t){this.hooks[t]=null},s.prototype.signal=function(t){for(const s of this.hooks)s&&s.apply(this,[t])},s.prototype.clear=function(){this.hooks=[]};const e=(t,s,e)=>(t.addEventListener(s,e),e);s.socket=(t,s)=>o(t);const o=o=>{const n=new s,i=new t;let r=!1;n.socket=o;const a=e(o,"message",t=>i.give(t.data)),c=e(o,"close",()=>i.commit());return(async()=>{let t;for(;void 0!==(t=await i.take());)n.signal({op:"message",value:t});n.signal({op:"close"})})(),n.close=t=>{r||(o.removeEventListener("message",a),o.removeEventListener("close",c),i.commit(),t&&o.close(),r=!0)},n.send=t=>{r||o.send(t)},n},n=t=>{const e=new s;let o=!1;e.socket=t.socket;const n=t.hook(t=>e.signal(t));e.close=s=>{o||(t.unhook(n),s&&t.close&&t.close(!0),o=!0)},e.send=s=>{o||t.socket.send(s)}};function i(e,r,a,c){this.socket=e instanceof s?n(e):e instanceof i?(t=>n(t.socket))(e):o(e),this.filter=r||(()=>!0),this.apply=a||(t=>t),this.map=c||(t=>t),this.premap=t=>t;const h=this.stage=new t,l=this;this.hook=this.socket.hook(t=>{if(t)switch(t.op){case"close":l.close();break;case"message":t=this.premap(t.value),this.filter(t)&&h.give(this.map(t))}})}i.JSON=(...t)=>{const s=new i(...t);return s.premap=t=>{try{return JSON.parse(t)}catch(t){return null}},s.apply=t=>JSON.stringify(s.apply(t)),s};const r=i.prototype;return r.clone=function(t,s,e){const o=new i(this.socket.socket,t||this.filter,s||this.apply,e||this.map);return o.premap=this.premap,o.on_close=this.on_close,o},r.close=function(t){this.on_close&&(this.on_close(this,t),this.on_close=null),this.socket.unhook(this.hook),this.stage.commit(),this.socket.close(t)},r.send=async function(t){for(;;){const s=this.stage.take();this.socket.send(this.apply(t));const e=await s;return e}},r.send0=function(t){this.socket.send(this.apply(t))},r.read=async function(){for(;;){const t=await this.stage.take();return t}},r.oneshot=async function(t){const s=await this.send(t);return this.close(),s},i}();
//# sourceMappingURL=message-min.js.map

File diff suppressed because one or more lines are too long

@ -1,73 +0,0 @@
const MessageQueue = require("./message");
const util = require("./util");
const addEventListenerHack = util.addEventListenerHack;
const uuidv4 = util.uuidv4;
const is_uuid = util.is_uuid;
/**
*
*/
function Pipe(ws) {
this.socket = ws;
this.clients=0;
this.MAX_CLIENTS = 100;
}
const P = Pipe.prototype;
P.connect = async function(message) {
let msg;
const socket = new MessageQueue(this.socket);
try {
while ((msg = await socket.read()) !== undefined) { // Expects {com: 'open', id: uuidv4()}
try {
msg = JSON.parse(msg);
} catch(_) {continue;}
if (msg?.com === 'open' && (!message || msg?.msg === message)) {
let id = validate_id(msg?.id);
if(id) {
if(this.clients < this.MAX_CLIENTS) {
const cli = new MessageQueue(this.socket);
let open = true;
cli.premap = (raw) => {
try {
return JSON.parse(raw);
}catch(_) {return null;}
};
cli.filter = (json) => json?.id === id && (json?.com === 'msg' || ((json?.com === 'close') && cli.close() && false));
cli.map = (json) => json?.message;
cli.apply = (raw) => {
const json = {message: raw, id: id, com: 'resp'};
return JSON.stringify(json);
};
cli.on_close = (self)=> {
if(open && self.id === id) { //in case of mistaken cloning
self.send0({com: 'resp', message: 'closing', id: id});
this.clients -= 1;
open = false;
}
};
cli.id= id;
this.clients -=1;
socket.send0(JSON.stringify({com: 'resp', message: 'accepted', id: id}));
return cli;
} else {
socket.send0(JSON.stringify({com: 'resp', message: 'declined', id: id, reason: 'too many open connections'}));
}
}
}
}
} finally {
socket.close();
}
return undefined;
};
P.close = function() {
this.socket.close();
};
exports = Pipe;

@ -1,15 +1,15 @@
{ {
"name": "@username/message-queue", "name": "@username/message-queue",
"version": "0.0.0", "version": "0.1.0",
"description": "Await on websocket messages", "description": "Await on websocket messages",
"entry": "index.js", "entry": "message.js",
"scripts": { "scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"dev": "rollup -c -w", "dev": "rollup -c -w",
"build": "rollup -c" "build": "rollup -c"
}, },
"keywords": [ "keywords": [
"async" "async",
"websocket"
], ],
"author": "Avril", "author": "Avril",
"license": "GPL-3.0-or-later", "license": "GPL-3.0-or-later",

@ -1,62 +0,0 @@
import MessageQueue from "./message.js";
import {addEventListenerHack, uuidv4, is_uuid} from "./util";
function Pipe(ws)
{
this.socket =ws;
this.id = uuidv4();
}
const P = Pipe.prototype;
P.connect = async function(message) {
const socket = new MessageQueue(this.socket);
socket.send0(JSON.stringify({com: 'open', id: this.id, msg: message}));
try {
let msg;
while ((msg = await socket.read()) !== undefined) {
try {
msg = JSON.parse(msg);
} catch(_){continue;}
if (msg.id === id && msg.com === 'resp') {
if (msg.message === 'accepted') {
const cli = new MessageQueue(this.socket);
let open=true;
cli.id = id;
cli.apply = (raw) => {
return JSON.stringify({message: raw, com: 'msg', id: id});
};
cli.premap = (raw) => {
try {
return JSON.parse(raw);
}catch(_) {return null;}
};
cli.filter = (json) => json?.id === 'id' && (json?.com === 'msg' || json?.com === 'resp' && json?.message === 'closing' && cli.close() && false);
cli.map = (json) => json?.message;
cli.on_close = (self) => {
if(open && self.id === id) { //in case of mistaken cloning
self.send0({com: 'close', message: 'closing', id: id});
open = false;
}
};
return cli;
} else {
console.log(`[${id}]: server responded with ${msg.message}: ${msg.reason}`);
return false;
}
}
}
} finally{
socket.close();
}
return undefined;
};
P.close = function() {
this.socket.close();
};
export default Pipe;

@ -1,21 +1,6 @@
function uuidv4() {
if (crypto) {
return ([1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g, c => (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16));
}
else return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
const r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);
return v.toStirng(16);
});
}
const UUID_RE = /^(\w{8}(-\w{4}){3}-\w{12}?)$/i;
function is_uuid(id) {
return (typeof id === 'string' || id instanceof String) && UUID_RE.test(id);
}
const addEventListenerHack = (to, ev, lam) => { const addEventListenerHack = (to, ev, lam) => {
to.addEventListener(ev, lam); to.addEventListener(ev, lam);
return lam; return lam;
}; };
export {addEventListenerHack, uuidv4, is_uuid}; export {addEventListenerHack};

Loading…
Cancel
Save