You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
165 lines
3.7 KiB
165 lines
3.7 KiB
import Stage from "@notflan/stage-js";
|
|
import Dispatcher from "./dispatcher";
|
|
import {addEventListenerHack} from "./util";
|
|
|
|
/**
 * Static factory on Dispatcher: builds a Dispatcher around a socket by
 * delegating to `socket_dispatcher`. Both arguments are forwarded as-is.
 *
 * @param sock   Socket object handed to `socket_dispatcher`.
 * @param cloned Forwarded unchanged as the second argument.
 * @returns Whatever `socket_dispatcher` returns.
 */
Dispatcher.socket = (sock, cloned) => {
  return socket_dispatcher(sock, cloned);
};
|
|
|
|
/**
 * Wraps a socket-like object in a Dispatcher.
 *
 * Every 'message' event's `data` is queued on an internal Stage and then
 * re-emitted through `signal` as `{op: 'message', value}`. When the stage
 * yields `undefined` (after it has been committed, either by the socket's
 * 'close' event or by `close()` below) a single `{op: 'close'}` is signalled.
 *
 * @param ws Object exposing `removeEventListener`, `send` and `close`;
 *           listeners are attached through `addEventListenerHack`.
 * @returns The new Dispatcher, extended with `socket`, `close(consume)` and
 *          `send(msg)`.
 */
const socket_dispatcher = (ws) => {
  const dispatcher = new Dispatcher();
  const inbox = new Stage();
  let isClosed = false;

  dispatcher.socket = ws;

  // Keep what addEventListenerHack returns: that is what gets unregistered later.
  const onMessage = addEventListenerHack(ws, 'message', (event) => inbox.give(event.data));
  const onClose = addEventListenerHack(ws, 'close', () => inbox.commit());

  // Drains the stage into dispatcher signals until it reports the end marker.
  const pump = async () => {
    for (;;) {
      const payload = await inbox.take();
      if (payload === undefined) {
        break;
      }
      dispatcher.signal({op: 'message', value: payload});
    }
    dispatcher.signal({op: 'close'});
  };
  pump(); // deliberately not awaited: runs in the background

  /**
   * Detaches from the socket; only the first call has any effect.
   * @param consume When truthy, the underlying socket is closed as well.
   */
  dispatcher.close = (consume) => {
    if (isClosed) {
      return;
    }
    ws.removeEventListener('message', onMessage);
    ws.removeEventListener('close', onClose);
    inbox.commit();
    if (consume) {
      ws.close();
    }
    isClosed = true;
  };

  /** Sends `msg` over the socket unless `close()` has already been called. */
  dispatcher.send = (msg) => {
    if (isClosed) {
      return;
    }
    ws.send(msg);
  };

  return dispatcher;
};
|
|
|
|
/**
 * Adapter for objects that carry their dispatcher on a `socket` property
 * (used for MessageQueue instances): unwraps it and delegates to
 * `dispatcher_dispatcher`.
 *
 * @param ws Object whose `socket` property is passed on.
 * @returns The result of `dispatcher_dispatcher(ws.socket)`.
 */
const message_dispatcher = (ws) => dispatcher_dispatcher(ws.socket);
|
|
|
|
/**
 * Creates a child Dispatcher layered on top of an existing one.
 *
 * Every signal raised on the parent `ws` is forwarded unchanged to the child.
 * The child shares the parent's underlying `socket` and sends through it
 * directly.
 *
 * @param ws Parent dispatcher; must expose `hook`, `unhook` and `socket`,
 *           and may expose `close`.
 * @returns The child Dispatcher, extended with `socket`, `close(consume)` and
 *          `send(msg)`.
 */
const dispatcher_dispatcher = (ws) => {
  const disp = new Dispatcher();
  let closed = false;

  disp.socket = ws.socket;

  const hook = ws.hook((sig) => disp.signal(sig)); // just pass it on

  /**
   * Stops forwarding from the parent; only the first call has any effect.
   * @param consume When truthy (and the parent has a `close`), the parent is
   *                closed too, with `consume` forced to `true`.
   */
  disp.close = (consume) => {
    if (closed) {
      return;
    }
    ws.unhook(hook);
    if (consume && ws.close) {
      ws.close(true);
    }
    closed = true;
  };

  /** Sends `msg` on the shared socket unless this child has been closed. */
  disp.send = (msg) => {
    if (!closed) ws.socket.send(msg);
  };

  // The original fell off the end here and returned undefined, which made
  // `new MessageQueue(dispatcher)` fail as soon as it touched `this.socket`.
  return disp;
};
|
|
|
|
/**
 * Queue of incoming messages on top of a socket, a Dispatcher or another
 * MessageQueue.
 *
 * Incoming `{op: 'message', value}` signals go through `premap`, then
 * `filter`, then `map`, and the result is queued on `this.stage` for
 * `read()` / `send()` to pick up. A `{op: 'close'}` signal closes the queue.
 *
 * @param ws     A Dispatcher, a MessageQueue, or anything else (handed to
 *               `socket_dispatcher`).
 * @param filter Optional predicate on incoming messages; truthy keeps the
 *               message. Defaults to keeping everything.
 * @param apply  Optional transform for outgoing values. Defaults to identity.
 * @param map    Optional transform for incoming messages that passed
 *               `filter`. Defaults to identity.
 */
