import Stage from "@notflan/stage-js";
import Dispatcher from "./dispatcher";
import {addEventListenerHack} from "./util";

/**
 * Factory attached to Dispatcher: wraps a raw socket in a Dispatcher.
 * The second argument is forwarded for compatibility, but
 * `socket_dispatcher` only reads its first parameter.
 */
Dispatcher.socket = (sock, cloned) => socket_dispatcher(sock, cloned);

/**
 * Build a Dispatcher fed by a raw socket's `message` and `close` events.
 *
 * Incoming payloads are pushed through a Stage and re-emitted as
 * `{op: 'message', value}` signals; once the stage yields `undefined`
 * a single `{op: 'close'}` signal is emitted.
 *
 * @param ws Raw socket exposing addEventListener/removeEventListener/send/close.
 * @returns {Dispatcher} Dispatcher with `socket`, `close(consume)` and `send(msg)` attached.
 */
const socket_dispatcher = (ws) => {
  const disp = new Dispatcher();
  const pusher = new Stage();
  let closed = false;

  disp.socket = ws;

  // NOTE(review): addEventListenerHack presumably returns the listener that
  // was actually registered, since its result is handed to
  // removeEventListener below -- confirm in ./util.
  const h0 = addEventListenerHack(ws, 'message', (ev) => pusher.give(ev.data));
  const h1 = addEventListenerHack(ws, 'close', () => pusher.commit());

  // Pump loop: forward staged messages until take() yields `undefined`
  // (presumably what a committed Stage returns -- confirm), then signal close.
  (async () => {
    let msg;
    while ((msg = await pusher.take()) !== undefined) {
      disp.signal({op: 'message', value: msg});
    }
    disp.signal({op: 'close'});
  })();

  /**
   * Detach from the socket and end the pump loop. Only the first call acts.
   * @param consume When truthy, the underlying socket is closed as well.
   */
  disp.close = (consume) => {
    if (closed) return;
    ws.removeEventListener('message', h0);
    ws.removeEventListener('close', h1);
    pusher.commit();
    if (consume) ws.close();
    closed = true;
  };

  /** Send `msg` on the raw socket; silently ignored after close(). */
  disp.send = (msg) => {
    if (!closed) ws.send(msg);
  };

  return disp;
};

/**
 * Build a Dispatcher chained onto another MessageQueue's dispatcher.
 * @param {MessageQueue} ws Queue whose `socket` (a Dispatcher) is hooked.
 */
const message_dispatcher = (ws) => dispatcher_dispatcher(ws.socket);

/**
 * Build a Dispatcher that relays every signal of an existing Dispatcher.
 *
 * @param {Dispatcher} ws Upstream dispatcher; its `socket` is used for sending.
 * @returns {Dispatcher} Downstream dispatcher with `socket`, `close(consume)` and `send(msg)`.
 */
const dispatcher_dispatcher = (ws) => {
  const disp = new Dispatcher();
  let closed = false;

  disp.socket = ws.socket;

  // Just pass every upstream signal through unchanged.
  const hook = ws.hook((sig) => disp.signal(sig));

  /**
   * Unhook from the upstream dispatcher. Only the first call acts.
   * @param consume When truthy and the upstream has a close(), it is closed with `true`.
   */
  disp.close = (consume) => {
    if (closed) return;
    ws.unhook(hook);
    if (consume && ws.close) ws.close(true);
    closed = true;
  };

  /** Send `msg` on the upstream dispatcher's socket; ignored after close(). */
  disp.send = (msg) => {
    if (!closed) ws.socket.send(msg);
  };

  // Fix: the dispatcher was previously built but never returned, so
  // MessageQueue ended up with `this.socket === undefined` for Dispatcher
  // and MessageQueue sources.
  return disp;
};

