You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
319 lines
7.9 KiB
319 lines
7.9 KiB
var http = require('http');
|
|
var process = require('child_process');
|
|
var WebSocketServer = require("ws").Server;
|
|
var events = require("events");
|
|
var jsoncompress = require('jsoncompress');
|
|
var net = require('net');
|
|
var fs = require('fs');
|
|
|
|
var config = require('./config');
|
|
var logger = require("./logger");
|
|
var _cycle = require("./cycle");
|
|
var CycleBuffer = require("./graphBuffer");
|
|
|
|
/// TODO: Buffer cycles. Clear them out every 24 hours
|
|
/// TODO: Add total aggregate (url /total) that can be served
|
|
|
|
function timeAverage(times)
|
|
{
|
|
if(times.length<1) return 0;
|
|
else if(times.length==1) return times[0];
|
|
else {
|
|
var cul=0;
|
|
for(var i=0;i<times.length;i++)
|
|
{
|
|
cul+=times[i];
|
|
}
|
|
return cul/times.length;
|
|
}
|
|
}
|
|
|
|
function diff(t1, t2)
|
|
{
|
|
if( t1>t2) return t1-t2;
|
|
else return t2-t1;
|
|
}
|
|
|
|
function bein(hay,nee)
|
|
{
|
|
return (nee in hay) || (("has_"+nee) in hay);
|
|
}
|
|
|
|
function createCandle(buffer)
|
|
{
|
|
var cdl = {
|
|
number: {
|
|
o:0,
|
|
c:0,
|
|
h:0,
|
|
l:0
|
|
}
|
|
};
|
|
|
|
if(buffer.length<1) return cdl;
|
|
if(buffer.length<2) {
|
|
cdl.number.c = cdl.number.o = cdl.number.h = cdl.number.l = buffer[0].number;
|
|
return cdl;
|
|
}
|
|
cdl.number.l = -1;
|
|
for(var i in buffer)
|
|
{
|
|
if(buffer[i].number>cdl.number.h)
|
|
cdl.number.h = buffer[i].number;
|
|
if(buffer[i].number<cdl.number.l||cdl.number.l==-1)
|
|
cdl.number.l = buffer[i].number;
|
|
|
|
}
|
|
cdl.number.o = buffer[0].number;
|
|
cdl.number.c = buffer[buffer.length-1].number;
|
|
return cdl;
|
|
}
|
|
|
|
var lastNumber = 0; /*we don't really need this, but it can fix some things*/
|
|
var lastCycle=new _cycle.Cycle();
|
|
var allTime = new _cycle.Cycle();
|
|
var allTimeAverage = new _cycle.Cycle();
|
|
var allTimeAverageN = 0;
|
|
var buffer = new CycleBuffer(config.BUFFER_PURGE_TIME);
|
|
var superbuffer = new CycleBuffer(config.SUPERBUFFER_PURGE_TIME);
|
|
var beInfo = null;
|
|
|
|
var upSince = new Date();
|
|
|
|
cycleEmitter = new events.EventEmitter();
|
|
|
|
buffer.onclear = function(bu) {
|
|
logger.log("Clearing and adding supercycle");
|
|
var ac = _cycle.Cycle.accumulate(bu.buffer);
|
|
ac._cycles = bu.buffer.length;
|
|
ac._candle = createCandle(bu.buffer);
|
|
superbuffer.add(ac);
|
|
cycleEmitter.emit("newSuperCycle", ac);
|
|
};
|
|
|
|
superbuffer.onclear = function(bu) {
|
|
logger.log("Clearing superbuffer");
|
|
};
|
|
|
|
logger.log("Buffer set to clear every "+(config.BUFFER_PURGE_TIME/1000/60)+" minutes");
|
|
logger.log("Superbuffer set to clear every "+(config.SUPERBUFFER_PURGE_TIME/1000/60)+" minutes");
|
|
|
|
process.execFile(config.PYTHON_EXECUTABLE, [config.RTBWCTL_FILENAME, config.BACKEND_SOCKET, "info"], function(error,stdout,stderr) {
|
|
|
|
try {
|
|
beInfo = JSON.parse(stdout);
|
|
} catch(e) {
|
|
logger.log("Recieved invalid JSON from info backend call.");
|
|
return;
|
|
}
|
|
logger.log("Backend info received");
|
|
});
|
|
|
|
var server = http.createServer(function(req, res) {
|
|
if(req.url == config.URL_BASE+"/cycle") {
|
|
logger.log("Recieved cycle request");
|
|
res.setHeader("Content-Type", "application/json");
|
|
res.writeHead(200);
|
|
res.end(JSON.stringify(lastCycle));
|
|
}
|
|
else if(req.url == config.URL_BASE+"/info") {
|
|
logger.log("Recieved info request");
|
|
if(!beInfo) {
|
|
res.writeHead(500);
|
|
res.end("");
|
|
}
|
|
else {
|
|
res.setHeader("Content-Type", "application/json");
|
|
res.writeHead(200);
|
|
res.end(JSON.stringify({
|
|
interval: config.POLL_INTERVAL,
|
|
purgeInterval: config.BUFFER_PURGE_TIME,
|
|
superPurgeInterval: config.SUPERBUFFER_PURGE_TIME,
|
|
backendInterval: beInfo.timeout,
|
|
board: beInfo.board,
|
|
upSince: upSince
|
|
}));
|
|
}
|
|
}
|
|
else if(req.url == config.URL_BASE+"/template")
|
|
{
|
|
logger.log("Recieved template request.");
|
|
res.setHeader("Content-Type", "application/json");
|
|
res.writeHead(200);
|
|
res.end(JSON.stringify(new _cycle.Cycle()));
|
|
}
|
|
else if(req.url == config.URL_BASE+"/all/average")
|
|
{
|
|
logger.log("Recieved all time average request.");
|
|
res.setHeader("Content-Type", "application/json");
|
|
res.writeHead(200);
|
|
allTimeAverage._n = allTimeAverageN;
|
|
res.end(JSON.stringify(allTimeAverage));
|
|
}
|
|
else if(req.url == config.URL_BASE+"/all")
|
|
{
|
|
logger.log("Recieved all time stats request.");
|
|
res.setHeader("Content-Type", "application/json");
|
|
res.writeHead(200);
|
|
res.end(JSON.stringify(allTime));
|
|
|
|
}
|
|
else if(req.url == config.URL_BASE+"/buffer")
|
|
{
|
|
res.setHeader("Content-Type", "application/json");
|
|
res.writeHead(200);
|
|
res.end(JSON.stringify(buffer.all()));
|
|
|
|
}
|
|
else if(req.url == config.URL_BASE+"/buffer/super")
|
|
{
|
|
res.setHeader("Content-Type", "application/json");
|
|
res.writeHead(200);
|
|
res.end(JSON.stringify(superbuffer.all()));
|
|
}
|
|
else if(req.url.startsWith(config.URL_BASE+"/buffer/"))
|
|
{
|
|
var lst = req.url.split("/").slice(-1)[0];
|
|
if(!isNaN(lst))
|
|
{
|
|
lst = Number(lst);
|
|
var bcpy = buffer.all();
|
|
res.setHeader("Content-Type", "application/json");
|
|
res.writeHead(200);
|
|
if(bcpy.length<=lst)
|
|
res.end(JSON.stringify(bcpy));
|
|
else
|
|
res.end(JSON.stringify(bcpy.slice(-lst)));
|
|
|
|
}
|
|
else {
|
|
res.writeHead(422);
|
|
res.end("Invalid request");
|
|
}
|
|
}
|
|
else {
|
|
res.writeHead(404);
|
|
res.end("404");
|
|
}
|
|
});
|
|
|
|
|
|
|
|
var wss = new WebSocketServer({
|
|
server: server,
|
|
autoAcceptConnections: false
|
|
});
|
|
|
|
wss.on('request', function(req) { //why isn't this getting called?
|
|
var url = require('url').parse(req.httpRequest.url);
|
|
|
|
if(url!=config.URL_BASE+"/cycle") return req.reject();
|
|
|
|
return req.accept();
|
|
});
|
|
|
|
wss.on('connection', function (ws) {
|
|
|
|
/*ws.on('message', function (message) {
|
|
ws.send("Received: " + message);
|
|
});*/
|
|
var f = function(cycle) {
|
|
ws.send(JSON.stringify({type: "cycle", data: cycle}));
|
|
};
|
|
var g = function(supercycle) {
|
|
ws.send(JSON.stringify({type: "supercycle", data: supercycle}));
|
|
};
|
|
logger.log("New connection");
|
|
ws.on("close", function(x,y) {
|
|
cycleEmitter.removeListener("newCycle", f);
|
|
cycleEmitter.removeListener("newSuperCycle", g);
|
|
logger.log("Connection closed");
|
|
});
|
|
cycleEmitter.addListener("newCycle", f);
|
|
cycleEmitter.addListener("newSuperCycle", g);
|
|
});
|
|
|
|
server.listen(config.PORT);
|
|
|
|
logger.log("Listening on port "+config.PORT);
|
|
|
|
var backend = setInterval(function() {
|
|
//logger.log("Polling backend");
|
|
process.execFile(config.PYTHON_EXECUTABLE, [config.RTBWCTL_FILENAME, config.BACKEND_SOCKET, "get-clear", "--data", lastNumber], function(error,stdout,stderr) {
|
|
var json = undefined;
|
|
try {
|
|
json = JSON.parse(stdout);
|
|
} catch(e) {
|
|
logger.log("Recieved invalid JSON from backend.");
|
|
return;
|
|
}
|
|
var cycle = new _cycle.Cycle();
|
|
|
|
var timeInters= [];
|
|
|
|
for (var i=0; i<json.length;i++)
|
|
{
|
|
if(json[i].no>lastNumber)
|
|
lastNumber = json[i].no;
|
|
cycle.number+=1;
|
|
if(json[i].country in cycle.countries)
|
|
cycle.countries[json[i].country]+=1;
|
|
else
|
|
cycle.countries[json[i].country]=1;
|
|
if(! ("thread" in json[i]))
|
|
cycle.threads+=1;
|
|
|
|
if(i<json.length-1)
|
|
{
|
|
timeInters.push(diff(Date.parse(json[i].time), Date.parse(json[i+1].time)));
|
|
}
|
|
|
|
if(bein(json[i], "com"))
|
|
cycle.comments+=1;
|
|
if(bein(json[i], "sub"))
|
|
cycle.subjects+=1;
|
|
if(bein(json[i], "id") && json[i]["id"] != "")
|
|
cycle.ids +=1;
|
|
if(bein(json[i], "filename"))
|
|
cycle.images+=1;
|
|
if(bein(json[i], "name") && bein(json[i], "trip"))
|
|
cycle.nametrips+=1;
|
|
else if(bein(json[i], "name"))
|
|
cycle.names+=1;
|
|
else if(bein(json[i], "trip"))
|
|
cycle.trips+=1;
|
|
}
|
|
|
|
cycle.interval = timeAverage(timeInters);
|
|
cycle.last = lastNumber;
|
|
|
|
lastCycle=cycle;
|
|
buffer.add(cycle);
|
|
allTime = _cycle.Cycle.accumulate([allTime, cycle]);
|
|
allTimeAverage = _cycle.Cycle.average(allTimeAverage, [cycle], allTimeAverageN);
|
|
allTimeAverageN += 1;
|
|
cycleEmitter.emit("newCycle", lastCycle);
|
|
});
|
|
|
|
}, config.POLL_INTERVAL);
|
|
|
|
/*var ctlServer = net.createServer();
|
|
|
|
if(!!config.CTL_SOCKET) {
|
|
ctlServer.listen(config.CTL_SOCKET, function() {
|
|
logger.log("Control server up on socket "+config.CTL_SOCKET);
|
|
});
|
|
|
|
ctlServer.on('connection', function(con) {
|
|
logger.log("Connection");
|
|
con.on('data', function (data) {
|
|
logger.log("CTL: "+data);
|
|
});
|
|
});
|
|
|
|
ctlServer.on('error', function(e) {
|
|
logger.log("Control server error ["+config.CTL_SOCKET+"]: "+e.code);
|
|
ctlServer.close();
|
|
});
|
|
}*/
|