function MessageQueue(ws, filter, apply, map) {
  // Pick the adapter that matches what we were handed.
  if (ws instanceof Dispatcher) {
    this.socket = dispatcher_dispatcher(ws);
  } else if (ws instanceof MessageQueue) {
    this.socket = message_dispatcher(ws);
  } else {
    this.socket = socket_dispatcher(ws);
  }

  this.filter = filter || (() => true); // Filter incoming messages. Return true to keep, false to ignore.
  this.apply = apply || ((o) => o); // Apply this function to outgoing messages.
  this.map = map || ((o) => o); // Apply this function to incoming messages after `filter`
  this.premap = ((o) => o); // Apply this function to incoming messages before `filter`

  // Captured locally so the hook keeps feeding this exact stage.
  const stage = new Stage();
  this.stage = stage;

  this.hook = this.socket.hook((msg) => {
    if (!msg) {
      return;
    }
    switch (msg.op) {
      case 'close':
        this.close();
        break;
      case 'message': {
        // premap/filter/map are read at delivery time so later reassignment takes effect.
        const incoming = this.premap(msg.value);
        if (this.filter(incoming)) {
          stage.give(this.map(incoming));
        }
        break;
      }
      default:
        // Unknown ops are ignored.
        break;
    }
  });
}

/**
 * Builds a MessageQueue that speaks JSON: incoming payloads are parsed before
 * `filter`, outgoing values are run through the queue's `apply` and then
 * stringified. Takes the same arguments as the constructor.
 *
 * A payload that fails to parse becomes `null` (and is then subject to
 * `filter` like any other message).
 */
MessageQueue.JSON = (...vars) => {
  const q = new MessageQueue(...vars);

  q.premap = (x) => {
    try {
      return JSON.parse(x);
    } catch (_) {
      return null; // deliberate best-effort: malformed input maps to null
    }
  };

  // Capture the existing transform first. The original wrapper looked up
  // `q.apply` at call time, i.e. itself, and recursed until the stack blew.
  const innerApply = q.apply;
  q.apply = (x) => JSON.stringify(innerApply.call(q, x));

  return q;
};

const MQ = MessageQueue.prototype;

/**
 * Creates a new MessageQueue on the same underlying socket
 * (`this.socket.socket`). Any argument left falsy is inherited from this
 * queue; `premap` and `on_close` are always copied.
 */
MQ.clone = function(filter, apply, map) {
  const mq = new MessageQueue(
    this.socket.socket,
    filter || this.filter,
    apply || this.apply,
    map || this.map,
  );
  mq.premap = this.premap;
  mq.on_close = this.on_close;
  return mq;
};

/**
 * Closes the queue: fires `on_close(this, consume)` once if set, unhooks from
 * the dispatcher, commits the stage and closes the dispatcher.
 *
 * @param consume Passed through to `on_close` and to the dispatcher's `close`.
 */
MQ.close = function(consume) {
  if (this.on_close) {
    this.on_close(this, consume);
    this.on_close = null;
  }

  this.socket.unhook(this.hook);
  this.stage.commit();
  this.socket.close(consume);
};

/**
 * Sends `apply(value)` and resolves with the next queued incoming message.
 *
 * @returns Promise for whatever the stage yields next (presumably `undefined`
 *          once the stage has been committed; Stage is external).
 */
MQ.send = async function(value) {
  // Start taking before sending so the take is already pending when the send happens.
  const awaiter = this.stage.take();
  this.socket.send(this.apply(value));
  return await awaiter;
};

/** Sends `apply(value)` without waiting for anything in return. */
MQ.send0 = function(value) {
  this.socket.send(this.apply(value));
};

/** Resolves with the next queued incoming message (see `send` for the end marker). */
MQ.read = async function() {
  return await this.stage.take();
};

/**
 * Sends `value`, waits for the next incoming message, then closes the queue.
 * The queue is closed even when sending fails, so a rejected send does not
 * leave the hook attached.
 */
MQ.oneshot = async function(value) {
  try {
    return await this.send(value);
  } finally {
    this.close();
  }
};
|
|
|
|
export default MessageQueue;
|