You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

319 lines
7.9 KiB

var http = require('http');
var process = require('child_process');
var WebSocketServer = require("ws").Server;
var events = require("events");
var jsoncompress = require('jsoncompress');
var net = require('net');
var fs = require('fs');
var config = require('./config');
var logger = require("./logger");
var _cycle = require("./cycle");
var CycleBuffer = require("./graphBuffer");
/// TODO: Buffer cycles. Clear them out every 24 hours
/// TODO: Add total aggregate (url /total) that can be served
/**
 * Arithmetic mean of a list of numbers.
 *
 * @param {number[]} times - Values to average.
 * @returns {number} 0 for an empty list, the only element for a one-item
 *     list, otherwise the sum of all elements divided by their count.
 */
function timeAverage(times)
{
    var count = times.length;
    if (count < 1) return 0;
    if (count == 1) return times[0];

    // Sum front-to-back so floating point rounding matches a plain index loop.
    var total = 0;
    for (var value of times) {
        total += value;
    }
    return total / count;
}
/**
 * Distance between two values: the larger minus the smaller.
 *
 * @param {number} t1 - First value.
 * @param {number} t2 - Second value.
 * @returns {number} `t1 - t2` when `t1 > t2`, otherwise `t2 - t1`.
 */
function diff(t1, t2)
{
    return t1 > t2 ? t1 - t2 : t2 - t1;
}
/**
 * Tell whether an object carries a key, either directly or as a
 * "has_"-prefixed flag key.
 *
 * @param {Object} hay - Object to inspect (checked with the `in` operator,
 *     so inherited properties count as well).
 * @param {string} nee - Key name to look for.
 * @returns {boolean} true if `nee` or `"has_" + nee` is in `hay`.
 */
function bein(hay,nee)
{
    if (nee in hay) {
        return true;
    }
    var flagKey = "has_" + nee;
    return flagKey in hay;
}
/**
 * Build an open/close/high/low "candle" from the `number` field of a list
 * of cycles.
 *
 * @param {Array<{number: number}>} buffer - Cycles in chronological order.
 * @returns {{number: {o: number, c: number, h: number, l: number}}}
 *     `o` is the first cycle's number, `c` the last one's, `h` and `l` the
 *     maximum and minimum over all cycles. An empty buffer yields all zeros.
 */
function createCandle(buffer)
{
    var cdl = {
        number: {
            o:0,
            c:0,
            h:0,
            l:0
        }
    };
    if(buffer.length<1) return cdl;

    // Seed high/low from the first element instead of sentinel values, so the
    // result is also correct when every number is negative.
    var first = buffer[0].number;
    var high = first;
    var low = first;
    // Index loop rather than for...in: only real array elements are visited,
    // never extra enumerable properties that happen to sit on the array.
    for(var i=1;i<buffer.length;i++)
    {
        var current = buffer[i].number;
        if(current>high) high = current;
        if(current<low) low = current;
    }
    cdl.number.o = first;
    cdl.number.c = buffer[buffer.length-1].number;
    cdl.number.h = high;
    cdl.number.l = low;
    return cdl;
}
// ---- Module state shared by the HTTP handlers, the WebSocket handlers and the backend poller ----

