You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1948 lines
46 KiB
1948 lines
46 KiB
var _ = require('./lib/underscore'),
|
|
async = require('async'),
|
|
cache = require('./server/state').dbCache,
|
|
caps = require('./server/caps'),
|
|
common = require('./common'),
|
|
config = require('./config'),
|
|
events = require('events'),
|
|
fs = require('fs'),
|
|
hooks = require('./hooks'),
|
|
ipUtils = require('ip'),
|
|
Muggle = require('./etc').Muggle,
|
|
tail = require('./tail'),
|
|
util = require('util'),
|
|
winston = require('winston');
|
|
|
|
var imager = require('./imager'); /* set up hooks */
|
|
|
|
var OPs = exports.OPs = cache.OPs;
|
|
var TAGS = exports.TAGS = cache.opTags;
|
|
var SUBS = exports.SUBS = cache.threadSubs;
|
|
|
|
var LUA = {};
|
|
function register_lua(name) {
|
|
var src = fs.readFileSync('lua/' + name + '.lua', 'UTF-8');
|
|
LUA[name] = {src: src};
|
|
}
|
|
|
|
function redis_client() {
|
|
var conn = require('redis').createClient(config.REDIS_PORT || undefined);
|
|
// ASYNC SETUP RACE!
|
|
for (var k in LUA)
|
|
load(LUA[k]);
|
|
|
|
function load(entry) {
|
|
conn.script('load', entry.src, function (err, sha) {
|
|
if (err)
|
|
throw err;
|
|
entry.sha = sha;
|
|
});
|
|
}
|
|
return conn;
|
|
}
|
|
exports.redis_client = redis_client;
|
|
|
|
// wait for the `register_lua` calls before connecting
|
|
process.nextTick(function () {
|
|
global.redis = redis_client();
|
|
});
|
|
|
|
/* REAL-TIME UPDATES */
|
|
|
|
function Subscription(targetInfo) {
|
|
events.EventEmitter.call(this);
|
|
this.setMaxListeners(0);
|
|
|
|
this.fullKey = targetInfo.key;
|
|
this.target = targetInfo.target;
|
|
this.channel = targetInfo.channel;
|
|
SUBS[this.fullKey] = this;
|
|
|
|
this.pending_subscriptions = [];
|
|
this.subscription_callbacks = [];
|
|
|
|
this.k = redis_client();
|
|
this.k.on('error', this.on_sub_error.bind(this));
|
|
this.k.on('subscribe', this.on_one_sub.bind(this));
|
|
this.k.subscribe(this.target);
|
|
this.subscriptions = [this.target];
|
|
this.pending_subscriptions.push(this.target);
|
|
if (this.target != this.fullKey) {
|
|
this.k.subscribe(this.fullKey);
|
|
this.pending_subscriptions.push(this.fullKey);
|
|
}
|
|
};
|
|
|
|
util.inherits(Subscription, events.EventEmitter);
|
|
var S = Subscription.prototype;
|
|
|
|
Subscription.full_key = function (target, ident) {
|
|
var channel;
|
|
if (ident && ident.priv)
|
|
channel = 'priv:' + ident.priv;
|
|
else if (caps.can_moderate(ident))
|
|
channel = 'auth';
|
|
var key = channel ? channel + ':' + target : target;
|
|
return {key: key, channel: channel, target: target};
|
|
};
|
|
|
|
Subscription.get = function (target, ident) {
|
|
var full = Subscription.full_key(target, ident);
|
|
var sub = SUBS[full.key];
|
|
if (!sub)
|
|
sub = new Subscription(full);
|
|
return sub;
|
|
};
|
|
|
|
S.when_ready = function (cb) {
|
|
if (this.subscription_callbacks)
|
|
this.subscription_callbacks.push(cb);
|
|
else
|
|
cb(null);
|
|
};
|
|
|
|
S.on_one_sub = function (name) {
|
|
var i = this.pending_subscriptions.indexOf(name);
|
|
if (i < 0)
|
|
throw "Obtained unasked-for subscription " + name + "?!";
|
|
this.pending_subscriptions.splice(i, 1);
|
|
if (this.pending_subscriptions.length == 0)
|
|
this.on_all_subs();
|
|
};
|
|
|
|
S.on_all_subs = function () {
|
|
var k = this.k;
|
|
k.removeAllListeners('subscribe');
|
|
k.on('message', this.on_message.bind(this));
|
|
k.removeAllListeners('error');
|
|
k.on('error', this.sink_sub.bind(this));
|
|
this.subscription_callbacks.forEach(function (cb) {
|
|
cb(null);
|
|
});
|
|
delete this.pending_subscriptions;
|
|
delete this.subscription_callbacks;
|
|
};
|
|
|
|
function parse_pub_message(msg) {
|
|
var m = msg.match(/^(\d+)\|/);
|
|
var prefixLen = m[0].length;
|
|
var bodyLen = parse_number(m[1]);
|
|
var info = {body: msg.substr(prefixLen, bodyLen)};
|
|
var suffixPos = prefixLen + bodyLen;
|
|
if (msg.length > suffixPos)
|
|
info.suffixPos = suffixPos;
|
|
return info;
|
|
}
|
|
|
|
S.on_message = function (chan, msg) {
|
|
/* Do we need to clarify whether this came from target or fullKey? */
|
|
var parsed = parse_pub_message(msg), extra;
|
|
if (this.channel && parsed.suffixPos) {
|
|
var suffix = JSON.parse(msg.slice(parsed.suffixPos));
|
|
extra = suffix[this.channel];
|
|
}
|
|
msg = parsed.body;
|
|
var m = msg.match(/^(\d+),(\d+)/);
|
|
if(m===null) return;
|
|
var op = parse_number(m[1]);
|
|
var kind = parse_number(m[2]);
|
|
|
|
if (extra && kind == common.INSERT_POST) {
|
|
// add ip to INSERT_POST
|
|
var m = msg.match(/^(\d+,2,\d+,{)(.+)$/);
|
|
if (m && extra.ip) {
|
|
if (/"ip":/.test(msg))
|
|
throw "`ip` in public pub " + chan;
|
|
msg = m[1] + '"ip":' + JSON.stringify(extra.ip) + ',' + m[2];
|
|
}
|
|
}
|
|
|
|
this.emit('update', op, kind, '[[' + msg + ']]');
|
|
};
|
|
|
|
S.on_sub_error = function (err) {
|
|
winston.error("Subscription error:", (err.stack || err));
|
|
this.commit_sudoku();
|
|
this.subscription_callbacks.forEach(function (cb) {
|
|
cb(err);
|
|
});
|
|
this.subscription_callbacks = null;
|
|
};
|
|
|
|
S.sink_sub = function (err) {
|
|
if (config.DEBUG)
|
|
throw err;
|
|
this.emit('error', this.target, err);
|
|
this.commit_sudoku();
|
|
};
|
|
|
|
S.commit_sudoku = function () {
|
|
var k = this.k;
|
|
k.removeAllListeners('error');
|
|
k.removeAllListeners('message');
|
|
k.removeAllListeners('subscribe');
|
|
k.quit();
|
|
if (SUBS[this.fullKey] === this)
|
|
delete SUBS[this.fullKey];
|
|
this.removeAllListeners('update');
|
|
this.removeAllListeners('error');
|
|
};
|
|
|
|
S.has_no_listeners = function () {
|
|
/* Possibly idle out after a while */
|
|
var self = this;
|
|
if (this.idleOutTimer)
|
|
clearTimeout(this.idleOutTimer);
|
|
this.idleOutTimer = setTimeout(function () {
|
|
self.idleOutTimer = null;
|
|
if (self.listeners('update').length == 0)
|
|
self.commit_sudoku();
|
|
}, 30 * 1000);
|
|
};
|
|
|
|
/* OP CACHE */
|
|
|
|
function add_OP_tag(tagIndex, op) {
|
|
var tags = TAGS[op];
|
|
if (tags === undefined)
|
|
TAGS[op] = tagIndex;
|
|
else if (typeof tags == 'number') {
|
|
if (tagIndex != tags)
|
|
TAGS[op] = [tags, tagIndex];
|
|
}
|
|
else if (tags.indexOf(tagIndex) < 0)
|
|
tags.push(tagIndex);
|
|
}
|
|
|
|
function set_OP_tag(tagIndex, op) {
|
|
TAGS[op] = tagIndex;
|
|
}
|
|
|
|
function OP_has_tag(tag, op) {
|
|
var index = config.BOARDS.indexOf(tag);
|
|
if (index < 0)
|
|
return false;
|
|
var tags = TAGS[op];
|
|
if (tags === undefined)
|
|
return false;
|
|
if (typeof tags == 'number')
|
|
return index == tags;
|
|
else
|
|
return tags.indexOf(index) >= 0;
|
|
};
|
|
exports.OP_has_tag = OP_has_tag;
|
|
|
|
exports.first_tag_of = function (op) {
|
|
var tags = TAGS[op];
|
|
if (tags === undefined)
|
|
return false;
|
|
else if (typeof tags == 'number')
|
|
return config.BOARDS[tags];
|
|
else
|
|
return config.BOARDS[tags[0]];
|
|
};
|
|
|
|
function tags_of(op) {
|
|
var tags = TAGS[op];
|
|
if (tags === undefined)
|
|
return false;
|
|
else if (typeof tags == 'number')
|
|
return [config.BOARDS[tags]];
|
|
else
|
|
return tags.map(function (i) { return config.BOARDS[i]; });
|
|
}
|
|
exports.tags_of = tags_of;
|
|
|
|
function update_cache(chan, msg) {
|
|
msg = JSON.parse(msg);
|
|
var op = msg.op, kind = msg.kind, tag = msg.tag;
|
|
|
|
if (kind == common.INSERT_POST) {
|
|
if (msg.num)
|
|
OPs[msg.num] = op;
|
|
else {
|
|
add_OP_tag(config.BOARDS.indexOf(tag), op);
|
|
OPs[op] = op;
|
|
}
|
|
}
|
|
else if (kind == common.MOVE_THREAD) {
|
|
set_OP_tag(config.BOARDS.indexOf(tag), op);
|
|
}
|
|
else if (kind == common.DELETE_POSTS) {
|
|
msg.nums.forEach(function (num) {
|
|
delete OPs[num];
|
|
});
|
|
}
|
|
else if (kind == common.DELETE_THREAD) {
|
|
msg.nums.forEach(function (num) {
|
|
delete OPs[num];
|
|
});
|
|
delete TAGS[op];
|
|
}
|
|
}
|
|
|
|
exports.track_OPs = function (callback) {
|
|
var k = redis_client();
|
|
k.subscribe('cache');
|
|
k.once('subscribe', function () {
|
|
load_OPs(callback);
|
|
});
|
|
k.on('message', update_cache);
|
|
/* k persists for the purpose of cache updates */
|
|
};
|
|
|
|
exports.on_pub = function (name, handler) {
|
|
// TODO: share redis connection
|
|
var k = redis_client();
|
|
k.subscribe(name);
|
|
k.on('message', handler);
|
|
/* k persists */
|
|
};
|
|
|
|
function load_OPs(callback) {
|
|
var r = global.redis;
|
|
var boards = config.BOARDS;
|
|
// Want consistent ordering in the TAGS entries for multi-tag threads
|
|
// (so do them in series)
|
|
tail.forEach(boards, scan_board, callback);
|
|
|
|
var threadsKey;
|
|
function scan_board(tag, cb) {
|
|
var tagIndex = boards.indexOf(tag);
|
|
threadsKey = 'tag:' + tag_key(tag) + ':threads';
|
|
r.zrange(threadsKey, 0, -1, function (err, threads) {
|
|
if (err)
|
|
return cb(err);
|
|
async.forEach(threads, function (op, cb) {
|
|
op = parse_number(op);
|
|
var ps = [scan_thread.bind(null,tagIndex,op)];
|
|
if (!config.READ_ONLY && config.THREAD_EXPIRY
|
|
&& tag != 'archive') {
|
|
ps.push(refresh_expiry.bind(null,
|
|
tag, op));
|
|
}
|
|
async.parallel(ps, cb);
|
|
}, cb);
|
|
});
|
|
}
|
|
|
|
function scan_thread(tagIndex, op, cb) {
|
|
op = parse_number(op);
|
|
add_OP_tag(tagIndex, op);
|
|
OPs[op] = op;
|
|
get_all_replies_and_privs(r, op, function (err, posts) {
|
|
if (err)
|
|
return cb(err);
|
|
posts.forEach(function (num) {
|
|
OPs[parse_number(num)] = op;
|
|
});
|
|
cb(null);
|
|
});
|
|
}
|
|
|
|
var expiryKey = expiry_queue_key();
|
|
function refresh_expiry(tag, op, cb) {
|
|
if (tag == config.STAFF_BOARD)
|
|
return cb(null);
|
|
var entry = op + ':' + tag_key(tag);
|
|
var queries = ['time', 'immortal'];
|
|
hmget_obj(r, 'thread:'+op, queries, function (err, thread) {
|
|
if (err)
|
|
return cb(err);
|
|
if (!thread.time) {
|
|
winston.warn('Thread '+op+" doesn't exist.");
|
|
var m = r.multi();
|
|
m.zrem(threadsKey, op);
|
|
m.zrem(expiryKey, entry);
|
|
m.exec(cb);
|
|
return;
|
|
}
|
|
if (thread.immortal)
|
|
return r.zrem(expiryKey, entry, cb);
|
|
var score = expiry_queue_score(thread.time);
|
|
r.zadd(expiryKey, score, entry, cb);
|
|
});
|
|
}
|
|
}
|
|
|
|
function expiry_queue_score(time) {
|
|
return Math.floor(parse_number(time)/1000 + config.THREAD_EXPIRY);
|
|
}
|
|
|
|
function expiry_queue_key() {
|
|
return 'expiry:' + config.THREAD_EXPIRY;
|
|
}
|
|
exports.expiry_queue_key = expiry_queue_key;
|
|
|
|
/* SOCIETY */
|
|
|
|
exports.is_board = function (board) {
|
|
return config.BOARDS.indexOf(board) >= 0;
|
|
};
|
|
|
|
exports.UPKEEP_IDENT = {auth: 'Upkeep', ip: '127.0.0.1'};
|
|
|
|
function Yakusoku(board, ident) {
|
|
events.EventEmitter.call(this);
|
|
this.id = ++(cache.YAKUMAN);
|
|
this.tag = board;
|
|
this.ident = ident;
|
|
this.subs = [];
|
|
}
|
|
|
|
util.inherits(Yakusoku, events.EventEmitter);
|
|
exports.Yakusoku = Yakusoku;
|
|
var Y = Yakusoku.prototype;
|
|
|
|
Y.connect = function () {
|
|
// multiple redis connections are pointless (without slaves)
|
|
return global.redis;
|
|
};
|
|
|
|
Y.disconnect = function () {
|
|
this.removeAllListeners('end');
|
|
};
|
|
|
|
function forEachInObject(obj, f, callback) {
|
|
var total = 0, complete = 0, done = false, errors = [];
|
|
function cb(err) {
|
|
complete++;
|
|
if (err)
|
|
errors.push(err);
|
|
if (done && complete == total)
|
|
callback(errors.length ? errors : null);
|
|
}
|
|
for (var k in obj) {
|
|
if (obj.hasOwnProperty(k)) {
|
|
total++;
|
|
f(k, cb);
|
|
}
|
|
}
|
|
done = true;
|
|
if (complete == total)
|
|
callback(errors.length ? errors : null);
|
|
}
|
|
|
|
Y.target_key = function (id) {
|
|
return (id == 'live') ? 'tag:' + this.tag : 'thread:' + id;
|
|
};
|
|
|
|
Y.kiku = function (targets, on_update, on_sink, callback) {
|
|
var self = this;
|
|
this.on_update = on_update;
|
|
this.on_sink = on_sink;
|
|
forEachInObject(targets, function (id, cb) {
|
|
var target = self.target_key(id);
|
|
var sub = Subscription.get(target, self.ident);
|
|
sub.on('update', on_update);
|
|
sub.on('error', on_sink);
|
|
self.subs.push(sub.fullKey);
|
|
sub.when_ready(cb);
|
|
}, callback);
|
|
};
|
|
|
|
Y.kikanai = function () {
|
|
var self = this;
|
|
this.subs.forEach(function (key) {
|
|
var sub = SUBS[key];
|
|
if (sub) {
|
|
sub.removeListener('update', self.on_update);
|
|
sub.removeListener('error', self.on_sink);
|
|
if (sub.listeners('update').length == 0)
|
|
sub.has_no_listeners();
|
|
}
|
|
});
|
|
this.subs = [];
|
|
};
|
|
|
|
function post_volume(view, body) {
|
|
return (body ? body.length : 0) +
|
|
(view ? (config.NEW_POST_WORTH || 0) : 0) +
|
|
((view && view.image) ? (config.IMAGE_WORTH || 0) : 0);
|
|
}
|
|
|
|
function update_throughput(m, ip, when, quant) {
|
|
var key = 'ip:' + ip + ':throttle:';
|
|
var shortKey = key + short_term_timeslot(when);
|
|
var longKey = key + long_term_timeslot(when);
|
|
m.incrby(shortKey, quant);
|
|
m.incrby(longKey, quant);
|
|
/* Don't want to use expireat in case of timezone trickery
|
|
or something dumb. (Really, UTC should be OK though...) */
|
|
// Conservative expirations
|
|
m.expire(shortKey, 10 * 60);
|
|
m.expire(longKey, 2 * 24 * 3600);
|
|
}
|
|
|
|
function short_term_timeslot(when) {
|
|
return Math.floor(when / (1000 * 60 * 5));
|
|
}
|
|
|
|
function long_term_timeslot(when) {
|
|
return Math.floor(when / (1000 * 60 * 60 * 24));
|
|
}
|
|
|
|
Y.reserve_post = function (op, ip, callback) {
|
|
if (this.ident.readOnly)
|
|
return callback(Muggle("Can't post right now."));
|
|
var r = this.connect();
|
|
if (ipUtils.isLoopback(ip))
|
|
return reserve();
|
|
|
|
var key = 'ip:' + ip + ':throttle:';
|
|
var now = Date.now();
|
|
var shortTerm = key + short_term_timeslot(now);
|
|
var longTerm = key + long_term_timeslot(now);
|
|
r.mget([shortTerm, longTerm], function (err, quants) {
|
|
if (err)
|
|
return callback(Muggle("Limiter failure.", err));
|
|
if (quants[0] > config.SHORT_TERM_LIMIT ||
|
|
quants[1] > config.LONG_TERM_LIMIT)
|
|
return callback(Muggle('Reduce your speed.'));
|
|
|
|
reserve();
|
|
});
|
|
|
|
function reserve() {
|
|
r.incr('postctr', function (err, num) {
|
|
if (err)
|
|
return callback(err);
|
|
OPs[num] = op || num;
|
|
callback(null, num);
|
|
});
|
|
}
|
|
};
|
|
|
|
var optPostFields = 'name trip email auth subject flavor'.split(' ');
|
|
|
|
Y.insert_post = function (msg, body, extra, callback) {
|
|
if (this.ident.readOnly)
|
|
return callback(Muggle("Can't post right now."));
|
|
var r = this.connect();
|
|
if (!this.tag)
|
|
return callback(Muggle("Can't retrieve board for posting."));
|
|
var ip = extra.ip, board = extra.board, num = msg.num, op = msg.op;
|
|
if (!num)
|
|
return callback(Muggle("No post num."));
|
|
else if (!ip)
|
|
return callback(Muggle("No IP."));
|
|
else if (op) {
|
|
if (OPs[op] != op || !OP_has_tag(board, op)) {
|
|
delete OPs[num];
|
|
return callback(Muggle('Thread does not exist.'));
|
|
}
|
|
}
|
|
|
|
var view = {time: msg.time, ip: ip, state: msg.state.join()};
|
|
optPostFields.forEach(function (field) {
|
|
if (msg[field])
|
|
view[field] = msg[field];
|
|
});
|
|
var tagKey = 'tag:' + tag_key(this.tag);
|
|
if (op)
|
|
view.op = op;
|
|
else {
|
|
view.tags = tag_key(board);
|
|
if (board == config.STAFF_BOARD)
|
|
view.immortal = 1;
|
|
}
|
|
|
|
if (extra.image_alloc) {
|
|
msg.image = extra.image_alloc.image;
|
|
if (!op == msg.image.pinky)
|
|
return callback(Muggle("Image is the wrong size."));
|
|
delete msg.image.pinky;
|
|
}
|
|
|
|
var key = (op ? 'post:' : 'thread:') + num;
|
|
var bump = !op || !common.is_sage(view.email);
|
|
var m = r.multi();
|
|
m.incr(tagKey + ':postctr'); // must be first
|
|
if (op)
|
|
m.hget('thread:' + op, 'subject'); // must be second
|
|
if (bump)
|
|
m.incr(tagKey + ':bumpctr');
|
|
m.sadd('liveposts', key);
|
|
|
|
hooks.trigger_sync('inlinePost', {src: msg, dest: view});
|
|
if (msg.image) {
|
|
if (op)
|
|
m.hincrby('thread:' + op, 'imgctr', 1);
|
|
else
|
|
view.imgctr = 1;
|
|
imager.note_hash(msg.image.hash, msg.num);
|
|
view.dims = view.dims.toString();
|
|
}
|
|
m.hmset(key, view);
|
|
m.set(key + ':body', body);
|
|
if (msg.links)
|
|
m.hmset(key + ':links', msg.links);
|
|
|
|
var etc = {cacheUpdate: {}};
|
|
var priv = this.ident.priv;
|
|
if (op) {
|
|
etc.ipNum = num;
|
|
etc.cacheUpdate.num = num;
|
|
var pre = 'thread:' + op;
|
|
if (priv) {
|
|
m.sadd(pre + ':privs', priv);
|
|
m.rpush(pre + ':privs:' + priv, num);
|
|
}
|
|
else
|
|
m.rpush(pre + ':posts', num);
|
|
}
|
|
else {
|
|
// TODO: Add to alternate thread list?
|
|
// set conditional hide?
|
|
op = num;
|
|
if (!view.immortal) {
|
|
var score = expiry_queue_score(msg.time);
|
|
var entry = num + ':' + tag_key(this.tag);
|
|
m.zadd(expiry_queue_key(), score, entry);
|
|
}
|
|
/* Rate-limit new threads */
|
|
if (ipUtils.isLoopback(ip))
|
|
m.setex('ip:'+ip+':throttle:thread',
|
|
config.THREAD_THROTTLE, op);
|
|
}
|
|
|
|
/* Denormalize for backlog */
|
|
view.nonce = msg.nonce;
|
|
view.body = body;
|
|
if (msg.links)
|
|
view.links = msg.links;
|
|
extract(view);
|
|
delete view.ip;
|
|
|
|
var self = this;
|
|
async.waterfall([
|
|
function (next) {
|
|
if (!msg.image)
|
|
return next(null);
|
|
|
|
imager.commit_image_alloc(extra.image_alloc, next);
|
|
},
|
|
function (next) {
|
|
if (ip) {
|
|
var n = post_volume(view, body);
|
|
if (n > 0)
|
|
update_throughput(m, ip, view.time, n);
|
|
etc.ip = ip;
|
|
}
|
|
|
|
self._log(m, op, common.INSERT_POST, [num, view], etc);
|
|
|
|
m.exec(next);
|
|
},
|
|
function (results, next) {
|
|
if (!bump)
|
|
return next();
|
|
var postctr = results[0];
|
|
var subject = subject_val(op,
|
|
op==num ? view.subject : results[1]);
|
|
var m = r.multi();
|
|
m.zadd(tagKey + ':threads', postctr, op);
|
|
if (subject)
|
|
m.zadd(tagKey + ':subjects', postctr, subject);
|
|
m.exec(next);
|
|
}],
|
|
function (err) {
|
|
if (err) {
|
|
delete OPs[num];
|
|
return callback(err);
|
|
}
|
|
callback(null);
|
|
});
|
|
};
|
|
|
|
Y.remove_post = function (from_thread, num, callback) {
|
|
num = parse_number(num);
|
|
var op = OPs[num];
|
|
if (!op)
|
|
return callback(Muggle('No such post.'));
|
|
if (op == num) {
|
|
if (!from_thread)
|
|
return callback('Deletion loop?!');
|
|
return this.remove_thread(num, callback);
|
|
}
|
|
|
|
var r = this.connect();
|
|
var self = this;
|
|
if (from_thread) {
|
|
var key = 'thread:' + op;
|
|
r.lrem(key + ':posts', -1, num, function (err, delCount) {
|
|
if (err)
|
|
return callback(err);
|
|
/* did someone else already delete this? */
|
|
if (delCount != 1)
|
|
return callback(null, -num);
|
|
/* record deletion */
|
|
r.rpush(key + ':dels', num, function (err) {
|
|
if (err)
|
|
winston.warn(err);
|
|
gone_from_thread();
|
|
});
|
|
});
|
|
}
|
|
else
|
|
gone_from_thread();
|
|
|
|
function gone_from_thread() {
|
|
var key = 'post:' + num;
|
|
r.hset(key, 'hide', '1', function (err) {
|
|
if (err) {
|
|
/* Difficult to recover. Whatever. */
|
|
winston.warn("Couldn't hide: " + err);
|
|
}
|
|
/* TODO push cache update? */
|
|
delete OPs[num];
|
|
|
|
callback(null, [op, num]);
|
|
|
|
/* In the background, try to finish the post */
|
|
self.finish_quietly(key, warn);
|
|
self.hide_image(key, warn);
|
|
});
|
|
}
|
|
};
|
|
|
|
Y.remove_posts = function (nums, callback) {
|
|
var self = this;
|
|
tail.map(nums, this.remove_post.bind(this, true), all_gone);
|
|
|
|
function all_gone(err, dels) {
|
|
if (err)
|
|
return callback(err);
|
|
var threads = {}, already_gone = [];
|
|
dels.forEach(function (del) {
|
|
if (Array.isArray(del)) {
|
|
var op = del[0];
|
|
if (!(op in threads))
|
|
threads[op] = [];
|
|
threads[op].push(del[1]);
|
|
}
|
|
else if (del < 0)
|
|
already_gone.push(-del);
|
|
else if (del)
|
|
winston.warn('Unknown del: ' + del);
|
|
});
|
|
if (already_gone.length)
|
|
winston.warn("Tried to delete missing posts: " +
|
|
already_gone);
|
|
if (_.isEmpty(threads))
|
|
return callback(null);
|
|
var m = self.connect().multi();
|
|
for (var op in threads) {
|
|
var nums = threads[op];
|
|
nums.sort();
|
|
var opts = {cacheUpdate: {nums: nums}};
|
|
self._log(m, op, common.DELETE_POSTS, nums, opts);
|
|
}
|
|
m.exec(callback);
|
|
}
|
|
};
|
|
|
|
Y.remove_thread = function (op, callback) {
|
|
if (OPs[op] != op)
|
|
return callback(Muggle('Thread does not exist.'));
|
|
var r = this.connect();
|
|
var key = 'thread:' + op, dead_key = 'dead:' + op;
|
|
var graveyardKey = 'tag:' + tag_key('graveyard');
|
|
var privs = null;
|
|
var etc = {cacheUpdate: {}};
|
|
var self = this;
|
|
async.waterfall([
|
|
function (next) {
|
|
get_all_replies_and_privs(r, op, next);
|
|
},
|
|
function (nums, threadPrivs, next) {
|
|
etc.cacheUpdate.nums = nums;
|
|
privs = threadPrivs;
|
|
if (!nums || !nums.length)
|
|
return next(null, []);
|
|
tail.map(nums, self.remove_post.bind(self, false), next);
|
|
},
|
|
function (dels, next) {
|
|
var m = r.multi();
|
|
m.incr(graveyardKey + ':bumpctr');
|
|
m.hgetall(key);
|
|
m.exec(next);
|
|
},
|
|
function (rs, next) {
|
|
var deadCtr = rs[0], post = rs[1];
|
|
var tags = parse_tags(post.tags);
|
|
var subject = subject_val(op, post.subject);
|
|
/* Rename thread keys, move to graveyard */
|
|
var m = r.multi();
|
|
var expiryKey = expiry_queue_key();
|
|
tags.forEach(function (tag) {
|
|
var tagKey = tag_key(tag);
|
|
m.zrem(expiryKey, op + ':' + tagKey);
|
|
m.zrem('tag:' + tagKey + ':threads', op);
|
|
if (subject)
|
|
m.zrem('tag:' + tagKey + ':subjects', subject);
|
|
});
|
|
m.zadd(graveyardKey + ':threads', deadCtr, op);
|
|
etc.tags = tags;
|
|
self._log(m, op, common.DELETE_THREAD, [], etc);
|
|
m.hset(key, 'hide', 1);
|
|
/* Next two vals are checked */
|
|
m.renamenx(key, dead_key);
|
|
m.renamenx(key + ':history', dead_key + ':history');
|
|
m.renamenx(key + ':ips', dead_key + ':ips');
|
|
m.exec(next);
|
|
},
|
|
function (results, done) {
|
|
var dels = results.slice(-2);
|
|
if (dels.some(function (x) { return x === 0; }))
|
|
return done("Already deleted?!");
|
|
delete OPs[op];
|
|
delete TAGS[op];
|
|
|
|
/* Extra renames now that we have renamenx exclusivity */
|
|
var m = r.multi();
|
|
m.rename(key + ':posts', dead_key + ':posts');
|
|
m.rename(key + ':links', dead_key + ':links');
|
|
if (privs.length) {
|
|
m.rename(key + ':privs', dead_key + ':privs');
|
|
privs.forEach(function (priv) {
|
|
var suff = ':privs:' + priv;
|
|
m.rename(key + suff, dead_key + suff);
|
|
});
|
|
}
|
|
m.exec(function (err) {
|
|
done(err, null); /* second arg is remove_posts dels */
|
|
});
|
|
/* Background, might not even be there */
|
|
self.finish_quietly(dead_key, warn);
|
|
self.hide_image(dead_key, warn);
|
|
}], callback);
|
|
};
|
|
|
|
Y.archive_thread = function (op, callback) {
|
|
var r = this.connect();
|
|
var key = 'thread:' + op, archiveKey = 'tag:' + tag_key('archive');
|
|
var self = this;
|
|
async.waterfall([
|
|
function (next) {
|
|
var m = r.multi();
|
|
m.exists(key);
|
|
m.hget(key, 'immortal');
|
|
m.zscore('tag:' + tag_key('graveyard') + ':threads', op);
|
|
m.exec(next);
|
|
},
|
|
function (rs, next) {
|
|
if (!rs[0])
|
|
return callback(Muggle(key + ' does not exist.'));
|
|
if (parse_number(rs[1]))
|
|
return callback(Muggle(key + ' is immortal.'));
|
|
if (rs[2])
|
|
return callback(Muggle(key + ' is already deleted.'));
|
|
var m = r.multi();
|
|
// order counts
|
|
m.hgetall(key);
|
|
m.hgetall(key + ':links');
|
|
m.llen(key + ':posts');
|
|
m.smembers(key + ':privs');
|
|
m.lrange(key + ':dels', 0, -1);
|
|
m.exec(next);
|
|
},
|
|
function (rs, next) {
|
|
var view = rs[0], links = rs[1], replyCount = rs[2],
|
|
privs = rs[3], dels = rs[4];
|
|
var subject = subject_val(op, view.subject);
|
|
var tags = view.tags;
|
|
var m = r.multi();
|
|
// move to archive tag only
|
|
m.hset(key, 'origTags', tags);
|
|
m.hset(key, 'tags', tag_key('archive'));
|
|
tags = parse_tags(tags);
|
|
tags.forEach(function (tag) {
|
|
var tagKey = 'tag:' + tag_key(tag);
|
|
m.zrem(tagKey + ':threads', op);
|
|
if (subject)
|
|
m.zrem(tagKey + ':subjects', subject);
|
|
});
|
|
m.zadd(archiveKey + ':threads', op, op);
|
|
self._log(m, op, common.DELETE_THREAD, [], {tags: tags});
|
|
|
|
// shallow thread insertion message in archive
|
|
if (!_.isEmpty(links))
|
|
view.links = links;
|
|
extract(view);
|
|
delete view.ip;
|
|
view.replyctr = replyCount;
|
|
view.hctr = 0;
|
|
var etc = {tags: ['archive'], cacheUpdate: {}};
|
|
self._log(m, op, common.MOVE_THREAD, [view], etc);
|
|
|
|
// clear history; note new history could be added
|
|
// for deletion in the archive
|
|
// (a bit silly right after adding a new entry)
|
|
m.hdel(key, 'hctr');
|
|
m.del(key + ':history');
|
|
m.del(key + ':ips');
|
|
|
|
// delete hidden posts
|
|
dels.forEach(function (num) {
|
|
m.del('post:' + num);
|
|
m.del('post:' + num + ':links');
|
|
});
|
|
m.del(key + ':dels');
|
|
|
|
if (privs.length) {
|
|
m.del(key + ':privs');
|
|
privs.forEach(function (priv) {
|
|
m.del(key + ':privs:' + priv);
|
|
});
|
|
}
|
|
|
|
m.exec(next);
|
|
},
|
|
function (results, done) {
|
|
set_OP_tag(config.BOARDS.indexOf('archive'), op);
|
|
done();
|
|
}], callback);
|
|
};
|
|
|
|
/* BOILERPLATE CITY */
|
|
|
|
Y.remove_images = function (nums, callback) {
|
|
if (this.ident.readOnly)
|
|
return callback(Muggle("Read-only right now."));
|
|
var threads = {};
|
|
var rem = this.remove_image.bind(this, threads);
|
|
var self = this;
|
|
tail.forEach(nums, rem, function (err) {
|
|
if (err)
|
|
return callback(err);
|
|
var m = self.connect().multi();
|
|
for (var op in threads)
|
|
self._log(m, op, common.DELETE_IMAGES, threads[op],
|
|
{tags: tags_of(op)});
|
|
m.exec(callback);
|
|
});
|
|
};
|
|
|
|
Y.remove_image = function (threads, num, callback) {
|
|
if (this.ident.readOnly)
|
|
return callback(Muggle("Read-only right now."));
|
|
var r = this.connect();
|
|
var op = OPs[num];
|
|
if (!op)
|
|
callback(null, false);
|
|
var key = (op == num ? 'thread:' : 'post:') + num;
|
|
var self = this;
|
|
r.hexists(key, 'src', function (err, exists) {
|
|
if (err)
|
|
return callback(err);
|
|
if (!exists)
|
|
return callback(null);
|
|
self.hide_image(key, function (err) {
|
|
if (err)
|
|
return callback(err);
|
|
r.hset(key, 'hideimg', 1, function (err, affected) {
|
|
if (err)
|
|
return callback(err);
|
|
if (!affected)
|
|
return callback(null);
|
|
|
|
if (threads[op])
|
|
threads[op].push(num);
|
|
else
|
|
threads[op] = [num];
|
|
r.hincrby('thread:' + op, 'imgctr', -1,
|
|
callback);
|
|
});
|
|
});
|
|
});
|
|
};
|
|
|
|
Y.hide_image = function (key, callback) {
|
|
if (this.ident.readOnly)
|
|
return callback(Muggle("Read-only right now."));
|
|
var r = this.connect();
|
|
var hash;
|
|
var imgKeys = ['hideimg', 'hash', 'src', 'thumb', 'realthumb', 'mid'];
|
|
r.hmget(key, imgKeys, move_dead);
|
|
|
|
function move_dead(err, rs) {
|
|
if (err)
|
|
return callback(err);
|
|
if (!rs)
|
|
return callback(null);
|
|
var info = {};
|
|
for (var i = 0; i < rs.length; i++)
|
|
info[imgKeys[i]] = rs[i];
|
|
if (info.hideimg) /* already gone */
|
|
return callback(null);
|
|
hooks.trigger("buryImage", info, callback);
|
|
}
|
|
};
|
|
|
|
Y.force_image_spoilers = function (nums, callback) {
|
|
if (this.ident.readOnly)
|
|
return callback(Muggle("Read-only right now."));
|
|
var threads = {};
|
|
var rem = this.spoiler_image.bind(this, threads);
|
|
var self = this;
|
|
tail.forEach(nums, rem, function (err) {
|
|
if (err)
|
|
return callback(err);
|
|
var m = self.connect().multi();
|
|
for (var op in threads)
|
|
self._log(m, op, common.SPOILER_IMAGES, threads[op],
|
|
{tags: tags_of(op)});
|
|
m.exec(callback);
|
|
});
|
|
};
|
|
|
|
Y.spoiler_image = function (threads, num, callback) {
|
|
if (this.ident.readOnly)
|
|
return callback(Muggle("Read-only right now."));
|
|
var r = this.connect();
|
|
var op = OPs[num];
|
|
if (!op)
|
|
callback(null, false);
|
|
var key = (op == num ? 'thread:' : 'post:') + num;
|
|
var self = this;
|
|
var spoilerKeys = ['src', 'spoiler', 'realthumb'];
|
|
r.hmget(key, spoilerKeys, function (err, info) {
|
|
if (err)
|
|
return callback(err);
|
|
/* no image or already spoilt */
|
|
if (!info[0] || info[1] || info[2])
|
|
return callback(null);
|
|
var index = common.pick_spoiler(-1).index;
|
|
r.hmset(key, 'spoiler', index, function (err) {
|
|
if (err)
|
|
return callback(err);
|
|
|
|
if (threads[op])
|
|
threads[op].push([num, index]);
|
|
else
|
|
threads[op] = [[num, index]];
|
|
callback(null);
|
|
});
|
|
});
|
|
};
|
|
|
|
Y.toggle_thread_lock = function (op, callback) {
|
|
if (this.ident.readOnly)
|
|
return callback(Muggle("Read-only right now."));
|
|
if (OPs[op] != op)
|
|
return callback(Muggle('Thread does not exist.'));
|
|
var r = this.connect();
|
|
var key = 'thread:' + op;
|
|
var self = this;
|
|
r.hexists(key, 'locked', function (err, locked) {
|
|
if (err)
|
|
return callback(err);
|
|
var m = r.multi();
|
|
if (locked)
|
|
m.hdel(key, 'locked');
|
|
else
|
|
m.hset(key, 'locked', '1');
|
|
var act = locked ? common.UNLOCK_THREAD : common.LOCK_THREAD;
|
|
self._log(m, op, act, []);
|
|
m.exec(callback);
|
|
});
|
|
};
|
|
|
|
/* END BOILERPLATE CITY */
|
|
|
|
function warn(err) {
|
|
if (err)
|
|
winston.warn('Warning: ' + err);
|
|
}
|
|
|
|
Y.check_thread_locked = function (op, callback) {
|
|
this.connect().hexists('thread:' + op, 'locked', function (err, lock) {
|
|
if (err)
|
|
callback(err);
|
|
else
|
|
callback(lock ? Muggle('Thread is locked.') : null);
|
|
});
|
|
};
|
|
|
|
Y.check_throttle = function (ip, callback) {
|
|
var key = 'ip:' + ip + ':throttle:thread';
|
|
this.connect().exists(key, function (err, exists) {
|
|
if (err)
|
|
callback(err);
|
|
else
|
|
callback(exists ? Muggle('Too soon.') : null);
|
|
});
|
|
};
|
|
|
|
Y.add_image = function (post, alloc, ip, callback) {
|
|
var r = this.connect();
|
|
var num = post.num, op = post.op;
|
|
if (!op)
|
|
return callback(Muggle("Can't add another image to an OP."));
|
|
var image = alloc.image;
|
|
if (!image.pinky)
|
|
return callback(Muggle("Image is wrong size."));
|
|
delete image.pinky;
|
|
|
|
var key = 'post:' + num;
|
|
r.exists(key, function (err, exists) {
|
|
if (err)
|
|
return callback(err);
|
|
if (!exists)
|
|
return callback(Muggle("Post does not exist."));
|
|
|
|
imager.commit_image_alloc(alloc, function (err) {
|
|
if (err)
|
|
return callback(err);
|
|
add_it();
|
|
});
|
|
});
|
|
|
|
var self = this;
|
|
function add_it() {
|
|
var m = r.multi();
|
|
imager.note_hash(image.hash, post.num);
|
|
// HACK: hmset doesn't like our array, but we need it
|
|
var orig_dims = image.dims;
|
|
image.dims = orig_dims.toString();
|
|
m.hmset(key, image);
|
|
image.dims = orig_dims;
|
|
|
|
m.hincrby('thread:' + op, 'imgctr', 1);
|
|
|
|
delete image.hash;
|
|
self._log(m, op, common.INSERT_IMAGE, [num, image]);
|
|
|
|
var now = Date.now();
|
|
var n = post_volume({image: true});
|
|
update_throughput(m, ip, now, post_volume({image: true}));
|
|
m.exec(callback);
|
|
}
|
|
};
|
|
|
|
Y.append_post = function (post, tail, old_state, extra, cb) {
|
|
var m = this.connect().multi();
|
|
var key = (post.op ? 'post:' : 'thread:') + post.num;
|
|
/* Don't need to check .exists() thanks to client state */
|
|
m.append(key + ':body', tail);
|
|
/* XXX: fragile */
|
|
if (old_state[0] != post.state[0] || old_state[1] != post.state[1])
|
|
m.hset(key, 'state', post.state.join());
|
|
if (extra.ip) {
|
|
var now = Date.now();
|
|
update_throughput(m, extra.ip, now, post_volume(null, tail));
|
|
}
|
|
if (!_.isEmpty(extra.new_links))
|
|
m.hmset(key + ':links', extra.new_links);
|
|
|
|
// possibly attach data for dice rolls etc. to the update
|
|
var attached = {post: post, extra: extra, writeKeys: {}, attach: {}};
|
|
var self = this;
|
|
hooks.trigger("attachToPost", attached, function (err, attached) {
|
|
if (err)
|
|
return cb(err);
|
|
for (var h in attached.writeKeys)
|
|
m.hset(key, h, attached.writeKeys[h]);
|
|
var msg = [post.num, tail];
|
|
var links = extra.links || {};
|
|
|
|
var a = old_state[0], b = old_state[1];
|
|
// message tail is [... a, b, links, attachment]
|
|
// default values [... 0, 0, {}, {}] don't need to be sent
|
|
// to minimize log output
|
|
if (!_.isEmpty(attached.attach))
|
|
msg.push(a, b, links, attached.attach);
|
|
else if (!_.isEmpty(links))
|
|
msg.push(a, b, links);
|
|
else if (b)
|
|
msg.push(a, b);
|
|
else if (a)
|
|
msg.push(a);
|
|
|
|
self._log(m, post.op || post.num, common.UPDATE_POST, msg);
|
|
m.exec(cb);
|
|
});
|
|
};
|
|
|
|
register_lua('finish');
|
|
|
|
function finish_off(m, key) {
|
|
var body_key = key.replace('dead', 'thread') + ':body';
|
|
m.evalsha(LUA.finish.sha, 3, key, body_key, 'liveposts');
|
|
}
|
|
|
|
Y.finish_post = function (post, callback) {
|
|
var m = this.connect().multi();
|
|
var key = (post.op ? 'post:' : 'thread:') + post.num;
|
|
/* Don't need to check .exists() thanks to client state */
|
|
finish_off(m, key);
|
|
this._log(m, post.op || post.num, common.FINISH_POST, [post.num]);
|
|
m.exec(callback);
|
|
};
|
|
|
|
Y.finish_quietly = function (key, callback) {
|
|
var m = this.connect().multi();
|
|
finish_off(m, key);
|
|
m.exec(callback);
|
|
};
|
|
|
|
Y.finish_all = function (callback) {
|
|
var r = this.connect();
|
|
var self = this;
|
|
r.smembers('liveposts', function (err, keys) {
|
|
if (err)
|
|
return callback(err);
|
|
async.forEach(keys, function (key, cb) {
|
|
var isPost = /^post:(\d+)$/.test(key);
|
|
if (isPost)
|
|
r.hget(key, 'op', fini);
|
|
else
|
|
fini();
|
|
|
|
function fini(err, op) {
|
|
if (err)
|
|
return cb(err);
|
|
var m = r.multi();
|
|
finish_off(m, key);
|
|
var n = parse_number(key.match(/:(\d+)$/)[1]);
|
|
op = isPost ? parse_number(op) : n;
|
|
self._log(m, op, common.FINISH_POST, [n]);
|
|
m.srem('liveposts', key);
|
|
m.exec(cb);
|
|
}
|
|
}, callback);
|
|
});
|
|
};
|
|
|
|
Y._log = function (m, op, kind, msg, opts) {
|
|
opts = opts || {};
|
|
msg = JSON.stringify(msg).slice(1, -1);
|
|
msg = msg.length ? (kind + ',' + msg) : ('' + kind);
|
|
winston.info("Log: " + msg);
|
|
if (!op)
|
|
throw new Error('No OP.');
|
|
var priv = this.ident.priv;
|
|
var prefix = priv ? ('priv:' + priv + ':') : '';
|
|
var key = prefix + 'thread:' + op;
|
|
|
|
if (common.is_pubsub(kind)) {
|
|
m.rpush(key + ':history', msg);
|
|
m.hincrby(key, 'hctr', 1);
|
|
}
|
|
if (opts.ipNum)
|
|
m.hset(key + ':ips', opts.ipNum, opts.ip);
|
|
|
|
var opBit = op + ',';
|
|
var len = opBit.length + msg.length;
|
|
msg = len + '|' + opBit + msg;
|
|
|
|
// we can add an extra trailing message for secret info
|
|
if (opts.ip)
|
|
msg += JSON.stringify({auth: {ip: opts.ip}});
|
|
|
|
m.publish(key, msg);
|
|
var tags = opts.tags || (this.tag ? [this.tag] : []);
|
|
tags.forEach(function (tag) {
|
|
m.publish(prefix + 'tag:' + tag, msg);
|
|
});
|
|
|
|
if (opts.cacheUpdate) {
|
|
var info = {kind: kind, tag: tags[0], op: op};
|
|
_.extend(info, opts.cacheUpdate);
|
|
m.publish('cache', JSON.stringify(info));
|
|
}
|
|
};
|
|
|
|
Y.fetch_backlogs = function (watching, callback) {
|
|
var r = this.connect();
|
|
var combined = [];
|
|
var inject_ips = caps.can_moderate(this.ident);
|
|
forEachInObject(watching, function (thread, cb) {
|
|
if (thread == 'live')
|
|
return cb(null);
|
|
var key = 'thread:' + thread;
|
|
var sync = watching[thread];
|
|
var m = r.multi();
|
|
m.lrange(key + ':history', sync, -1);
|
|
if (inject_ips) {
|
|
// would be nice to fetch only the relevant ips...?
|
|
m.hgetall(key + ':ips');
|
|
}
|
|
m.exec(function (err, rs) {
|
|
if (err)
|
|
return cb(err);
|
|
|
|
var prefix = thread + ',';
|
|
var ips = inject_ips && rs[1];
|
|
|
|
// construct full messages from history entries
|
|
rs[0].forEach(function (entry) {
|
|
|
|
// attempt to inject ip to INSERT_POST log
|
|
var m = ips && entry.match(/^2,(\d+),{(.+)$/);
|
|
var ip = m && ips[m[1]];
|
|
if (ip) {
|
|
var inject = '"ip":' + JSON.stringify(ip) + ',';
|
|
entry = '2,' + m[1] + ',{' + inject + m[2];
|
|
}
|
|
|
|
combined.push(prefix + entry);
|
|
});
|
|
|
|
cb(null);
|
|
});
|
|
}, function (errs) {
|
|
callback(errs, combined);
|
|
});
|
|
};
|
|
|
|
Y.get_post_op = function (num, callback) {
|
|
var r = this.connect();
|
|
r.hget('post:' + num, 'op', function (err, op) {
|
|
if (err)
|
|
return callback(err);
|
|
else if (op)
|
|
return callback(null, num, op);
|
|
r.exists('thread:' + num, function (err, exists) {
|
|
if (err)
|
|
callback(err);
|
|
else if (!exists)
|
|
callback(null, null, null);
|
|
else
|
|
callback(null, num, num);
|
|
});
|
|
});
|
|
}
|
|
|
|
Y.get_tag = function (page) {
|
|
var r = this.connect();
|
|
var self = this;
|
|
var key = 'tag:' + tag_key(this.tag) + ':threads';
|
|
var reverseOrder = this.tag == 'archive';
|
|
if (page < 0 && !reverseOrder)
|
|
page = 0;
|
|
var start = page * config.THREADS_PER_PAGE;
|
|
var end = start + config.THREADS_PER_PAGE - 1;
|
|
var m = r.multi();
|
|
if (reverseOrder)
|
|
m.zrange(key, start, end);
|
|
else
|
|
m.zrevrange(key, start, end);
|
|
m.zcard(key);
|
|
m.exec(function (err, res) {
|
|
if (err)
|
|
return self.emit('error', err);
|
|
var nums = res[0];
|
|
if (page > 0 && !nums.length)
|
|
return self.emit('nomatch');
|
|
if (reverseOrder)
|
|
nums.reverse();
|
|
self.emit('begin', res[1]);
|
|
var reader = new Reader(self);
|
|
reader.on('error', self.emit.bind(self, 'error'));
|
|
reader.on('thread', self.emit.bind(self, 'thread'));
|
|
reader.on('post', self.emit.bind(self, 'post'));
|
|
reader.on('endthread', self.emit.bind(self, 'endthread'));
|
|
self._get_each_thread(reader, 0, nums);
|
|
});
|
|
};
|
|
|
|
Y._get_each_thread = function (reader, ix, nums) {
|
|
if (!nums || ix >= nums.length) {
|
|
this.emit('end');
|
|
reader.removeAllListeners('endthread');
|
|
reader.removeAllListeners('end');
|
|
return;
|
|
}
|
|
var self = this;
|
|
var next_please = function () {
|
|
reader.removeListener('end', next_please);
|
|
reader.removeListener('nomatch', next_please);
|
|
self._get_each_thread(reader, ix+1, nums);
|
|
};
|
|
reader.on('end', next_please);
|
|
reader.on('nomatch', next_please);
|
|
reader.get_thread(this.tag, nums[ix], {
|
|
abbrev: config.ABBREVIATED_REPLIES || 5
|
|
});
|
|
};
|
|
|
|
/* LURKERS */
|
|
|
|
register_lua('get_thread');
|
|
|
|
function lua_get_thread(r, key, abbrev, cb) {
|
|
var body_key = key.replace('dead', 'thread') + ':body';
|
|
var posts_key = key + ':posts';
|
|
r.evalsha(LUA.get_thread.sha, 4, key, body_key, posts_key, 'liveposts', abbrev,
|
|
function (err, rs) {
|
|
if (err)
|
|
return cb(err);
|
|
if (!rs)
|
|
return cb(null);
|
|
if (rs.length != 4)
|
|
throw new Error('get_thread.lua unexpected output');
|
|
|
|
// activePosts is a hash of hashes; needs to be unbulked on two levels
|
|
var activeBulk = rs[2];
|
|
for (var i = 1; i < activeBulk.length; i += 2)
|
|
activeBulk[i] = unbulk(activeBulk[i]);
|
|
var active = unbulk(activeBulk);
|
|
|
|
cb(null, {
|
|
pre: unbulk(rs[0]),
|
|
replies: rs[1].map(parse_number),
|
|
active: active,
|
|
total: rs[3],
|
|
});
|
|
});
|
|
}
|
|
|
|
function Reader(yakusoku) {
|
|
events.EventEmitter.call(this);
|
|
this.y = yakusoku;
|
|
this.postCache = {};
|
|
if (caps.can_administrate(yakusoku.ident))
|
|
this.showPrivs = true;
|
|
}
|
|
|
|
util.inherits(Reader, events.EventEmitter);
|
|
exports.Reader = Reader;
|
|
|
|
Reader.prototype.get_thread = function (tag, num, opts) {
|
|
var r = this.y.connect();
|
|
var graveyard = (tag == 'graveyard');
|
|
if (graveyard)
|
|
opts.showDead = true;
|
|
var key = (graveyard ? 'dead:' : 'thread:') + num;
|
|
var abbrev = opts.abbrev || 0;
|
|
|
|
var self = this;
|
|
lua_get_thread(r, key, abbrev, function (err, result) {
|
|
if (err)
|
|
return self.emit('error', err);
|
|
if (!result || !result.pre || !result.pre.time) {
|
|
if (!opts.redirect)
|
|
return self.emit('nomatch');
|
|
r.hget('post:' + num, 'op',
|
|
function (err, op) {
|
|
if (err)
|
|
self.emit('error', err);
|
|
else if (!op)
|
|
self.emit('nomatch');
|
|
else
|
|
self.emit('redirect', op);
|
|
});
|
|
return;
|
|
}
|
|
var opPost = result.pre;
|
|
var total = result.total;
|
|
var nums = result.replies;
|
|
self.postCache = result.active;
|
|
|
|
var exists = self.is_visible(opPost, opts);
|
|
var tags = parse_tags(opPost.tags);
|
|
if (!graveyard && tags.indexOf(tag) < 0) {
|
|
if (opts.redirect) {
|
|
var op = OPs[num];
|
|
return self.emit('redirect', op || num, tags[0]);
|
|
}
|
|
else
|
|
exists = false;
|
|
}
|
|
if (!exists) {
|
|
self.emit('nomatch');
|
|
return;
|
|
}
|
|
self.emit('begin', opPost);
|
|
opPost.num = num;
|
|
refine_post(opPost);
|
|
|
|
var priv = self.y.ident.priv;
|
|
|
|
if (opts.showDead || priv) {
|
|
var m = r.multi();
|
|
// order is important!
|
|
if (opts.showDead) {
|
|
var deadKey = key + ':dels';
|
|
m.lrange(deadKey, -abbrev, -1);
|
|
if (abbrev)
|
|
m.llen(deadKey);
|
|
}
|
|
if (priv) {
|
|
var privsKey = key + ':privs:' + priv;
|
|
m.lrange(privsKey, -abbrev, -1);
|
|
if (abbrev)
|
|
m.llen(privsKey);
|
|
}
|
|
|
|
m.exec(prepared);
|
|
}
|
|
else
|
|
prepared();
|
|
|
|
function prepared(err, rs) {
|
|
if (err)
|
|
return self.emit('error', err);
|
|
// get results in the same order as before
|
|
var deadNums, privNums;
|
|
if (opts.showDead) {
|
|
deadNums = rs.shift();
|
|
if (abbrev)
|
|
total += parse_number(rs.shift());
|
|
}
|
|
if (priv) {
|
|
privNums = rs.shift();
|
|
if (abbrev)
|
|
total += parse_number(rs.shift());
|
|
}
|
|
|
|
if (deadNums)
|
|
nums = merge_posts(nums, deadNums, abbrev);
|
|
if (priv) {
|
|
nums = merge_posts(nums, privNums, abbrev);
|
|
if (self.showPrivs)
|
|
self.privNums = privNums;
|
|
}
|
|
var omit = Math.max(total - abbrev, 0);
|
|
self.emit('thread', opPost, omit);
|
|
self._get_each_reply(0, nums, opts);
|
|
}
|
|
});
|
|
};
|
|
|
|
function merge_posts(nums, privNums, abbrev) {
|
|
var i = nums.length - 1, pi = privNums.length - 1;
|
|
if (pi < 0)
|
|
return nums;
|
|
var merged = [];
|
|
while (!abbrev || merged.length < abbrev) {
|
|
if (i >= 0 && pi >= 0) {
|
|
var num = nums[i], pNum = privNums[pi];
|
|
if (parse_number(num) > parse_number(pNum)) {
|
|
merged.unshift(num);
|
|
i--;
|
|
}
|
|
else {
|
|
merged.unshift(pNum);
|
|
pi--;
|
|
}
|
|
}
|
|
else if (i >= 0)
|
|
merged.unshift(nums[i--]);
|
|
else if (pi >= 0)
|
|
merged.unshift(privNums[pi--]);
|
|
else
|
|
break;
|
|
}
|
|
return merged;
|
|
}
|
|
|
|
function can_see_priv(priv, ident) {
|
|
if (!priv)
|
|
return true; // not private
|
|
if (!ident)
|
|
return false;
|
|
if (ident.showPriv)
|
|
return true;
|
|
return priv == ident.priv;
|
|
}
|
|
|
|
Reader.prototype._get_each_reply = function (ix, nums, opts) {
|
|
var cache = this.postCache;
|
|
var limit = 20;
|
|
|
|
var privs = this.privNums;
|
|
function apply_privs(post) {
|
|
if (privs && post.num && _.contains(privs, post.num.toString()))
|
|
post.priv = true;
|
|
}
|
|
|
|
// find a run of posts that need to be fetched
|
|
var end;
|
|
for (end = ix; end < nums.length && (end - ix) < limit; end++) {
|
|
if (cache[nums[end]])
|
|
break;
|
|
}
|
|
if (ix < end) {
|
|
// fetch posts in the ix..end range
|
|
var range = [];
|
|
for (var i = ix; i < end; i++)
|
|
range.push(nums[i]);
|
|
var self = this;
|
|
this.get_posts('post', range, opts, function (err, posts) {
|
|
if (err)
|
|
return self.emit('error', err);
|
|
for (var i = 0; i < posts.length; i++) {
|
|
var post = posts[i];
|
|
apply_privs(post);
|
|
self.emit('post', post);
|
|
}
|
|
process.nextTick(self._get_each_reply.bind(self, end, nums, opts));
|
|
});
|
|
return;
|
|
}
|
|
|
|
// otherwise read posts from cache
|
|
for (; ix < nums.length; ix++) {
|
|
var num = nums[ix];
|
|
var post = cache[num];
|
|
if (!post)
|
|
break;
|
|
|
|
if (this.is_visible(post, opts)) {
|
|
post.num = num;
|
|
refine_post(post);
|
|
apply_privs(post);
|
|
this.emit('post', post);
|
|
}
|
|
}
|
|
|
|
if (ix < nums.length) {
|
|
process.nextTick(this._get_each_reply.bind(this, ix, nums, opts));
|
|
} else {
|
|
this.emit('endthread');
|
|
this.emit('end');
|
|
}
|
|
};
|
|
|
|
/// fetch posts in bulk. it is assumed that none of them are currently being edited
|
|
Reader.prototype.get_posts = function (kind, nums, opts, cb) {
|
|
if (!nums.length)
|
|
return cb(null, []);
|
|
var r = this.y.connect();
|
|
var self = this;
|
|
|
|
var m = r.multi();
|
|
for (var i = 0; i < nums.length; i++) {
|
|
var key = kind + ':' + nums[i];
|
|
m.hgetall(key);
|
|
}
|
|
m.exec(function (err, results) {
|
|
if (err)
|
|
return cb(err);
|
|
|
|
var posts = [];
|
|
for (var i = 0; i < results.length; i++) {
|
|
var post = results[i];
|
|
var num = nums[i];
|
|
post.num = num;
|
|
if (!self.is_visible(post, opts))
|
|
continue;
|
|
|
|
refine_post(post);
|
|
if (post.editing && !post.body) {
|
|
post.body = '[a bug ate this post]';
|
|
|
|
var key = kind + ':' + num + ':body';
|
|
r.exists(key, function (err, existed) {
|
|
if (err)
|
|
winston.warn(err);
|
|
else if (existed)
|
|
winston.warn(key + " was overlooked");
|
|
});
|
|
}
|
|
posts.push(post);
|
|
}
|
|
|
|
cb(null, posts);
|
|
});
|
|
};
|
|
|
|
Reader.prototype.is_visible = function (post, opts) {
|
|
if (_.isEmpty(post))
|
|
return false;
|
|
if (post.hide && !opts.showDead)
|
|
return false;
|
|
if (!can_see_priv(post.priv, this.ident))
|
|
return false;
|
|
return true;
|
|
};
|
|
|
|
/// turn a fresh-from-redis post hash into our expected format
|
|
function refine_post(post) {
|
|
post.time = parse_number(post.time);
|
|
if (typeof post.op == 'string')
|
|
post.op = parse_number(post.op);
|
|
if (typeof post.tags == 'string')
|
|
post.tags = parse_tags(post.tags);
|
|
if (typeof post.body != 'string')
|
|
post.body = '';
|
|
if (post.state)
|
|
post.editing = true;
|
|
// extract the image-specific keys (if any) to a sub-hash
|
|
extract(post);
|
|
}
|
|
|
|
function parse_number(n) {
|
|
return parseInt(n, 10);
|
|
}
|
|
|
|
/* Including hidden or private. Not in-order. */
|
|
function get_all_replies_and_privs(r, op, cb) {
|
|
var key = 'thread:' + op;
|
|
var m = r.multi();
|
|
m.lrange(key + ':posts', 0, -1);
|
|
m.smembers(key + ':privs');
|
|
m.exec(function (err, rs) {
|
|
if (err)
|
|
return cb(err);
|
|
var nums = rs[0], privs = rs[1];
|
|
if (!privs.length)
|
|
return cb(null, nums, privs);
|
|
var m = r.multi();
|
|
privs.forEach(function (priv) {
|
|
m.lrange(key + ':privs:' + priv, 0, -1);
|
|
});
|
|
m.exec(function (err, rs) {
|
|
if (err)
|
|
return cb(err);
|
|
rs.forEach(function (ns) {
|
|
nums.push.apply(nums, ns);
|
|
});
|
|
cb(null, nums, privs);
|
|
});
|
|
});
|
|
};
|
|
|
|
|
|
/* AUTHORITY */
|
|
|
|
function Filter(tag) {
|
|
events.EventEmitter.call(this);
|
|
this.tag = tag;
|
|
};
|
|
|
|
util.inherits(Filter, events.EventEmitter);
|
|
exports.Filter = Filter;
|
|
var F = Filter.prototype;
|
|
|
|
F.connect = function () {
|
|
if (!this.r) {
|
|
this.r = global.redis;
|
|
}
|
|
return this.r;
|
|
};
|
|
|
|
F.get_all = function (limit) {
|
|
var self = this;
|
|
var r = this.connect();
|
|
r.zrange('tag:' + tag_key(this.tag) + ':threads', 0, -1, go);
|
|
function go(err, threads) {
|
|
if (err)
|
|
return self.failure(err);
|
|
async.forEach(threads, do_thread, self.check_done.bind(self));
|
|
}
|
|
function do_thread(op, cb) {
|
|
var key = 'thread:' + op;
|
|
r.llen(key + ':posts', function (err, len) {
|
|
if (err)
|
|
cb(err);
|
|
len = parse_number(len);
|
|
if (len > limit)
|
|
return cb(null);
|
|
var thumbKeys = ['thumb', 'realthumb', 'src'];
|
|
r.hmget(key, thumbKeys, function (err, rs) {
|
|
if (err)
|
|
cb(err);
|
|
var thumb = rs[0] || rs[1] || rs[2];
|
|
self.emit('thread', {num: op, thumb: thumb});
|
|
cb(null);
|
|
});
|
|
});
|
|
}
|
|
};
|
|
|
|
F.check_done = function (err) {
|
|
if (err)
|
|
this.failure(err);
|
|
else
|
|
this.success();
|
|
};
|
|
|
|
F.success = function () {
|
|
this.emit('end');
|
|
this.cleanup();
|
|
};
|
|
|
|
F.failure = function (err) {
|
|
this.emit('error', err);
|
|
this.cleanup();
|
|
};
|
|
|
|
F.cleanup = function () {
|
|
this.removeAllListeners('error');
|
|
this.removeAllListeners('thread');
|
|
this.removeAllListeners('end');
|
|
};
|
|
|
|
/* AMUSEMENT */
|
|
|
|
Y.get_fun = function (op, callback) {
|
|
if (cache.funThread && op == cache.funThread) {
|
|
/* Don't cache, for extra fun */
|
|
fs.readFile('client/fun.js', 'UTF-8', callback);
|
|
}
|
|
else
|
|
callback(null);
|
|
};
|
|
|
|
Y.set_fun_thread = function (op, callback) {
|
|
if (OPs[op] != op)
|
|
return callback(Muggle("Thread not found."));
|
|
var self = this;
|
|
fs.readFile('client/fun.js', 'UTF-8', function (err, funJs) {
|
|
if (err)
|
|
return callback(err);
|
|
cache.funThread = op;
|
|
var m = self.connect().multi();
|
|
self._log(m, op, common.EXECUTE_JS, [funJs]);
|
|
m.exec(callback);
|
|
});
|
|
};
|
|
|
|
Y.get_banner = function (cb) {
|
|
var key = 'tag:' + tag_key(this.tag) + ':banner';
|
|
this.connect().hgetall(key, cb);
|
|
};
|
|
|
|
Y.set_banner = function (op, message, cb) {
|
|
var r = this.connect();
|
|
|
|
var key = 'tag:' + tag_key(this.tag) + ':banner';
|
|
var self = this;
|
|
r.hgetall(key, function (err, old) {
|
|
if (err)
|
|
return cb(err);
|
|
var m = r.multi();
|
|
if (old && old.op != op) {
|
|
// clear previous thread's banner
|
|
self._log(m, old.op, common.UPDATE_BANNER, [null]);
|
|
}
|
|
|
|
// write new banner
|
|
m.hmset(key, {op: op, msg: message});
|
|
self._log(m, op, common.UPDATE_BANNER, [message]);
|
|
m.exec(cb);
|
|
});
|
|
};
|
|
|
|
Y.teardown = function (board, cb) {
|
|
var m = this.connect().multi();
|
|
var filter = new Filter(board);
|
|
var self = this;
|
|
filter.get_all(NaN); // no length limit
|
|
filter.on('thread', function (thread) {
|
|
self._log(m, thread.num, common.TEARDOWN, []);
|
|
});
|
|
filter.on('error', cb);
|
|
filter.on('end', function () {
|
|
m.exec(cb);
|
|
});
|
|
};
|
|
|
|
Y.get_current_body = function (num, cb) {
|
|
var key = (OPs[num] == num ? 'thread:' : 'post:') + num;
|
|
var m = this.connect().multi();
|
|
m.hmget(key, 'hide', 'body');
|
|
m.get(key + ':body');
|
|
m.exec(function (err, rs) {
|
|
if (err)
|
|
return cb(err);
|
|
var hide = rs[0][0], finalBody = rs[0][1];
|
|
var liveBody = rs[1];
|
|
if (hide)
|
|
return cb(null);
|
|
if (finalBody)
|
|
return cb(null, finalBody, true);
|
|
cb(null, liveBody || '', false);
|
|
});
|
|
};
|
|
|
|
/* HELPERS */
|
|
|
|
function extract(post) {
|
|
hooks.trigger_sync('extractPost', post);
|
|
}
|
|
|
|
function with_body(r, key, post, callback) {
|
|
if (post.body !== undefined)
|
|
callback(null, post);
|
|
else
|
|
r.get(key + ':body', function (err, body) {
|
|
if (err)
|
|
return callback(err);
|
|
if (body !== null) {
|
|
post.body = body;
|
|
post.editing = true;
|
|
return callback(null, post);
|
|
}
|
|
// Race condition between finishing posts
|
|
r.hget(key, 'body', function (err, body) {
|
|
if (err)
|
|
return callback(err);
|
|
post.body = body || '';
|
|
callback(null, post);
|
|
});
|
|
});
|
|
};
|
|
|
|
function subject_val(op, subject) {
|
|
return subject && (op + ':' + subject);
|
|
}
|
|
|
|
function tag_key(tag) {
|
|
return tag.length + ':' + tag;
|
|
}
|
|
|
|
function parse_tags(input) {
|
|
if (!input) {
|
|
winston.warn('Blank tag!');
|
|
return [];
|
|
}
|
|
var tags = [];
|
|
while (input.length) {
|
|
var m = input.match(/^(\d+):/);
|
|
if (!m)
|
|
break;
|
|
var len = parse_number(m[1]);
|
|
var pre = m[1].length + 1;
|
|
if (input.length < pre + len)
|
|
break;
|
|
tags.push(input.substr(pre, len));
|
|
input = input.slice(pre + len);
|
|
}
|
|
return tags;
|
|
}
|
|
exports.parse_tags = parse_tags;
|
|
|
|
function hmget_obj(r, key, keys, cb) {
|
|
r.hmget(key, keys, function (err, rs) {
|
|
if (err)
|
|
return cb(err);
|
|
var result = {};
|
|
for (var i = 0; i < keys.length; i++)
|
|
result[keys[i]] = rs[i];
|
|
cb(null, result);
|
|
});
|
|
}
|
|
|
|
/// converts a lua bulk response to a hash
|
|
function unbulk(list) {
|
|
if (!list)
|
|
return null;
|
|
if (list.length % 2) {
|
|
console.warn('bad bulk:', list);
|
|
throw new Error('bulk of odd len ' + list.length);
|
|
}
|
|
var hash = {};
|
|
for (var i = 0; i < list.length; i += 2) {
|
|
var key = list[i];
|
|
if (key in hash)
|
|
throw new Error('bulk repeated key ' + key);
|
|
hash[key] = list[i+1];
|
|
}
|
|
return hash;
|
|
}
|