const Stage = require('stage-js');

/**
 * Minimal callback registry: callbacks are registered with `hook`, removed
 * with `unhook`, and all invoked with a single value by `signal`.
 */
function Dispatcher() {
  this.hooks = [];
}

/**
 * Register a callback.
 * @param {Function} cb - Invoked with the signalled value.
 * @returns {number} Index handle to pass to `unhook`.
 */
Dispatcher.prototype.hook = function (cb) {
  this.hooks.push(cb);
  return this.hooks.length - 1;
};

/**
 * Remove a previously registered callback.
 * The slot is nulled rather than spliced so that the indices handed out by
 * `hook` stay valid for the other callbacks.
 * @param {number} index - Handle returned by `hook`.
 */
Dispatcher.prototype.unhook = function (index) {
  this.hooks[index] = null;
};

/**
 * Invoke every live callback with `value` (with `this` bound to the dispatcher).
 * @param {*} value - Value forwarded to each callback.
 */
Dispatcher.prototype.signal = function (value) {
  for (const cb of this.hooks) {
    if (cb) {
      cb.call(this, value);
    }
  }
};

/** Drop every registered callback. */
Dispatcher.prototype.clear = function () {
  this.hooks = [];
};

/**
 * `addEventListener` returns undefined, so this wrapper registers the
 * listener and hands it back, letting the caller keep a reference for a
 * later `removeEventListener`.
 * @param {EventTarget} to - Target to listen on.
 * @param {string} ev - Event name.
 * @param {Function} lam - Listener.
 * @returns {Function} The same listener that was registered.
 */
const addEventListenerHack = (to, ev, lam) => {
  to.addEventListener(ev, lam);
  return lam;
};

/**
 * Wrap a WebSocket-like object in a Dispatcher that signals
 * `{op: 'message', value}` for each incoming message and `{op: 'close'}`
 * once the message stream ends.
 *
 * NOTE(review): `Stage` is used here as an async queue (`give` to enqueue,
 * `take` to await the next item, `commit` to end the stream); the pump loop
 * relies on `take()` resolving to `undefined` after `commit()` — confirm
 * against the stage-js API.
 *
 * @param {object} ws - Object exposing addEventListener, removeEventListener and send.
 * @returns {Dispatcher} Dispatcher extended with `socket`, `close()` and `send(msg)`.
 */
const socket_dispatcher = (ws) => {
  const disp = new Dispatcher();
  const pusher = new Stage();
  let closed = false;
  let committed = false;

  // Both the socket's own 'close' event and disp.close() end the stream;
  // make sure the stage is only committed once.
  const commitOnce = () => {
    if (!committed) {
      committed = true;
      pusher.commit();
    }
  };

  disp.socket = ws;

  // Keep the listener references so disp.close() can really detach them.
  const onMessage = addEventListenerHack(ws, 'message', (ev) => pusher.give(ev.data));
  const onClose = addEventListenerHack(ws, 'close', () => commitOnce());

  // Pump: forward queued messages to the hooks until the stage is drained,
  // then announce the close.
  (async () => {
    let msg;
    while ((msg = await pusher.take()) !== undefined) {
      disp.signal({ op: 'message', value: msg });
    }
    disp.signal({ op: 'close' });
  })();

  /** Detach from the socket and end the message stream (idempotent). */
  disp.close = () => {
    if (!closed) {
      closed = true;
      ws.removeEventListener('message', onMessage);
      ws.removeEventListener('close', onClose);
      commitOnce();
    }
  };

  /** Send `msg` through the socket; silently ignored after close(). */
  disp.send = (msg) => {
    if (!closed) {
      ws.send(msg);
    }
  };

  return disp;
};

/**
 * Request/response style queue on top of a WebSocket-like object.
 *
 * @param {object} ws - Socket handed to `socket_dispatcher`.
 * @param {Function} [filter] - Predicate on incoming message payloads; only
 *   payloads for which it returns truthy are queued. Defaults to accepting all.
 * @param {Function} [apply] - Transform applied to outgoing values before
 *   they are sent. Defaults to identity.
 */
function MessageQueue(ws, filter, apply) {
  this.socket = socket_dispatcher(ws);
  this.filter = filter || (() => true);
  this.apply = apply || ((o) => o);
  const stage = this.stage = new Stage();
  const base = this;
  // Index handle of our hook on the dispatcher, needed to unhook in close().
  this.hook = this.socket.hook((msg) => {
    if (!msg) {
      return;
    }
    switch (msg.op) {
      case 'close':
        base.close();
        break;
      case 'message':
        if (base.filter(msg.value)) {
          stage.give(msg.value);
        }
        break;
      default:
        // Unknown op: ignore.
        break;
    }
  });
}

const MQ = MessageQueue.prototype;

// The methods below must be regular functions: arrow functions would capture
// the module-level `this` instead of the MessageQueue instance.

/** Unhook from the dispatcher, end the incoming stage and close the dispatcher. */
MQ.close = function () {
  this.socket.unhook(this.hook);
  this.stage.commit();
  this.socket.close();
};

/**
 * Send `value` (after `apply`) and wait for the next queued incoming message.
 * The take is issued before the send so the pending read is already in
 * place when the message goes out.
 * @param {*} value - Value to send.
 * @returns {Promise<*>} The next message taken from the incoming stage.
 */
MQ.send = async function (value) {
  const awaiter = this.stage.take();
  this.socket.send(this.apply(value));
  return await awaiter;
};

/**
 * Take the next queued incoming message.
 * @returns {*} Whatever `stage.take()` returns.
 */
MQ.read = function () {
  return this.stage.take();
};

/**
 * Send `value`, wait for one reply, then close the queue.
 * @param {*} value - Value to send.
 * @returns {Promise<*>} The reply obtained from `send`.
 */
MQ.oneshot = async function (value) {
  const ret = await this.send(value);
  this.close();
  return ret;
};

// `typeof` guard: a bare `if (module)` throws when `module` is not declared.
if (typeof module !== 'undefined' && module) {
  module.exports = MessageQueue;
}