// Highest post number seen so far; sent back to the backend on every poll.
var lastNumber = 0; /*we don't really need this, but it can fix some things*/
// Most recently built cycle (served at <URL_BASE>/cycle).
var lastCycle=new _cycle.Cycle();
// Accumulation of every cycle since startup (served at <URL_BASE>/all).
var allTime = new _cycle.Cycle();
// Running average over all cycles, and the number of cycles folded into it
// (served at <URL_BASE>/all/average).
var allTimeAverage = new _cycle.Cycle();
var allTimeAverageN = 0;
// Cycle buffer, and the buffer of "supercycles" built when the cycle buffer is cleared.
var buffer = new CycleBuffer(config.BUFFER_PURGE_TIME);
var superbuffer = new CycleBuffer(config.SUPERBUFFER_PURGE_TIME);
// Parsed output of the backend "info" call; stays null until that call succeeds.
var beInfo = null;
// Time this module was loaded; reported by <URL_BASE>/info.
var upSince = new Date();
// Emits "newCycle" and "newSuperCycle"; WebSocket connections subscribe to it.
// Declared with `var` so it no longer leaks out as an implicit global.
var cycleEmitter = new events.EventEmitter();
// `onclear` hook of the cycle buffer (presumably invoked by CycleBuffer when
// it purges - confirm in ./graphBuffer). Rolls the buffered cycles up into a
// single "supercycle", stores it in the superbuffer and announces it.
buffer.onclear = function(cleared) {
    logger.log("Clearing and adding supercycle");

    var cycles = cleared.buffer;
    var supercycle = _cycle.Cycle.accumulate(cycles);

    // Extra bookkeeping fields: how many cycles were merged, and their candle.
    supercycle._cycles = cycles.length;
    supercycle._candle = createCandle(cycles);

    superbuffer.add(supercycle);
    cycleEmitter.emit("newSuperCycle", supercycle);
};
// `onclear` hook of the superbuffer: nothing is rolled up, the event is only logged.
superbuffer.onclear = function(cleared) {
    logger.log("Clearing superbuffer");
};

// Report both purge intervals (configured in milliseconds) in minutes.
logger.log(`Buffer set to clear every ${config.BUFFER_PURGE_TIME/1000/60} minutes`);
logger.log(`Superbuffer set to clear every ${config.SUPERBUFFER_PURGE_TIME/1000/60} minutes`);
// One-off startup call: ask the backend control script for its "info" and
// keep the parsed result in `beInfo` (used by the <URL_BASE>/info endpoint).
// Note: `process` in this file is the child_process module (see the require
// at the top), not Node's global process object.
process.execFile(config.PYTHON_EXECUTABLE, [config.RTBWCTL_FILENAME, config.BACKEND_SOCKET, "info"], function(error,stdout,stderr) {
    // Surface spawn failures / non-zero exits instead of ignoring them.
    // Parsing is still attempted in case usable output was produced.
    if(error) {
        logger.log("Backend info call failed: "+error.message);
    }
    try {
        beInfo = JSON.parse(stdout);
    } catch(e) {
        logger.log("Recieved invalid JSON from info backend call.");
        return;
    }
    logger.log("Backend info received");
});
/**
 * Write `payload` as a 200 response with a JSON body.
 *
 * @param {Object} res - HTTP response object.
 * @param {*} payload - Value to serialise with JSON.stringify.
 */
function sendJson(res, payload)
{
    res.setHeader("Content-Type", "application/json");
    res.writeHead(200);
    res.end(JSON.stringify(payload));
}

/**
 * HTTP API. All routes live under config.URL_BASE and are matched against
 * the raw request URL:
 *
 *   /cycle          most recent cycle
 *   /info           poll/purge intervals plus backend info (500 until the
 *                   backend info call has produced a result)
 *   /template       a freshly constructed, empty Cycle
 *   /all/average    running average over all cycles, with `_n` = cycle count
 *   /all            accumulation of all cycles
 *   /buffer         entire cycle buffer
 *   /buffer/super   entire supercycle buffer
 *   /buffer/<n>     the last <n> buffered cycles; <n> must be a non-negative
 *                   integer, anything else gets 422
 *
 * Every other URL gets a 404.
 */
