You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
doushio/server/okyaku.js

178 lines
4.1 KiB

var caps = require('./caps'),
common = require('../common'),
events = require('events'),
Muggle = require('../etc').Muggle,
STATE = require('./state'),
util = require('util'),
winston = require('winston');
var dispatcher = exports.dispatcher = {};
function Okyaku(socket, ip, country) {
events.EventEmitter.call(this);
this.socket = socket;
this.ident = caps.lookup_ident(ip, country);
this.watching = {};
this.ip = ip;
this.country = country;
var clients = STATE.clientsByIP[ip];
if (clients)
clients.push(this);
else
clients = STATE.clientsByIP[ip] = [this];
STATE.emitter.emit('change:clientsByIP', ip, clients);
}
util.inherits(Okyaku, events.EventEmitter);
exports.Okyaku = Okyaku;
var OK = Okyaku.prototype;
OK.send = function (msg) {
this.socket.write(JSON.stringify([msg]));
};
OK.on_update = function (op, kind, msg) {
// Special cases for operations that overwrite a client's state
if (this.post && kind == common.DELETE_POSTS) {
var nums = JSON.parse(msg)[0].slice(2);
if (nums.indexOf(this.post.num) >= 0)
this.post = null;
}
else if (this.post && kind == common.DELETE_THREAD) {
if (this.post.num == op || this.post.op == op)
this.post = null;
}
if (this.blackhole && HOLED_UPDATES.indexOf(kind) >= 0)
return;
this.socket.write(msg);
};
const HOLED_UPDATES = [common.DELETE_POSTS, common.DELETE_THREAD];
OK.on_thread_sink = function (thread, err) {
/* TODO */
winston.error(thread + ' sank: ' + err);
};
const WORMHOLES = [common.SYNCHRONIZE, common.FINISH_POST];
OK.on_message = function (data) {
var msg;
try { msg = JSON.parse(data); }
catch (e) {}
var type = common.INVALID;
if (msg) {
if (this.post && typeof msg == 'string')
type = common.UPDATE_POST;
else if (msg.constructor == Array)
type = msg.shift();
}
if (!this.synced && type != common.SYNCHRONIZE)
type = common.INVALID;
if (this.blackhole && WORMHOLES.indexOf(type) < 0)
return;
var func = dispatcher[type];
if (!func || !func(msg, this)) {
this.kotowaru(Muggle("Bad protocol.", new Error(
"Invalid message: " + JSON.stringify(data))));
}
};
var ip_expiries = {};
OK.on_close = function () {
var ip = this.ip;
var clientList = STATE.clientsByIP[ip];
if (clientList) {
var i = clientList.indexOf(this);
if (i >= 0) {
clientList.splice(i, 1);
STATE.emitter.emit('change:clientsByIP',ip,clientList);
}
if (!clientList.length) {
// Expire this list after a short delay
if (ip_expiries[ip])
clearTimeout(ip_expiries[ip]);
ip_expiries[ip] = setTimeout(function () {
var list = STATE.clientsByIP[ip];
if (list && list.length === 0)
delete STATE.clientsByIP[ip];
delete ip_expiries[ip];
}, 5000);
}
}
if (this.id) {
delete STATE.clients[this.id];
this.id = null;
}
this.synced = false;
var db = this.db;
if (db) {
db.kikanai();
if (this.post)
this.finish_post(function (err) {
if (err)
winston.warn('finishing post: ' + err);
db.disconnect();
});
else
db.disconnect();
}
this.emit('close');
};
OK.kotowaru = function (error) {
if (this.blackhole)
return;
var msg = 'Server error.';
if (error instanceof Muggle) {
msg = error.most_precise_error_message();
error = error.deepest_reason();
}
winston.error('Error by ' + JSON.stringify(this.ident) + ': '
+ (error || msg));
this.send([0, common.INVALID, msg]);
this.synced = false;
};
OK.finish_post = function (callback) {
/* TODO: Should we check this.uploading? */
var self = this;
this.db.finish_post(this.post, function (err) {
if (err)
callback(err);
else {
if (self.post) {
self.last_num = self.post.num;
self.post = null;
}
callback(null);
}
});
};
exports.scan_client_caps = function () {
for (var ip in STATE.clientsByIP) {
STATE.clientsByIP[ip].forEach(function (okyaku) {
if (!okyaku.id || !okyaku.board)
return;
var ident = caps.lookup_ident(ip, okyaku.country);
if (ident.timeout) {
okyaku.blackhole = true;
return;
}
if (!caps.can_access_board(ident, okyaku.board)) {
try {
okyaku.socket.close();
}
catch (e) { /* bleh */ }
}
});
}
};