/**
 * Request/response style queue on top of a socket, a Dispatcher or another
 * MessageQueue. Must be called with `new`.
 *
 * @param ws     Raw socket, Dispatcher, or MessageQueue to attach to.
 * @param filter Predicate on incoming messages: true keeps, false ignores. Defaults to keep-all.
 * @param apply  Transform applied to outgoing messages. Defaults to identity.
 * @param map    Transform applied to incoming messages after `filter`. Defaults to identity.
 */
function MessageQueue(ws, filter, apply, map) {
  if (ws instanceof Dispatcher) {
    this.socket = dispatcher_dispatcher(ws);
  } else if (ws instanceof MessageQueue) {
    this.socket = message_dispatcher(ws);
  } else {
    this.socket = socket_dispatcher(ws);
  }

  this.filter = filter || (() => true); // Filter incoming messages. Return true to keep, false to ignore.
  this.apply = apply || ((o) => o);     // Apply this function to outgoing messages.
  this.map = map || ((o) => o);         // Apply this function to incoming messages after `filter`.
  this.premap = ((o) => o);             // Apply this function to incoming messages before `filter`.

  const stage = this.stage = new Stage();

  // The transforms are read from the instance on every signal, so later
  // reassignment of premap/filter/map (as MessageQueue.JSON does) takes effect.
  this.hook = this.socket.hook((sig) => {
    if (!sig) return;
    switch (sig.op) {
      case 'close':
        this.close();
        break;
      case 'message': {
        const incoming = this.premap(sig.value);
        if (this.filter(incoming)) stage.give(this.map(incoming));
        break;
      }
      default:
        // Unknown signal kinds are ignored.
        break;
    }
  });
}

/**
 * Create a MessageQueue that JSON-encodes outgoing values and JSON-decodes
 * incoming ones. Payloads that fail to parse are premapped to `null`.
 *
 * @param vars Same arguments as the MessageQueue constructor.
 * @returns {MessageQueue}
 */
MessageQueue.JSON = (...vars) => {
  const q = new MessageQueue(...vars);

  q.premap = (x) => {
    try {
      return JSON.parse(x);
    } catch (_) {
      // Deliberate best effort: an unparsable payload becomes null.
      return null;
    }
  };

  // Fix: capture the previous `apply` first. Referring to `q.apply` inside
  // its own replacement made the function call itself without end.
  const baseApply = q.apply;
  q.apply = (x) => JSON.stringify(baseApply(x));

  return q;
};

const MQ = MessageQueue.prototype;

/**
 * Create a new queue on the same raw socket, inheriting any transform that
 * is not overridden, plus `premap` and `on_close`.
 */
MQ.clone = function(filter, apply, map) {
  const mq = new MessageQueue(
    this.socket.socket,
    filter || this.filter,
    apply || this.apply,
    map || this.map
  );
  mq.premap = this.premap;
  mq.on_close = this.on_close;
  return mq;
};

/**
 * Close this queue: run `on_close` (at most once), unhook from the
 * dispatcher, commit the stage and close the dispatcher.
 * @param consume Forwarded to `on_close` and to the dispatcher's close().
 */
MQ.close = function(consume) {
  if (this.on_close) {
    this.on_close(this, consume);
    this.on_close = null;
  }
  this.socket.unhook(this.hook);
  this.stage.commit();
  this.socket.close(consume);
};

/**
 * Send `value` (through `apply`) and resolve with the next staged incoming
 * message. The take() is issued before sending so the wait is already
 * registered when the reply arrives.
 */
MQ.send = async function(value) {
  const awaiter = this.stage.take();
  this.socket.send(this.apply(value));
  return await awaiter;
};

/** Send `value` (through `apply`) without waiting for a reply. */
MQ.send0 = function(value) {
  this.socket.send(this.apply(value));
};

/** Resolve with the next staged incoming message. */
MQ.read = async function() {
  return await this.stage.take();
};

/** Send `value`, wait for one reply, close the queue, and return the reply. */
MQ.oneshot = async function(value) {
  const ret = await this.send(value);
  this.close();
  return ret;
};

export default MessageQueue;