var server = http.createServer(function(req, res) {
    var base = config.URL_BASE;
    if(req.url == base+"/cycle") {
        logger.log("Recieved cycle request");
        sendJson(res, lastCycle);
    }
    else if(req.url == base+"/info") {
        logger.log("Recieved info request");
        if(!beInfo) {
            // Backend info has not arrived (or could not be parsed) yet.
            res.writeHead(500);
            res.end("");
        }
        else {
            sendJson(res, {
                interval: config.POLL_INTERVAL,
                purgeInterval: config.BUFFER_PURGE_TIME,
                superPurgeInterval: config.SUPERBUFFER_PURGE_TIME,
                backendInterval: beInfo.timeout,
                board: beInfo.board,
                upSince: upSince
            });
        }
    }
    else if(req.url == base+"/template")
    {
        logger.log("Recieved template request.");
        sendJson(res, new _cycle.Cycle());
    }
    else if(req.url == base+"/all/average")
    {
        logger.log("Recieved all time average request.");
        // Attach the sample count so clients know how many cycles were averaged.
        allTimeAverage._n = allTimeAverageN;
        sendJson(res, allTimeAverage);
    }
    else if(req.url == base+"/all")
    {
        logger.log("Recieved all time stats request.");
        sendJson(res, allTime);
    }
    else if(req.url == base+"/buffer")
    {
        sendJson(res, buffer.all());
    }
    else if(req.url == base+"/buffer/super")
    {
        sendJson(res, superbuffer.all());
    }
    else if(req.url.startsWith(base+"/buffer/"))
    {
        var lst = req.url.split("/").slice(-1)[0];
        // Digits only: rejects "", negatives, fractions, hex and "Infinity",
        // all of which the looser isNaN() check used to let through.
        if(/^\d+$/.test(lst))
        {
            var count = Number(lst);
            var bcpy = buffer.all();
            if(count === 0)
                // slice(-0) is slice(0), i.e. the WHOLE array, so a request
                // for zero items has to be answered explicitly.
                sendJson(res, []);
            else if(bcpy.length<=count)
                sendJson(res, bcpy);
            else
                sendJson(res, bcpy.slice(-count));
        }
        else {
            res.writeHead(422);
            res.end("Invalid request");
        }
    }
    else {
        res.writeHead(404);
        res.end("404");
    }
});
// WebSocket server attached to the HTTP server created above, so both share
// one port.
// NOTE(review): `autoAcceptConnections`, the 'request' event and the
// req.accept()/req.reject()/req.httpRequest calls below look like the API of
// the `websocket` package, whereas this file requires `ws`. If `ws` does not
// emit 'request', that would explain why the handler never runs and why
// connections on any path are accepted - confirm against the ws documentation
// before changing anything (ws has its own path / client-verification options).
var wss = new WebSocketServer({
server: server,
autoAcceptConnections: false
});
// Intended to accept WebSocket connections only on <URL_BASE>/cycle.
wss.on('request', function(req) { //why isn't this getting called?
// NOTE(review): `url` is the object returned by url.parse(), yet it is
// compared with != against a string; presumably its pathname was meant - verify.
var url = require('url').parse(req.httpRequest.url);
if(url!=config.URL_BASE+"/cycle") return req.reject();
return req.accept();
});
// For every WebSocket client: forward each new cycle / supercycle as a JSON
// message of the form {type: "cycle"|"supercycle", data: ...} until the
// socket closes.
wss.on('connection', function (ws) {
    /*ws.on('message', function (message) {
    ws.send("Received: " + message);
    });*/

    // Serialise and send one message. The callback receives send failures
    // (e.g. the socket is no longer open) so they are logged rather than
    // thrown out of cycleEmitter.emit(), where they would abort delivery to
    // the remaining clients and surface as an uncaught exception.
    var push = function(type, data) {
        ws.send(JSON.stringify({type: type, data: data}), function(err) {
            if(err) logger.log("WebSocket send failed: "+err.message);
        });
    };
    var f = function(cycle) {
        push("cycle", cycle);
    };
    var g = function(supercycle) {
        push("supercycle", supercycle);
    };
    logger.log("New connection");
    ws.on("close", function(x,y) {
        // Unsubscribe so the emitter does not keep references to dead sockets.
        cycleEmitter.removeListener("newCycle", f);
        cycleEmitter.removeListener("newSuperCycle", g);
        logger.log("Connection closed");
    });
    // An 'error' event with no listener is thrown by the EventEmitter and
    // would take the whole process down; log it instead.
    ws.on("error", function(err) {
        logger.log("WebSocket error: "+err.message);
    });
    cycleEmitter.addListener("newCycle", f);
    cycleEmitter.addListener("newSuperCycle", g);
});
// Bind the shared HTTP/WebSocket server to the configured port and say so in the log.
server.listen(config.PORT);
logger.log(`Listening on port ${config.PORT}`);
// Backend poller. Every config.POLL_INTERVAL ms it runs the backend control
// script with "get-clear --data <lastNumber>", turns the returned list of
// posts into one Cycle of counters, then stores that cycle (lastCycle,
// buffer, all-time totals and average) and emits "newCycle".
// Note: `process` in this file is the child_process module (see the require
// at the top), not Node's global process object.
var backend = setInterval(function() {
    //logger.log("Polling backend");
    // execFile arguments are documented as strings, so convert the number explicitly.
    var args = [config.RTBWCTL_FILENAME, config.BACKEND_SOCKET, "get-clear", "--data", String(lastNumber)];
    process.execFile(config.PYTHON_EXECUTABLE, args, function(error,stdout,stderr) {
        // Surface spawn failures / non-zero exits instead of ignoring them.
        // Parsing is still attempted in case usable output was produced.
        if(error) {
            logger.log("Backend poll failed: "+error.message);
        }
        var json = undefined;
        try {
            json = JSON.parse(stdout);
        } catch(e) {
            logger.log("Recieved invalid JSON from backend.");
            return;
        }
        // "null" and bare primitives are valid JSON but would make the loop
        // below throw (json.length on null, `in` on a string element).
        if(json === null || typeof json !== "object") {
            logger.log("Recieved invalid JSON from backend.");
            return;
        }
        var cycle = new _cycle.Cycle();
        var timeInters= [];
        for (var i=0; i<json.length;i++)
        {
            var post = json[i];
            // Track the highest post number seen; it is sent with the next poll.
            if(post.no>lastNumber)
                lastNumber = post.no;
            cycle.number+=1;
            if(post.country in cycle.countries)
                cycle.countries[post.country]+=1;
            else
                cycle.countries[post.country]=1;
            // A post without a "thread" key is counted as a thread.
            if(! ("thread" in post))
                cycle.threads+=1;
            // Gap between this post and the next one in the list.
            if(i<json.length-1)
            {
                timeInters.push(diff(Date.parse(post.time), Date.parse(json[i+1].time)));
            }
            if(bein(post, "com"))
                cycle.comments+=1;
            if(bein(post, "sub"))
                cycle.subjects+=1;
            if(bein(post, "id") && post["id"] != "")
                cycle.ids +=1;
            if(bein(post, "filename"))
                cycle.images+=1;
            // name+trip, name-only and trip-only are mutually exclusive counters.
            if(bein(post, "name") && bein(post, "trip"))
                cycle.nametrips+=1;
            else if(bein(post, "name"))
                cycle.names+=1;
            else if(bein(post, "trip"))
                cycle.trips+=1;
        }
        // Mean gap between consecutive posts (0 when fewer than two posts).
        cycle.interval = timeAverage(timeInters);
        cycle.last = lastNumber;
        lastCycle=cycle;
        buffer.add(cycle);
        allTime = _cycle.Cycle.accumulate([allTime, cycle]);
        allTimeAverage = _cycle.Cycle.average(allTimeAverage, [cycle], allTimeAverageN);
        allTimeAverageN += 1;
        cycleEmitter.emit("newCycle", lastCycle);
    });
}, config.POLL_INTERVAL);
/*var ctlServer = net.createServer();
if(!!config.CTL_SOCKET) {
ctlServer.listen(config.CTL_SOCKET, function() {
logger.log("Control server up on socket "+config.CTL_SOCKET);
});
ctlServer.on('connection', function(con) {
logger.log("Connection");
con.on('data', function (data) {
logger.log("CTL: "+data);
});
});
ctlServer.on('error', function(e) {
logger.log("Control server error ["+config.CTL_SOCKET+"]: "+e.code);
ctlServer.close();
});
}*/