You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

165 lines
3.7 KiB

import Stage from "@notflan/stage-js";
import Dispatcher from "./dispatcher";
import {addEventListenerHack} from "./util";
Dispatcher.socket = (sock,cloned) => socket_dispatcher(sock,cloned);
const socket_dispatcher = (ws) => {
const disp = new Dispatcher();
const pusher = new Stage();
let closed=false;
disp.socket = ws;
const h0 = addEventListenerHack(ws, 'message', (ev) => pusher.give(ev.data));
const h1 = addEventListenerHack(ws, 'close', () => pusher.commit());
(async () => {
let msg;
while ( (msg = await pusher.take()) !== undefined)
disp.signal({op: 'message', value: msg});
disp.signal({op: 'close'});
})();
disp.close = (consume) => {
if(!closed) {
ws.removeEventListener('message', h0);
ws.removeEventListener('close', h1);
pusher.commit();
if(consume)
ws.close();
closed=true;
}
};
disp.send = (msg) => {
if(!closed) {
ws.send(msg);
}
};
return disp;
};
const message_dispatcher = (ws) => {
return dispatcher_dispatcher(ws.socket);
};
const dispatcher_dispatcher = (ws) => {
const disp = new Dispatcher();
let closed=false;
disp.socket = ws.socket;
const hook = ws.hook((sig) => disp.signal(sig));//just pass it
disp.close = (consume) => {
if(!closed){
ws.unhook(hook);
if(consume && ws.close)
ws.close(true);
closed=true;
}
};
disp.send = (msg) => {
if(!closed) ws.socket.send(msg);
};
};
function MessageQueue(ws, filter, apply, map) {
this.socket = (ws instanceof Dispatcher) ? dispatcher_dispatcher(ws) : (ws instanceof MessageQueue) ? message_dispatcher(ws) : socket_dispatcher(ws);
this.filter = filter || (()=>true); // Filter incoming messages. Return true to keep, false to ignore.
this.apply = apply || ((o) => o); // Apply this function to outgoing messages.
this.map = map || ((o) => o); // Apply this function to incoming messages after `filter`
this.premap = ((o) => o); // Apply this function to incoming messages before `filter`
const stage = this.stage = new Stage();
const base = this;
this.hook = this.socket.hook((msg) => {
if (msg)
{
switch(msg.op)
{
case 'close':
base.close();
break;
case 'message':
msg = this.premap(msg.value);
if(this.filter(msg))
stage.give(this.map(msg));
break;
default:
//uhh
break;
}
}
});
}
MessageQueue.JSON = (...vars) => {
const q = new MessageQueue(...vars);
q.premap = x=> {
try {
return JSON.parse(x);
} catch(_) {
return null;
}
};
q.apply = x=> JSON.stringify(q.apply(x));
return q;
};
const MQ = MessageQueue.prototype;
MQ.clone = function(filter, apply, map) {
const mq = new MessageQueue(this.socket.socket, filter || this.filter, apply || this.apply, map || this.map);
mq.premap = this.premap;
mq.on_close = this.on_close;
return mq;
};
MQ.close = function(consume) {
if (this.on_close) {this.on_close(this, consume); this.on_close = null;}
this.socket.unhook(this.hook);
this.stage.commit();
this.socket.close(consume);
};
MQ.send = async function(value) {
while(true) {
const awaiter = this.stage.take();
this.socket.send(this.apply(value));
const ret = await awaiter;
if(ret === undefined) return ret;
else return ret;
}
};
MQ.send0 = function(value) {
this.socket.send(this.apply(value));
};
MQ.read = async function() {
while(true) {
const ret = await this.stage.take();
if(ret === undefined) return ret;
else return ret;
}
};
MQ.oneshot = async function(value) {
const ret = await this.send(value);
this.close();
return ret;
};
export default MessageQueue;