diff --git a/.#index.js b/.#index.js deleted file mode 120000 index 9f70976..0000000 --- a/.#index.js +++ /dev/null @@ -1 +0,0 @@ -avril@flan-laptop.2546:1595960333 \ No newline at end of file diff --git a/.gitignore b/.gitignore index 50a0c39..51e06f7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ *~ node_modules/ +build/* diff --git a/build/message-min.js b/build/message-min.js deleted file mode 100644 index 311d6a3..0000000 --- a/build/message-min.js +++ /dev/null @@ -1,2 +0,0 @@ -var MessageQueue=function(){"use strict";var t=function(t,s){return t(s={exports:{}},s.exports),s.exports}((function(t){t&&(t.exports=function(t){var s=this;this.array=t||[],this.over=!1,this.acceptInvalid=!1;var e={waiters:[],wait:function(){return new Promise(t=>e.waiters.push((function(){t()})))},signal:function(){e.waiters.length>0&&e.waiters.shift()()},flush:function(){for(;e.waiters.length>0;)e.waiters.shift()()}};this.take=async function(){if(!(s.over&&s.array.length<=0||(s.array.length<=0&&await e.wait(),s.over&&s.array.length<=0)))return s.array.shift()},this.poll=function(){return s.array},this.takeNB=function(){return s.array.length<=0?null:s.array.shift()},this.swallow0=async function(){for(var t,e=[];t=await s.take();)e.push(t);return e},this.swallow=function(t){return t?new Promise((function(e,o){var n=!0;s.swallow0().then((function(t){n&&e(t),n=!1})),t>0&&setTimeout(()=>{return s=new Error("swallow timeout reached ("+t+")"),n&&o(s),void(n=!1);var s},t)})):s.swallow0()},this.giveMany=function(t){for(let e of t)s.give(e)},this.give=function(t){(t||s.acceptInvalid)&&(s.array.push(t),e.signal())},this.commit=function(){return s.over=!0,e.flush(),s}})}));function s(){this.hooks=[]}s.prototype.hook=function(t){return this.hooks.push(t),this.hooks.length-1},s.prototype.unhook=function(t){this.hooks[t]=null},s.prototype.signal=function(t){for(const s of this.hooks)s&&s.apply(this,[t])},s.prototype.clear=function(){this.hooks=[]};const e=(t,s,e)=>(t.addEventListener(s,e),e);s.socket=(t,s)=>o(t);const o=o=>{const n=new s,i=new t;let r=!1;n.socket=o;const a=e(o,"message",t=>i.give(t.data)),c=e(o,"close",()=>i.commit());return(async()=>{let t;for(;void 0!==(t=await i.take());)n.signal({op:"message",value:t});n.signal({op:"close"})})(),n.close=t=>{r||(o.removeEventListener("message",a),o.removeEventListener("close",c),i.commit(),t&&o.close(),r=!0)},n.send=t=>{r||o.send(t)},n},n=t=>{const e=new s;let o=!1;e.socket=t.socket;const n=t.hook(t=>e.signal(t));e.close=s=>{o||(t.unhook(n),s&&t.close&&t.close(!0),o=!0)},e.send=s=>{o||t.socket.send(s)}};function i(e,r,a,c){this.socket=e instanceof s?n(e):e instanceof i?(t=>n(t.socket))(e):o(e),this.filter=r||(()=>!0),this.apply=a||(t=>t),this.map=c||(t=>t),this.premap=t=>t;const h=this.stage=new t,l=this;this.hook=this.socket.hook(t=>{if(t)switch(t.op){case"close":l.close();break;case"message":t=this.premap(t.value),this.filter(t)&&h.give(this.map(t))}})}i.JSON=(...t)=>{const s=new i(...t);return s.premap=t=>{try{return JSON.parse(t)}catch(t){return null}},s.apply=t=>JSON.stringify(s.apply(t)),s};const r=i.prototype;return r.clone=function(t,s,e){const o=new i(this.socket.socket,t||this.filter,s||this.apply,e||this.map);return o.premap=this.premap,o.on_close=this.on_close,o},r.close=function(t){this.on_close&&(this.on_close(this,t),this.on_close=null),this.socket.unhook(this.hook),this.stage.commit(),this.socket.close(t)},r.send=async function(t){for(;;){const s=this.stage.take();this.socket.send(this.apply(t));const e=await s;return e}},r.send0=function(t){this.socket.send(this.apply(t))},r.read=async function(){for(;;){const t=await this.stage.take();return t}},r.oneshot=async function(t){const s=await this.send(t);return this.close(),s},i}(); -//# sourceMappingURL=message-min.js.map diff --git a/build/message-min.js.map b/build/message-min.js.map deleted file mode 100644 index c00b296..0000000 --- a/build/message-min.js.map +++ /dev/null @@ -1 +0,0 @@ -{"version":3,"file":"message-min.js","sources":["../node_modules/@notflan/stage-js/stage.js","../dispatcher.js","../util.js","../message.js"],"sourcesContent":["\n/**\n * An async collection / item stream.\n * @param {Array} from Optional array to begin reading from.\n */\nfunction Stage(from) {\n var base = this;\n this.array = from || [];\n this.over = false;\n this.acceptInvalid = false;\n var next = {\n\twaiters: [],\n\twait: function() {\n\t return new Promise(resolve => next.waiters.push(function() { resolve(); }));\n\t},\n\tsignal: function() {\n\t if(next.waiters.length>0) {\n\t\tnext.waiters.shift()();\n\t }\n\t}, \n\tflush: function() {\n\t while(next.waiters.length>0) {\n\t\tnext.waiters.shift()();\n\t }\n\t}\n };\n\n /**\n * Take one item off then end. Function will yield if none available to produce yet.\n * @return {} undefined if there is no more items to produce (`commit` was called), otherwise the next item in the stream.\n */\n this.take = async function() {\n\tif(base.over && base.array.length<=0)\n\t return undefined;\n\t\n\tif(base.array.length<=0)\n\t await next.wait();\n\n\n\tif(base.over && base.array.length<=0)\n\t return undefined;\n\t\n\treturn base.array.shift();\n };\n\n /**\n Get all current items not yet produced.\n */\n this.poll = function() {\n\treturn base.array;\n };\n\n /**\n * Take without blocking. Returns `null` if there are no items to take.\n */\n this.takeNB = function() {\n\tif(base.array.length<=0)\n\t return null;\n\treturn base.array.shift();\n };\n\n /**\n * Await and take the rest of the items in this stream until `commit` is called.\n */\n this.swallow0 = async function() {\n\tvar ar = [];\n\tvar token;\n\twhile((token = await base.take())) {\n\t ar.push(token);\n\t}\n\treturn ar;\n };\n\n /**\n * Returns a promise that resolves to `swallow0`. Optionally takes a timeout that will reject the promise if it does not complete before the stream is closed with `commit`.\n */\n this.swallow = function(timeout) {\n\tif(!timeout) return base.swallow0();\n\telse {\n\t return new Promise(function(_resolve,_reject) {\n\t\tvar running=true;\n\t\tvar resolve = function(v) {\n\t\t if(running) _resolve(v);\n\t\t running=false;\n\t\t};\n\t\tvar reject= function(v) {\n\t\t if(running) _reject(v);\n\t\t running=false;\n\t\t};\n\t\tbase.swallow0().then(resolve);\n\t\tif(timeout>0)\n\t\t setTimeout(()=> reject(new Error(\"swallow timeout reached (\"+timeout+\")\")), timeout);\n\t });\n\t}\n };\n\n /**\n * Send a collection of values to the stream.\n */\n this.giveMany= function(values) {\n\tfor(let value of values)\n\t base.give(value);\n };\n\n /**\n * Send a single value to the stream\n */\n this.give = function(value) {\n\tif(!value && !base.acceptInvalid)\n\t return;\n\tbase.array.push(value);\n\tnext.signal();\n };\n\n /**\n * Close the stream. After this is called, once all elements already in the stream have been consumed, `take` will resolve immediately to `undefined` for any more consumers. It cannot be opened again.\n */\n this.commit = function() {\n\tbase.over = true;\n\tnext.flush();\n\n\treturn base;\n };\n};\n\nif(module)\n module.exports = Stage;\n","\nfunction Dispatcher() {\n this.hooks = [];\n}\n\nDispatcher.prototype.hook = function(cb) {\n this.hooks.push(cb);\n return this.hooks.length-1;\n};\n\nDispatcher.prototype.unhook = function(index) {\n this.hooks[index] = null;\n};\n\nDispatcher.prototype.signal = function(value) {\n for(const cb of this.hooks) {\n\tif(cb) cb.apply(this, [value]);\n }\n};\n\nDispatcher.prototype.clear = function() {\n this.hooks = [];\n};\n\nexport default Dispatcher;\n","function uuidv4() {\n if (crypto) {\n\treturn ([1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g, c => (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16));\n }\n else return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {\n\tconst r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);\n\treturn v.toStirng(16);\n });\n}\n\nconst UUID_RE = /^(\\w{8}(-\\w{4}){3}-\\w{12}?)$/i;\nfunction is_uuid(id) {\n return (typeof id === 'string' || id instanceof String) && UUID_RE.test(id);\n}\n\nconst addEventListenerHack = (to, ev, lam) => {\n to.addEventListener(ev, lam);\n return lam;\n};\n\nexport {addEventListenerHack, uuidv4, is_uuid};\n","import Stage from \"@notflan/stage-js\";\nimport Dispatcher from \"./dispatcher\";\nimport {addEventListenerHack} from \"./util\";\n\nDispatcher.socket = (sock,cloned) => socket_dispatcher(sock,cloned);\n\nconst socket_dispatcher = (ws) => {\n const disp = new Dispatcher();\n const pusher = new Stage();\n let closed=false;\n \n disp.socket = ws;\n \n const h0 = addEventListenerHack(ws, 'message', (ev) => pusher.give(ev.data));\n const h1 = addEventListenerHack(ws, 'close', () => pusher.commit());\n\n (async () => {\n\tlet msg;\n\twhile ( (msg = await pusher.take()) !== undefined) \n\t disp.signal({op: 'message', value: msg});\n\tdisp.signal({op: 'close'});\n\n })();\n\n disp.close = (consume) => {\n\n\tif(!closed) {\n\t ws.removeEventListener('message', h0);\n\t ws.removeEventListener('close', h1);\n\t pusher.commit();\n\n\t if(consume)\n\t\tws.close();\n\t closed=true;\n\t}\n };\n\n disp.send = (msg) => {\n\tif(!closed) {\n\t ws.send(msg);\n\t}\n };\n \n return disp;\n};\n\nconst message_dispatcher = (ws) => {\n return dispatcher_dispatcher(ws.socket);\n};\n\nconst dispatcher_dispatcher = (ws) => {\n const disp = new Dispatcher();\n let closed=false;\n\n disp.socket = ws.socket;\n\n const hook = ws.hook((sig) => disp.signal(sig));//just pass it\n\n disp.close = (consume) => {\n\tif(!closed){\n\t ws.unhook(hook);\n\t if(consume && ws.close)\n\t\tws.close(true);\n\t closed=true;\n\t}\n };\n disp.send = (msg) => {\n\tif(!closed) ws.socket.send(msg);\n };\n};\n\nfunction MessageQueue(ws, filter, apply, map) {\n this.socket = (ws instanceof Dispatcher) ? dispatcher_dispatcher(ws) : (ws instanceof MessageQueue) ? message_dispatcher(ws) : socket_dispatcher(ws);\n \n this.filter = filter || (()=>true); // Filter incoming messages. Return true to keep, false to ignore.\n this.apply = apply || ((o) => o); // Apply this function to outgoing messages.\n this.map = map || ((o) => o); // Apply this function to incoming messages after `filter`\n this.premap = ((o) => o); // Apply this function to incoming messages before `filter`\n\n const stage = this.stage = new Stage();\n const base = this;\n\n this.hook = this.socket.hook((msg) => {\n\tif (msg)\n\t{\n\t switch(msg.op)\n\t {\n\t\tcase 'close':\n\t\tbase.close();\n\t\tbreak;\n\t\tcase 'message':\n\t\tmsg = this.premap(msg.value);\n\t\tif(this.filter(msg))\n\t\t stage.give(this.map(msg));\n\t\tbreak;\n\t\tdefault:\n\t\t//uhh\n\t\tbreak;\n\t }\n\t}\n });\n}\nMessageQueue.JSON = (...vars) => {\n const q = new MessageQueue(...vars);\n q.premap = x=> {\n\ttry {\n\t return JSON.parse(x);\n\t} catch(_) {\n\t return null;\n\t}\n };\n q.apply = x=> JSON.stringify(q.apply(x));\n \n return q;\n};\n\nconst MQ = MessageQueue.prototype;\n\nMQ.clone = function(filter, apply, map) {\n const mq = new MessageQueue(this.socket.socket, filter || this.filter, apply || this.apply, map || this.map);\n mq.premap = this.premap;\n mq.on_close = this.on_close;\n return mq;\n};\n\nMQ.close = function(consume) {\n if (this.on_close) {this.on_close(this, consume); this.on_close = null;}\n \n this.socket.unhook(this.hook);\n this.stage.commit();\n this.socket.close(consume);\n};\n\nMQ.send = async function(value) {\n while(true) {\n\tconst awaiter = this.stage.take();\n\tthis.socket.send(this.apply(value));\n\tconst ret = await awaiter;\n\n\tif(ret === undefined) return ret;\n\telse return ret;\n }\n};\n\nMQ.send0 = function(value) {\n this.socket.send(this.apply(value));\n};\n\nMQ.read = async function() {\n while(true) {\n\tconst ret = await this.stage.take();\n\n\tif(ret === undefined) return ret;\n\telse return ret;\n }\n};\n\nMQ.oneshot = async function(value) {\n const ret = await this.send(value);\n this.close();\n return ret;\n};\n\nexport default MessageQueue;\n"],"names":["module","from","base","this","array","over","acceptInvalid","next","waiters","wait","Promise","resolve","push","signal","length","shift","flush","take","async","poll","takeNB","swallow0","token","ar","swallow","timeout","_resolve","_reject","running","then","v","setTimeout","reject","Error","giveMany","values","value","give","commit","Dispatcher","hooks","prototype","hook","cb","unhook","index","apply","clear","addEventListenerHack","to","ev","lam","addEventListener","socket","sock","cloned","socket_dispatcher","ws","disp","pusher","Stage","closed","h0","data","h1","msg","undefined","op","close","consume","removeEventListener","send","dispatcher_dispatcher","sig","MessageQueue","filter","map","message_dispatcher","o","premap","stage","JSON","vars","q","x","parse","_","stringify","MQ","clone","mq","on_close","awaiter","ret","send0","read","oneshot"],"mappings":"wHA6HGA,IACCA,UAzHJ,SAAeC,GACX,IAAIC,EAAOC,KACXA,KAAKC,MAAQH,GAAQ,GACrBE,KAAKE,MAAO,EACZF,KAAKG,eAAgB,EACrB,IAAIC,EAAO,CACdC,QAAS,GACTC,KAAM,WACF,OAAO,IAAIC,QAAQC,GAAWJ,EAAKC,QAAQI,MAAK,WAAcD,SAElEE,OAAQ,WACDN,EAAKC,QAAQM,OAAO,GAC1BP,EAAKC,QAAQO,OAAbR,IAGDS,MAAO,WACH,KAAMT,EAAKC,QAAQM,OAAO,GAC7BP,EAAKC,QAAQO,OAAbR,KASEJ,KAAKc,KAAOC,iBACf,KAAGhB,EAAKG,MAAQH,EAAKE,MAAMU,QAAQ,IAGhCZ,EAAKE,MAAMU,QAAQ,SACZP,EAAKE,OAGZP,EAAKG,MAAQH,EAAKE,MAAMU,QAAQ,IAGnC,OAAOZ,EAAKE,MAAMW,SAMfZ,KAAKgB,KAAO,WACf,OAAOjB,EAAKE,OAMTD,KAAKiB,OAAS,WACjB,OAAGlB,EAAKE,MAAMU,QAAQ,EACX,KACJZ,EAAKE,MAAMW,SAMfZ,KAAKkB,SAAWH,iBAGnB,IAFA,IACII,EADAC,EAAK,GAEFD,QAAcpB,EAAKe,QACtBM,EAAGX,KAAKU,GAEZ,OAAOC,GAMJpB,KAAKqB,QAAU,SAASC,GAC3B,OAAIA,EAEO,IAAIf,SAAQ,SAASgB,EAASC,GACxC,IAAIC,GAAQ,EASZ1B,EAAKmB,WAAWQ,MARF,SAASC,GAChBF,GAASF,EAASI,GACrBF,GAAQ,KAOTH,EAAQ,GACPM,WAAW,KAAKC,OANEF,EAMK,IAAIG,MAAM,4BAA4BR,EAAQ,KALlEG,GAASD,EAAQG,QACpBF,GAAQ,GAFC,IAASE,GAM0DL,MAd7DvB,EAAKmB,YAsBtBlB,KAAK+B,SAAU,SAASC,GAC3B,IAAI,IAAIC,KAASD,EACbjC,EAAKmC,KAAKD,IAMXjC,KAAKkC,KAAO,SAASD,IACpBA,GAAUlC,EAAKI,iBAEnBJ,EAAKE,MAAMQ,KAAKwB,GAChB7B,EAAKM,WAMFV,KAAKmC,OAAS,WAIjB,OAHApC,EAAKG,MAAO,EACZE,EAAKS,QAEEd,QCxHR,SAASqC,IACLpC,KAAKqC,MAAQ,GAGjBD,EAAWE,UAAUC,KAAO,SAASC,GAEjC,OADAxC,KAAKqC,MAAM5B,KAAK+B,GACTxC,KAAKqC,MAAM1B,OAAO,GAG7ByB,EAAWE,UAAUG,OAAS,SAASC,GACnC1C,KAAKqC,MAAMK,GAAS,MAGxBN,EAAWE,UAAU5B,OAAS,SAASuB,GACnC,IAAI,MAAMO,KAAMxC,KAAKqC,MACrBG,GAAIA,EAAGG,MAAM3C,KAAM,CAACiC,KAIxBG,EAAWE,UAAUM,MAAQ,WACzB5C,KAAKqC,MAAQ,ICNjB,MAAMQ,EAAuB,CAACC,EAAIC,EAAIC,KAClCF,EAAGG,iBAAiBF,EAAIC,GACjBA,GCbXZ,EAAWc,OAAS,CAACC,EAAKC,IAAWC,EAAkBF,GAEvD,MAAME,EAAqBC,IACvB,MAAMC,EAAO,IAAInB,EACXoB,EAAS,IAAIC,EACnB,IAAIC,GAAO,EAEXH,EAAKL,OAASI,EAEd,MAAMK,EAAKd,EAAqBS,EAAI,UAAYP,GAAOS,EAAOtB,KAAKa,EAAGa,OAChEC,EAAKhB,EAAqBS,EAAI,QAAS,IAAME,EAAOrB,UA6B1D,MA3BA,WACH,IAAI2B,EACJ,UAAwCC,KAA/BD,QAAYN,EAAO1C,SACxByC,EAAK7C,OAAO,CAACsD,GAAI,UAAW/B,MAAO6B,IACvCP,EAAK7C,OAAO,CAACsD,GAAI,WAJd,GAQAT,EAAKU,MAASC,IAEbR,IACAJ,EAAGa,oBAAoB,UAAWR,GAClCL,EAAGa,oBAAoB,QAASN,GAChCL,EAAOrB,SAEJ+B,GACNZ,EAAGW,QACAP,GAAO,IAIRH,EAAKa,KAAQN,IACZJ,GACAJ,EAAGc,KAAKN,IAIFP,GAOLc,EAAyBf,IAC3B,MAAMC,EAAO,IAAInB,EACjB,IAAIsB,GAAO,EAEXH,EAAKL,OAASI,EAAGJ,OAEjB,MAAMX,EAAOe,EAAGf,KAAM+B,GAAQf,EAAK7C,OAAO4D,IAE1Cf,EAAKU,MAASC,IACbR,IACAJ,EAAGb,OAAOF,GACP2B,GAAWZ,EAAGW,OACpBX,EAAGW,OAAM,GACNP,GAAO,IAGRH,EAAKa,KAAQN,IACZJ,GAAQJ,EAAGJ,OAAOkB,KAAKN,KAI5B,SAASS,EAAajB,EAAIkB,EAAQ7B,EAAO8B,GACrCzE,KAAKkD,OAAUI,aAAclB,EAAciC,EAAsBf,GAAOA,aAAciB,EA1B/D,CAACjB,GACjBe,EAAsBf,EAAGJ,QAyBsEwB,CAAmBpB,GAAMD,EAAkBC,GAEjJtD,KAAKwE,OAASA,SAAe,GAC7BxE,KAAK2C,MAAQA,IAAWgC,GAAMA,GAC9B3E,KAAKyE,IAAMA,IAASE,GAAMA,GAC1B3E,KAAK4E,OAAWD,GAAMA,EAEtB,MAAME,EAAQ7E,KAAK6E,MAAQ,IAAIpB,EACzB1D,EAAOC,KAEbA,KAAKuC,KAAOvC,KAAKkD,OAAOX,KAAMuB,IACjC,GAAIA,EAEA,OAAOA,EAAIE,IAEd,IAAK,QACLjE,EAAKkE,QACL,MACA,IAAK,UACLH,EAAM9D,KAAK4E,OAAOd,EAAI7B,OACnBjC,KAAKwE,OAAOV,IACXe,EAAM3C,KAAKlC,KAAKyE,IAAIX,OAS1BS,EAAaO,KAAO,IAAIC,KACpB,MAAMC,EAAI,IAAIT,KAAgBQ,GAU9B,OATAC,EAAEJ,OAASK,IACd,IACI,OAAOH,KAAKI,MAAMD,GACpB,MAAME,GACJ,OAAO,OAGRH,EAAErC,MAAQsC,GAAIH,KAAKM,UAAUJ,EAAErC,MAAMsC,IAE9BD,GAGX,MAAMK,EAAKd,EAAajC,iBAExB+C,EAAGC,MAAQ,SAASd,EAAQ7B,EAAO8B,GAC/B,MAAMc,EAAK,IAAIhB,EAAavE,KAAKkD,OAAOA,OAAQsB,GAAUxE,KAAKwE,OAAQ7B,GAAS3C,KAAK2C,MAAO8B,GAAOzE,KAAKyE,KAGxG,OAFAc,EAAGX,OAAS5E,KAAK4E,OACjBW,EAAGC,SAAWxF,KAAKwF,SACZD,GAGXF,EAAGpB,MAAQ,SAASC,GACZlE,KAAKwF,WAAWxF,KAAKwF,SAASxF,KAAMkE,GAAUlE,KAAKwF,SAAW,MAElExF,KAAKkD,OAAOT,OAAOzC,KAAKuC,MACxBvC,KAAK6E,MAAM1C,SACXnC,KAAKkD,OAAOe,MAAMC,IAGtBmB,EAAGjB,KAAOrD,eAAekB,GACrB,OAAY,CACf,MAAMwD,EAAUzF,KAAK6E,MAAM/D,OAC3Bd,KAAKkD,OAAOkB,KAAKpE,KAAK2C,MAAMV,IAC5B,MAAMyD,QAAYD,EAElB,OAA6BC,IAK9BL,EAAGM,MAAQ,SAAS1D,GAChBjC,KAAKkD,OAAOkB,KAAKpE,KAAK2C,MAAMV,KAGhCoD,EAAGO,KAAO7E,iBACN,OAAY,CACf,MAAM2E,QAAY1F,KAAK6E,MAAM/D,OAE7B,OAA6B4E,IAK9BL,EAAGQ,QAAU9E,eAAekB,GACxB,MAAMyD,QAAY1F,KAAKoE,KAAKnC,GAE5B,OADAjC,KAAKiE,QACEyB"} \ No newline at end of file diff --git a/index.js b/index.js deleted file mode 100644 index e20090e..0000000 --- a/index.js +++ /dev/null @@ -1,73 +0,0 @@ -const MessageQueue = require("./message"); -const util = require("./util"); -const addEventListenerHack = util.addEventListenerHack; -const uuidv4 = util.uuidv4; -const is_uuid = util.is_uuid; - -/** - * - */ -function Pipe(ws) { - this.socket = ws; - - this.clients=0; - this.MAX_CLIENTS = 100; -} - -const P = Pipe.prototype; - -P.connect = async function(message) { - let msg; - const socket = new MessageQueue(this.socket); - try { - while ((msg = await socket.read()) !== undefined) { // Expects {com: 'open', id: uuidv4()} - try { - msg = JSON.parse(msg); - } catch(_) {continue;} - - if (msg?.com === 'open' && (!message || msg?.msg === message)) { - let id = validate_id(msg?.id); - if(id) { - if(this.clients < this.MAX_CLIENTS) { - const cli = new MessageQueue(this.socket); - let open = true; - cli.premap = (raw) => { - try { - return JSON.parse(raw); - }catch(_) {return null;} - }; - cli.filter = (json) => json?.id === id && (json?.com === 'msg' || ((json?.com === 'close') && cli.close() && false)); - cli.map = (json) => json?.message; - cli.apply = (raw) => { - const json = {message: raw, id: id, com: 'resp'}; - return JSON.stringify(json); - }; - cli.on_close = (self)=> { - if(open && self.id === id) { //in case of mistaken cloning - self.send0({com: 'resp', message: 'closing', id: id}); - this.clients -= 1; - open = false; - } - }; - cli.id= id; - - this.clients -=1; - socket.send0(JSON.stringify({com: 'resp', message: 'accepted', id: id})); - return cli; - } else { - socket.send0(JSON.stringify({com: 'resp', message: 'declined', id: id, reason: 'too many open connections'})); - } - } - } - } - } finally { - socket.close(); - } - return undefined; -}; - -P.close = function() { - this.socket.close(); -}; - -exports = Pipe; diff --git a/package.json b/package.json index 57c2a11..c936d42 100644 --- a/package.json +++ b/package.json @@ -1,15 +1,15 @@ { "name": "@username/message-queue", - "version": "0.0.0", + "version": "0.1.0", "description": "Await on websocket messages", - "entry": "index.js", + "entry": "message.js", "scripts": { - "test": "echo \"Error: no test specified\" && exit 1", "dev": "rollup -c -w", "build": "rollup -c" }, "keywords": [ - "async" + "async", + "websocket" ], "author": "Avril", "license": "GPL-3.0-or-later", diff --git a/passthrough.js b/passthrough.js deleted file mode 100644 index 8e8132e..0000000 --- a/passthrough.js +++ /dev/null @@ -1,62 +0,0 @@ -import MessageQueue from "./message.js"; -import {addEventListenerHack, uuidv4, is_uuid} from "./util"; - - -function Pipe(ws) -{ - this.socket =ws; - this.id = uuidv4(); -} - -const P = Pipe.prototype; - -P.connect = async function(message) { - const socket = new MessageQueue(this.socket); - - socket.send0(JSON.stringify({com: 'open', id: this.id, msg: message})); - try { - let msg; - while ((msg = await socket.read()) !== undefined) { - try { - msg = JSON.parse(msg); - } catch(_){continue;} - - if (msg.id === id && msg.com === 'resp') { - if (msg.message === 'accepted') { - const cli = new MessageQueue(this.socket); - let open=true; - cli.id = id; - cli.apply = (raw) => { - return JSON.stringify({message: raw, com: 'msg', id: id}); - }; - cli.premap = (raw) => { - try { - return JSON.parse(raw); - }catch(_) {return null;} - }; - cli.filter = (json) => json?.id === 'id' && (json?.com === 'msg' || json?.com === 'resp' && json?.message === 'closing' && cli.close() && false); - cli.map = (json) => json?.message; - cli.on_close = (self) => { - if(open && self.id === id) { //in case of mistaken cloning - self.send0({com: 'close', message: 'closing', id: id}); - open = false; - } - }; - return cli; - } else { - console.log(`[${id}]: server responded with ${msg.message}: ${msg.reason}`); - return false; - } - } - } - } finally{ - socket.close(); - } - return undefined; -}; - -P.close = function() { - this.socket.close(); -}; - -export default Pipe; diff --git a/index.html b/test.html similarity index 100% rename from index.html rename to test.html diff --git a/util.js b/util.js index 7838c4f..0c5e5db 100644 --- a/util.js +++ b/util.js @@ -1,21 +1,6 @@ -function uuidv4() { - if (crypto) { - return ([1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g, c => (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16)); - } - else return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) { - const r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8); - return v.toStirng(16); - }); -} - -const UUID_RE = /^(\w{8}(-\w{4}){3}-\w{12}?)$/i; -function is_uuid(id) { - return (typeof id === 'string' || id instanceof String) && UUID_RE.test(id); -} - const addEventListenerHack = (to, ev, lam) => { to.addEventListener(ev, lam); return lam; }; -export {addEventListenerHack, uuidv4, is_uuid}; +export {addEventListenerHack};