var _ = require('./lib/underscore'), async = require('async'), cache = require('./server/state').dbCache, caps = require('./server/caps'), common = require('./common'), config = require('./config'), events = require('events'), fs = require('fs'), hooks = require('./hooks'), ipUtils = require('ip'), Muggle = require('./etc').Muggle, tail = require('./tail'), util = require('util'), winston = require('winston'); var imager = require('./imager'); /* set up hooks */ var OPs = exports.OPs = cache.OPs; var TAGS = exports.TAGS = cache.opTags; var SUBS = exports.SUBS = cache.threadSubs; var LUA = {}; function register_lua(name) { var src = fs.readFileSync('lua/' + name + '.lua', 'UTF-8'); LUA[name] = {src: src}; } function redis_client() { var conn = require('redis').createClient(config.REDIS_PORT || undefined); // ASYNC SETUP RACE! for (var k in LUA) load(LUA[k]); function load(entry) { conn.script('load', entry.src, function (err, sha) { if (err) throw err; entry.sha = sha; }); } return conn; } exports.redis_client = redis_client; // wait for the `register_lua` calls before connecting process.nextTick(function () { global.redis = redis_client(); }); /* REAL-TIME UPDATES */ function Subscription(targetInfo) { events.EventEmitter.call(this); this.setMaxListeners(0); this.fullKey = targetInfo.key; this.target = targetInfo.target; this.channel = targetInfo.channel; SUBS[this.fullKey] = this; this.pending_subscriptions = []; this.subscription_callbacks = []; this.k = redis_client(); this.k.on('error', this.on_sub_error.bind(this)); this.k.on('subscribe', this.on_one_sub.bind(this)); this.k.subscribe(this.target); this.subscriptions = [this.target]; this.pending_subscriptions.push(this.target); if (this.target != this.fullKey) { this.k.subscribe(this.fullKey); this.pending_subscriptions.push(this.fullKey); } }; util.inherits(Subscription, events.EventEmitter); var S = Subscription.prototype; Subscription.full_key = function (target, ident) { var channel; if (ident && ident.priv) channel = 'priv:' + ident.priv; else if (caps.can_moderate(ident)) channel = 'auth'; var key = channel ? channel + ':' + target : target; return {key: key, channel: channel, target: target}; }; Subscription.get = function (target, ident) { var full = Subscription.full_key(target, ident); var sub = SUBS[full.key]; if (!sub) sub = new Subscription(full); return sub; }; S.when_ready = function (cb) { if (this.subscription_callbacks) this.subscription_callbacks.push(cb); else cb(null); }; S.on_one_sub = function (name) { var i = this.pending_subscriptions.indexOf(name); if (i < 0) throw "Obtained unasked-for subscription " + name + "?!"; this.pending_subscriptions.splice(i, 1); if (this.pending_subscriptions.length == 0) this.on_all_subs(); }; S.on_all_subs = function () { var k = this.k; k.removeAllListeners('subscribe'); k.on('message', this.on_message.bind(this)); k.removeAllListeners('error'); k.on('error', this.sink_sub.bind(this)); this.subscription_callbacks.forEach(function (cb) { cb(null); }); delete this.pending_subscriptions; delete this.subscription_callbacks; }; function parse_pub_message(msg) { var m = msg.match(/^(\d+)\|/); var prefixLen = m[0].length; var bodyLen = parse_number(m[1]); var info = {body: msg.substr(prefixLen, bodyLen)}; var suffixPos = prefixLen + bodyLen; if (msg.length > suffixPos) info.suffixPos = suffixPos; return info; } S.on_message = function (chan, msg) { /* Do we need to clarify whether this came from target or fullKey? */ var parsed = parse_pub_message(msg), extra; if (this.channel && parsed.suffixPos) { var suffix = JSON.parse(msg.slice(parsed.suffixPos)); extra = suffix[this.channel]; } msg = parsed.body; var m = msg.match(/^(\d+),(\d+)/); if(m===null) return; var op = parse_number(m[1]); var kind = parse_number(m[2]); if (extra && kind == common.INSERT_POST) { // add ip to INSERT_POST var m = msg.match(/^(\d+,2,\d+,{)(.+)$/); if (m && extra.ip) { if (/"ip":/.test(msg)) throw "`ip` in public pub " + chan; msg = m[1] + '"ip":' + JSON.stringify(extra.ip) + ',' + m[2]; } } this.emit('update', op, kind, '[[' + msg + ']]'); }; S.on_sub_error = function (err) { winston.error("Subscription error:", (err.stack || err)); this.commit_sudoku(); this.subscription_callbacks.forEach(function (cb) { cb(err); }); this.subscription_callbacks = null; }; S.sink_sub = function (err) { if (config.DEBUG) throw err; this.emit('error', this.target, err); this.commit_sudoku(); }; S.commit_sudoku = function () { var k = this.k; k.removeAllListeners('error'); k.removeAllListeners('message'); k.removeAllListeners('subscribe'); k.quit(); if (SUBS[this.fullKey] === this) delete SUBS[this.fullKey]; this.removeAllListeners('update'); this.removeAllListeners('error'); }; S.has_no_listeners = function () { /* Possibly idle out after a while */ var self = this; if (this.idleOutTimer) clearTimeout(this.idleOutTimer); this.idleOutTimer = setTimeout(function () { self.idleOutTimer = null; if (self.listeners('update').length == 0) self.commit_sudoku(); }, 30 * 1000); }; /* OP CACHE */ function add_OP_tag(tagIndex, op) { var tags = TAGS[op]; if (tags === undefined) TAGS[op] = tagIndex; else if (typeof tags == 'number') { if (tagIndex != tags) TAGS[op] = [tags, tagIndex]; } else if (tags.indexOf(tagIndex) < 0) tags.push(tagIndex); } function set_OP_tag(tagIndex, op) { TAGS[op] = tagIndex; } function OP_has_tag(tag, op) { var index = config.BOARDS.indexOf(tag); if (index < 0) return false; var tags = TAGS[op]; if (tags === undefined) return false; if (typeof tags == 'number') return index == tags; else return tags.indexOf(index) >= 0; }; exports.OP_has_tag = OP_has_tag; exports.first_tag_of = function (op) { var tags = TAGS[op]; if (tags === undefined) return false; else if (typeof tags == 'number') return config.BOARDS[tags]; else return config.BOARDS[tags[0]]; }; function tags_of(op) { var tags = TAGS[op]; if (tags === undefined) return false; else if (typeof tags == 'number') return [config.BOARDS[tags]]; else return tags.map(function (i) { return config.BOARDS[i]; }); } exports.tags_of = tags_of; function update_cache(chan, msg) { msg = JSON.parse(msg); var op = msg.op, kind = msg.kind, tag = msg.tag; if (kind == common.INSERT_POST) { if (msg.num) OPs[msg.num] = op; else { add_OP_tag(config.BOARDS.indexOf(tag), op); OPs[op] = op; } } else if (kind == common.MOVE_THREAD) { set_OP_tag(config.BOARDS.indexOf(tag), op); } else if (kind == common.DELETE_POSTS) { msg.nums.forEach(function (num) { delete OPs[num]; }); } else if (kind == common.DELETE_THREAD) { msg.nums.forEach(function (num) { delete OPs[num]; }); delete TAGS[op]; } } exports.track_OPs = function (callback) { var k = redis_client(); k.subscribe('cache'); k.once('subscribe', function () { load_OPs(callback); }); k.on('message', update_cache); /* k persists for the purpose of cache updates */ }; exports.on_pub = function (name, handler) { // TODO: share redis connection var k = redis_client(); k.subscribe(name); k.on('message', handler); /* k persists */ }; function load_OPs(callback) { var r = global.redis; var boards = config.BOARDS; // Want consistent ordering in the TAGS entries for multi-tag threads // (so do them in series) tail.forEach(boards, scan_board, callback); var threadsKey; function scan_board(tag, cb) { var tagIndex = boards.indexOf(tag); threadsKey = 'tag:' + tag_key(tag) + ':threads'; r.zrange(threadsKey, 0, -1, function (err, threads) { if (err) return cb(err); async.forEach(threads, function (op, cb) { op = parse_number(op); var ps = [scan_thread.bind(null,tagIndex,op)]; if (!config.READ_ONLY && config.THREAD_EXPIRY && tag != 'archive') { ps.push(refresh_expiry.bind(null, tag, op)); } async.parallel(ps, cb); }, cb); }); } function scan_thread(tagIndex, op, cb) { op = parse_number(op); add_OP_tag(tagIndex, op); OPs[op] = op; get_all_replies_and_privs(r, op, function (err, posts) { if (err) return cb(err); posts.forEach(function (num) { OPs[parse_number(num)] = op; }); cb(null); }); } var expiryKey = expiry_queue_key(); function refresh_expiry(tag, op, cb) { if (tag == config.STAFF_BOARD) return cb(null); var entry = op + ':' + tag_key(tag); var queries = ['time', 'immortal']; hmget_obj(r, 'thread:'+op, queries, function (err, thread) { if (err) return cb(err); if (!thread.time) { winston.warn('Thread '+op+" doesn't exist."); var m = r.multi(); m.zrem(threadsKey, op); m.zrem(expiryKey, entry); m.exec(cb); return; } if (thread.immortal) return r.zrem(expiryKey, entry, cb); var score = expiry_queue_score(thread.time); r.zadd(expiryKey, score, entry, cb); }); } } function expiry_queue_score(time) { return Math.floor(parse_number(time)/1000 + config.THREAD_EXPIRY); } function expiry_queue_key() { return 'expiry:' + config.THREAD_EXPIRY; } exports.expiry_queue_key = expiry_queue_key; /* SOCIETY */ exports.is_board = function (board) { return config.BOARDS.indexOf(board) >= 0; }; exports.UPKEEP_IDENT = {auth: 'Upkeep', ip: '127.0.0.1'}; function Yakusoku(board, ident) { events.EventEmitter.call(this); this.id = ++(cache.YAKUMAN); this.tag = board; this.ident = ident; this.subs = []; } util.inherits(Yakusoku, events.EventEmitter); exports.Yakusoku = Yakusoku; var Y = Yakusoku.prototype; Y.connect = function () { // multiple redis connections are pointless (without slaves) return global.redis; }; Y.disconnect = function () { this.removeAllListeners('end'); }; function forEachInObject(obj, f, callback) { var total = 0, complete = 0, done = false, errors = []; function cb(err) { complete++; if (err) errors.push(err); if (done && complete == total) callback(errors.length ? errors : null); } for (var k in obj) { if (obj.hasOwnProperty(k)) { total++; f(k, cb); } } done = true; if (complete == total) callback(errors.length ? errors : null); } Y.target_key = function (id) { return (id == 'live') ? 'tag:' + this.tag : 'thread:' + id; }; Y.kiku = function (targets, on_update, on_sink, callback) { var self = this; this.on_update = on_update; this.on_sink = on_sink; forEachInObject(targets, function (id, cb) { var target = self.target_key(id); var sub = Subscription.get(target, self.ident); sub.on('update', on_update); sub.on('error', on_sink); self.subs.push(sub.fullKey); sub.when_ready(cb); }, callback); }; Y.kikanai = function () { var self = this; this.subs.forEach(function (key) { var sub = SUBS[key]; if (sub) { sub.removeListener('update', self.on_update); sub.removeListener('error', self.on_sink); if (sub.listeners('update').length == 0) sub.has_no_listeners(); } }); this.subs = []; }; function post_volume(view, body) { return (body ? body.length : 0) + (view ? (config.NEW_POST_WORTH || 0) : 0) + ((view && view.image) ? (config.IMAGE_WORTH || 0) : 0); } function update_throughput(m, ip, when, quant) { var key = 'ip:' + ip + ':throttle:'; var shortKey = key + short_term_timeslot(when); var longKey = key + long_term_timeslot(when); m.incrby(shortKey, quant); m.incrby(longKey, quant); /* Don't want to use expireat in case of timezone trickery or something dumb. (Really, UTC should be OK though...) */ // Conservative expirations m.expire(shortKey, 10 * 60); m.expire(longKey, 2 * 24 * 3600); } function short_term_timeslot(when) { return Math.floor(when / (1000 * 60 * 5)); } function long_term_timeslot(when) { return Math.floor(when / (1000 * 60 * 60 * 24)); } Y.reserve_post = function (op, ip, callback) { if (this.ident.readOnly) return callback(Muggle("Can't post right now.")); var r = this.connect(); if (ipUtils.isLoopback(ip)) return reserve(); var key = 'ip:' + ip + ':throttle:'; var now = Date.now(); var shortTerm = key + short_term_timeslot(now); var longTerm = key + long_term_timeslot(now); r.mget([shortTerm, longTerm], function (err, quants) { if (err) return callback(Muggle("Limiter failure.", err)); if (quants[0] > config.SHORT_TERM_LIMIT || quants[1] > config.LONG_TERM_LIMIT) return callback(Muggle('Reduce your speed.')); reserve(); }); function reserve() { r.incr('postctr', function (err, num) { if (err) return callback(err); OPs[num] = op || num; callback(null, num); }); } }; var optPostFields = 'name trip email auth subject flavor'.split(' '); Y.insert_post = function (msg, body, extra, callback) { if (this.ident.readOnly) return callback(Muggle("Can't post right now.")); var r = this.connect(); if (!this.tag) return callback(Muggle("Can't retrieve board for posting.")); var ip = extra.ip, board = extra.board, num = msg.num, op = msg.op; if (!num) return callback(Muggle("No post num.")); else if (!ip) return callback(Muggle("No IP.")); else if (op) { if (OPs[op] != op || !OP_has_tag(board, op)) { delete OPs[num]; return callback(Muggle('Thread does not exist.')); } } var view = {time: msg.time, ip: ip, state: msg.state.join()}; optPostFields.forEach(function (field) { if (msg[field]) view[field] = msg[field]; }); var tagKey = 'tag:' + tag_key(this.tag); if (op) view.op = op; else { view.tags = tag_key(board); if (board == config.STAFF_BOARD) view.immortal = 1; } if (extra.image_alloc) { msg.image = extra.image_alloc.image; if (!op == msg.image.pinky) return callback(Muggle("Image is the wrong size.")); delete msg.image.pinky; } var key = (op ? 'post:' : 'thread:') + num; var bump = !op || !common.is_sage(view.email); var m = r.multi(); m.incr(tagKey + ':postctr'); // must be first if (op) m.hget('thread:' + op, 'subject'); // must be second if (bump) m.incr(tagKey + ':bumpctr'); m.sadd('liveposts', key); hooks.trigger_sync('inlinePost', {src: msg, dest: view}); if (msg.image) { if (op) m.hincrby('thread:' + op, 'imgctr', 1); else view.imgctr = 1; imager.note_hash(msg.image.hash, msg.num); view.dims = view.dims.toString(); } m.hmset(key, view); m.set(key + ':body', body); if (msg.links) m.hmset(key + ':links', msg.links); var etc = {cacheUpdate: {}}; var priv = this.ident.priv; if (op) { etc.ipNum = num; etc.cacheUpdate.num = num; var pre = 'thread:' + op; if (priv) { m.sadd(pre + ':privs', priv); m.rpush(pre + ':privs:' + priv, num); } else m.rpush(pre + ':posts', num); } else { // TODO: Add to alternate thread list? // set conditional hide? op = num; if (!view.immortal) { var score = expiry_queue_score(msg.time); var entry = num + ':' + tag_key(this.tag); m.zadd(expiry_queue_key(), score, entry); } /* Rate-limit new threads */ if (ipUtils.isLoopback(ip)) m.setex('ip:'+ip+':throttle:thread', config.THREAD_THROTTLE, op); } /* Denormalize for backlog */ view.nonce = msg.nonce; view.body = body; if (msg.links) view.links = msg.links; extract(view); delete view.ip; var self = this; async.waterfall([ function (next) { if (!msg.image) return next(null); imager.commit_image_alloc(extra.image_alloc, next); }, function (next) { if (ip) { var n = post_volume(view, body); if (n > 0) update_throughput(m, ip, view.time, n); etc.ip = ip; } self._log(m, op, common.INSERT_POST, [num, view], etc); m.exec(next); }, function (results, next) { if (!bump) return next(); var postctr = results[0]; var subject = subject_val(op, op==num ? view.subject : results[1]); var m = r.multi(); m.zadd(tagKey + ':threads', postctr, op); if (subject) m.zadd(tagKey + ':subjects', postctr, subject); m.exec(next); }], function (err) { if (err) { delete OPs[num]; return callback(err); } callback(null); }); }; Y.remove_post = function (from_thread, num, callback) { num = parse_number(num); var op = OPs[num]; if (!op) return callback(Muggle('No such post.')); if (op == num) { if (!from_thread) return callback('Deletion loop?!'); return this.remove_thread(num, callback); } var r = this.connect(); var self = this; if (from_thread) { var key = 'thread:' + op; r.lrem(key + ':posts', -1, num, function (err, delCount) { if (err) return callback(err); /* did someone else already delete this? */ if (delCount != 1) return callback(null, -num); /* record deletion */ r.rpush(key + ':dels', num, function (err) { if (err) winston.warn(err); gone_from_thread(); }); }); } else gone_from_thread(); function gone_from_thread() { var key = 'post:' + num; r.hset(key, 'hide', '1', function (err) { if (err) { /* Difficult to recover. Whatever. */ winston.warn("Couldn't hide: " + err); } /* TODO push cache update? */ delete OPs[num]; callback(null, [op, num]); /* In the background, try to finish the post */ self.finish_quietly(key, warn); self.hide_image(key, warn); }); } }; Y.remove_posts = function (nums, callback) { var self = this; tail.map(nums, this.remove_post.bind(this, true), all_gone); function all_gone(err, dels) { if (err) return callback(err); var threads = {}, already_gone = []; dels.forEach(function (del) { if (Array.isArray(del)) { var op = del[0]; if (!(op in threads)) threads[op] = []; threads[op].push(del[1]); } else if (del < 0) already_gone.push(-del); else if (del) winston.warn('Unknown del: ' + del); }); if (already_gone.length) winston.warn("Tried to delete missing posts: " + already_gone); if (_.isEmpty(threads)) return callback(null); var m = self.connect().multi(); for (var op in threads) { var nums = threads[op]; nums.sort(); var opts = {cacheUpdate: {nums: nums}}; self._log(m, op, common.DELETE_POSTS, nums, opts); } m.exec(callback); } }; Y.remove_thread = function (op, callback) { if (OPs[op] != op) return callback(Muggle('Thread does not exist.')); var r = this.connect(); var key = 'thread:' + op, dead_key = 'dead:' + op; var graveyardKey = 'tag:' + tag_key('graveyard'); var privs = null; var etc = {cacheUpdate: {}}; var self = this; async.waterfall([ function (next) { get_all_replies_and_privs(r, op, next); }, function (nums, threadPrivs, next) { etc.cacheUpdate.nums = nums; privs = threadPrivs; if (!nums || !nums.length) return next(null, []); tail.map(nums, self.remove_post.bind(self, false), next); }, function (dels, next) { var m = r.multi(); m.incr(graveyardKey + ':bumpctr'); m.hgetall(key); m.exec(next); }, function (rs, next) { var deadCtr = rs[0], post = rs[1]; var tags = parse_tags(post.tags); var subject = subject_val(op, post.subject); /* Rename thread keys, move to graveyard */ var m = r.multi(); var expiryKey = expiry_queue_key(); tags.forEach(function (tag) { var tagKey = tag_key(tag); m.zrem(expiryKey, op + ':' + tagKey); m.zrem('tag:' + tagKey + ':threads', op); if (subject) m.zrem('tag:' + tagKey + ':subjects', subject); }); m.zadd(graveyardKey + ':threads', deadCtr, op); etc.tags = tags; self._log(m, op, common.DELETE_THREAD, [], etc); m.hset(key, 'hide', 1); /* Next two vals are checked */ m.renamenx(key, dead_key); m.renamenx(key + ':history', dead_key + ':history'); m.renamenx(key + ':ips', dead_key + ':ips'); m.exec(next); }, function (results, done) { var dels = results.slice(-2); if (dels.some(function (x) { return x === 0; })) return done("Already deleted?!"); delete OPs[op]; delete TAGS[op]; /* Extra renames now that we have renamenx exclusivity */ var m = r.multi(); m.rename(key + ':posts', dead_key + ':posts'); m.rename(key + ':links', dead_key + ':links'); if (privs.length) { m.rename(key + ':privs', dead_key + ':privs'); privs.forEach(function (priv) { var suff = ':privs:' + priv; m.rename(key + suff, dead_key + suff); }); } m.exec(function (err) { done(err, null); /* second arg is remove_posts dels */ }); /* Background, might not even be there */ self.finish_quietly(dead_key, warn); self.hide_image(dead_key, warn); }], callback); }; Y.archive_thread = function (op, callback) { var r = this.connect(); var key = 'thread:' + op, archiveKey = 'tag:' + tag_key('archive'); var self = this; async.waterfall([ function (next) { var m = r.multi(); m.exists(key); m.hget(key, 'immortal'); m.zscore('tag:' + tag_key('graveyard') + ':threads', op); m.exec(next); }, function (rs, next) { if (!rs[0]) return callback(Muggle(key + ' does not exist.')); if (parse_number(rs[1])) return callback(Muggle(key + ' is immortal.')); if (rs[2]) return callback(Muggle(key + ' is already deleted.')); var m = r.multi(); // order counts m.hgetall(key); m.hgetall(key + ':links'); m.llen(key + ':posts'); m.smembers(key + ':privs'); m.lrange(key + ':dels', 0, -1); m.exec(next); }, function (rs, next) { var view = rs[0], links = rs[1], replyCount = rs[2], privs = rs[3], dels = rs[4]; var subject = subject_val(op, view.subject); var tags = view.tags; var m = r.multi(); // move to archive tag only m.hset(key, 'origTags', tags); m.hset(key, 'tags', tag_key('archive')); tags = parse_tags(tags); tags.forEach(function (tag) { var tagKey = 'tag:' + tag_key(tag); m.zrem(tagKey + ':threads', op); if (subject) m.zrem(tagKey + ':subjects', subject); }); m.zadd(archiveKey + ':threads', op, op); self._log(m, op, common.DELETE_THREAD, [], {tags: tags}); // shallow thread insertion message in archive if (!_.isEmpty(links)) view.links = links; extract(view); delete view.ip; view.replyctr = replyCount; view.hctr = 0; var etc = {tags: ['archive'], cacheUpdate: {}}; self._log(m, op, common.MOVE_THREAD, [view], etc); // clear history; note new history could be added // for deletion in the archive // (a bit silly right after adding a new entry) m.hdel(key, 'hctr'); m.del(key + ':history'); m.del(key + ':ips'); // delete hidden posts dels.forEach(function (num) { m.del('post:' + num); m.del('post:' + num + ':links'); }); m.del(key + ':dels'); if (privs.length) { m.del(key + ':privs'); privs.forEach(function (priv) { m.del(key + ':privs:' + priv); }); } m.exec(next); }, function (results, done) { set_OP_tag(config.BOARDS.indexOf('archive'), op); done(); }], callback); }; /* BOILERPLATE CITY */ Y.remove_images = function (nums, callback) { if (this.ident.readOnly) return callback(Muggle("Read-only right now.")); var threads = {}; var rem = this.remove_image.bind(this, threads); var self = this; tail.forEach(nums, rem, function (err) { if (err) return callback(err); var m = self.connect().multi(); for (var op in threads) self._log(m, op, common.DELETE_IMAGES, threads[op], {tags: tags_of(op)}); m.exec(callback); }); }; Y.remove_image = function (threads, num, callback) { if (this.ident.readOnly) return callback(Muggle("Read-only right now.")); var r = this.connect(); var op = OPs[num]; if (!op) callback(null, false); var key = (op == num ? 'thread:' : 'post:') + num; var self = this; r.hexists(key, 'src', function (err, exists) { if (err) return callback(err); if (!exists) return callback(null); self.hide_image(key, function (err) { if (err) return callback(err); r.hset(key, 'hideimg', 1, function (err, affected) { if (err) return callback(err); if (!affected) return callback(null); if (threads[op]) threads[op].push(num); else threads[op] = [num]; r.hincrby('thread:' + op, 'imgctr', -1, callback); }); }); }); }; Y.hide_image = function (key, callback) { if (this.ident.readOnly) return callback(Muggle("Read-only right now.")); var r = this.connect(); var hash; var imgKeys = ['hideimg', 'hash', 'src', 'thumb', 'realthumb', 'mid']; r.hmget(key, imgKeys, move_dead); function move_dead(err, rs) { if (err) return callback(err); if (!rs) return callback(null); var info = {}; for (var i = 0; i < rs.length; i++) info[imgKeys[i]] = rs[i]; if (info.hideimg) /* already gone */ return callback(null); hooks.trigger("buryImage", info, callback); } }; Y.force_image_spoilers = function (nums, callback) { if (this.ident.readOnly) return callback(Muggle("Read-only right now.")); var threads = {}; var rem = this.spoiler_image.bind(this, threads); var self = this; tail.forEach(nums, rem, function (err) { if (err) return callback(err); var m = self.connect().multi(); for (var op in threads) self._log(m, op, common.SPOILER_IMAGES, threads[op], {tags: tags_of(op)}); m.exec(callback); }); }; Y.spoiler_image = function (threads, num, callback) { if (this.ident.readOnly) return callback(Muggle("Read-only right now.")); var r = this.connect(); var op = OPs[num]; if (!op) callback(null, false); var key = (op == num ? 'thread:' : 'post:') + num; var self = this; var spoilerKeys = ['src', 'spoiler', 'realthumb']; r.hmget(key, spoilerKeys, function (err, info) { if (err) return callback(err); /* no image or already spoilt */ if (!info[0] || info[1] || info[2]) return callback(null); var index = common.pick_spoiler(-1).index; r.hmset(key, 'spoiler', index, function (err) { if (err) return callback(err); if (threads[op]) threads[op].push([num, index]); else threads[op] = [[num, index]]; callback(null); }); }); }; Y.toggle_thread_lock = function (op, callback) { if (this.ident.readOnly) return callback(Muggle("Read-only right now.")); if (OPs[op] != op) return callback(Muggle('Thread does not exist.')); var r = this.connect(); var key = 'thread:' + op; var self = this; r.hexists(key, 'locked', function (err, locked) { if (err) return callback(err); var m = r.multi(); if (locked) m.hdel(key, 'locked'); else m.hset(key, 'locked', '1'); var act = locked ? common.UNLOCK_THREAD : common.LOCK_THREAD; self._log(m, op, act, []); m.exec(callback); }); }; /* END BOILERPLATE CITY */ function warn(err) { if (err) winston.warn('Warning: ' + err); } Y.check_thread_locked = function (op, callback) { this.connect().hexists('thread:' + op, 'locked', function (err, lock) { if (err) callback(err); else callback(lock ? Muggle('Thread is locked.') : null); }); }; Y.check_throttle = function (ip, callback) { var key = 'ip:' + ip + ':throttle:thread'; this.connect().exists(key, function (err, exists) { if (err) callback(err); else callback(exists ? Muggle('Too soon.') : null); }); }; Y.add_image = function (post, alloc, ip, callback) { var r = this.connect(); var num = post.num, op = post.op; if (!op) return callback(Muggle("Can't add another image to an OP.")); var image = alloc.image; if (!image.pinky) return callback(Muggle("Image is wrong size.")); delete image.pinky; var key = 'post:' + num; r.exists(key, function (err, exists) { if (err) return callback(err); if (!exists) return callback(Muggle("Post does not exist.")); imager.commit_image_alloc(alloc, function (err) { if (err) return callback(err); add_it(); }); }); var self = this; function add_it() { var m = r.multi(); imager.note_hash(image.hash, post.num); // HACK: hmset doesn't like our array, but we need it var orig_dims = image.dims; image.dims = orig_dims.toString(); m.hmset(key, image); image.dims = orig_dims; m.hincrby('thread:' + op, 'imgctr', 1); delete image.hash; self._log(m, op, common.INSERT_IMAGE, [num, image]); var now = Date.now(); var n = post_volume({image: true}); update_throughput(m, ip, now, post_volume({image: true})); m.exec(callback); } }; Y.append_post = function (post, tail, old_state, extra, cb) { var m = this.connect().multi(); var key = (post.op ? 'post:' : 'thread:') + post.num; /* Don't need to check .exists() thanks to client state */ m.append(key + ':body', tail); /* XXX: fragile */ if (old_state[0] != post.state[0] || old_state[1] != post.state[1]) m.hset(key, 'state', post.state.join()); if (extra.ip) { var now = Date.now(); update_throughput(m, extra.ip, now, post_volume(null, tail)); } if (!_.isEmpty(extra.new_links)) m.hmset(key + ':links', extra.new_links); // possibly attach data for dice rolls etc. to the update var attached = {post: post, extra: extra, writeKeys: {}, attach: {}}; var self = this; hooks.trigger("attachToPost", attached, function (err, attached) { if (err) return cb(err); for (var h in attached.writeKeys) m.hset(key, h, attached.writeKeys[h]); var msg = [post.num, tail]; var links = extra.links || {}; var a = old_state[0], b = old_state[1]; // message tail is [... a, b, links, attachment] // default values [... 0, 0, {}, {}] don't need to be sent // to minimize log output if (!_.isEmpty(attached.attach)) msg.push(a, b, links, attached.attach); else if (!_.isEmpty(links)) msg.push(a, b, links); else if (b) msg.push(a, b); else if (a) msg.push(a); self._log(m, post.op || post.num, common.UPDATE_POST, msg); m.exec(cb); }); }; register_lua('finish'); function finish_off(m, key) { var body_key = key.replace('dead', 'thread') + ':body'; m.evalsha(LUA.finish.sha, 3, key, body_key, 'liveposts'); } Y.finish_post = function (post, callback) { var m = this.connect().multi(); var key = (post.op ? 'post:' : 'thread:') + post.num; /* Don't need to check .exists() thanks to client state */ finish_off(m, key); this._log(m, post.op || post.num, common.FINISH_POST, [post.num]); m.exec(callback); }; Y.finish_quietly = function (key, callback) { var m = this.connect().multi(); finish_off(m, key); m.exec(callback); }; Y.finish_all = function (callback) { var r = this.connect(); var self = this; r.smembers('liveposts', function (err, keys) { if (err) return callback(err); async.forEach(keys, function (key, cb) { var isPost = /^post:(\d+)$/.test(key); if (isPost) r.hget(key, 'op', fini); else fini(); function fini(err, op) { if (err) return cb(err); var m = r.multi(); finish_off(m, key); var n = parse_number(key.match(/:(\d+)$/)[1]); op = isPost ? parse_number(op) : n; self._log(m, op, common.FINISH_POST, [n]); m.srem('liveposts', key); m.exec(cb); } }, callback); }); }; Y._log = function (m, op, kind, msg, opts) { opts = opts || {}; msg = JSON.stringify(msg).slice(1, -1); msg = msg.length ? (kind + ',' + msg) : ('' + kind); winston.info("Log: " + msg); if (!op) throw new Error('No OP.'); var priv = this.ident.priv; var prefix = priv ? ('priv:' + priv + ':') : ''; var key = prefix + 'thread:' + op; if (common.is_pubsub(kind)) { m.rpush(key + ':history', msg); m.hincrby(key, 'hctr', 1); } if (opts.ipNum) m.hset(key + ':ips', opts.ipNum, opts.ip); var opBit = op + ','; var len = opBit.length + msg.length; msg = len + '|' + opBit + msg; // we can add an extra trailing message for secret info if (opts.ip) msg += JSON.stringify({auth: {ip: opts.ip}}); m.publish(key, msg); var tags = opts.tags || (this.tag ? [this.tag] : []); tags.forEach(function (tag) { m.publish(prefix + 'tag:' + tag, msg); }); if (opts.cacheUpdate) { var info = {kind: kind, tag: tags[0], op: op}; _.extend(info, opts.cacheUpdate); m.publish('cache', JSON.stringify(info)); } }; Y.fetch_backlogs = function (watching, callback) { var r = this.connect(); var combined = []; var inject_ips = caps.can_moderate(this.ident); forEachInObject(watching, function (thread, cb) { if (thread == 'live') return cb(null); var key = 'thread:' + thread; var sync = watching[thread]; var m = r.multi(); m.lrange(key + ':history', sync, -1); if (inject_ips) { // would be nice to fetch only the relevant ips...? m.hgetall(key + ':ips'); } m.exec(function (err, rs) { if (err) return cb(err); var prefix = thread + ','; var ips = inject_ips && rs[1]; // construct full messages from history entries rs[0].forEach(function (entry) { // attempt to inject ip to INSERT_POST log var m = ips && entry.match(/^2,(\d+),{(.+)$/); var ip = m && ips[m[1]]; if (ip) { var inject = '"ip":' + JSON.stringify(ip) + ','; entry = '2,' + m[1] + ',{' + inject + m[2]; } combined.push(prefix + entry); }); cb(null); }); }, function (errs) { callback(errs, combined); }); }; Y.get_post_op = function (num, callback) { var r = this.connect(); r.hget('post:' + num, 'op', function (err, op) { if (err) return callback(err); else if (op) return callback(null, num, op); r.exists('thread:' + num, function (err, exists) { if (err) callback(err); else if (!exists) callback(null, null, null); else callback(null, num, num); }); }); } Y.get_tag = function (page) { var r = this.connect(); var self = this; var key = 'tag:' + tag_key(this.tag) + ':threads'; var reverseOrder = this.tag == 'archive'; if (page < 0 && !reverseOrder) page = 0; var start = page * config.THREADS_PER_PAGE; var end = start + config.THREADS_PER_PAGE - 1; var m = r.multi(); if (reverseOrder) m.zrange(key, start, end); else m.zrevrange(key, start, end); m.zcard(key); m.exec(function (err, res) { if (err) return self.emit('error', err); var nums = res[0]; if (page > 0 && !nums.length) return self.emit('nomatch'); if (reverseOrder) nums.reverse(); self.emit('begin', res[1]); var reader = new Reader(self); reader.on('error', self.emit.bind(self, 'error')); reader.on('thread', self.emit.bind(self, 'thread')); reader.on('post', self.emit.bind(self, 'post')); reader.on('endthread', self.emit.bind(self, 'endthread')); self._get_each_thread(reader, 0, nums); }); }; Y._get_each_thread = function (reader, ix, nums) { if (!nums || ix >= nums.length) { this.emit('end'); reader.removeAllListeners('endthread'); reader.removeAllListeners('end'); return; } var self = this; var next_please = function () { reader.removeListener('end', next_please); reader.removeListener('nomatch', next_please); self._get_each_thread(reader, ix+1, nums); }; reader.on('end', next_please); reader.on('nomatch', next_please); reader.get_thread(this.tag, nums[ix], { abbrev: config.ABBREVIATED_REPLIES || 5 }); }; /* LURKERS */ register_lua('get_thread'); function lua_get_thread(r, key, abbrev, cb) { var body_key = key.replace('dead', 'thread') + ':body'; var posts_key = key + ':posts'; r.evalsha(LUA.get_thread.sha, 4, key, body_key, posts_key, 'liveposts', abbrev, function (err, rs) { if (err) return cb(err); if (!rs) return cb(null); if (rs.length != 4) throw new Error('get_thread.lua unexpected output'); // activePosts is a hash of hashes; needs to be unbulked on two levels var activeBulk = rs[2]; for (var i = 1; i < activeBulk.length; i += 2) activeBulk[i] = unbulk(activeBulk[i]); var active = unbulk(activeBulk); cb(null, { pre: unbulk(rs[0]), replies: rs[1].map(parse_number), active: active, total: rs[3], }); }); } function Reader(yakusoku) { events.EventEmitter.call(this); this.y = yakusoku; this.postCache = {}; if (caps.can_administrate(yakusoku.ident)) this.showPrivs = true; } util.inherits(Reader, events.EventEmitter); exports.Reader = Reader; Reader.prototype.get_thread = function (tag, num, opts) { var r = this.y.connect(); var graveyard = (tag == 'graveyard'); if (graveyard) opts.showDead = true; var key = (graveyard ? 'dead:' : 'thread:') + num; var abbrev = opts.abbrev || 0; var self = this; lua_get_thread(r, key, abbrev, function (err, result) { if (err) return self.emit('error', err); if (!result || !result.pre || !result.pre.time) { if (!opts.redirect) return self.emit('nomatch'); r.hget('post:' + num, 'op', function (err, op) { if (err) self.emit('error', err); else if (!op) self.emit('nomatch'); else self.emit('redirect', op); }); return; } var opPost = result.pre; var total = result.total; var nums = result.replies; self.postCache = result.active; var exists = self.is_visible(opPost, opts); var tags = parse_tags(opPost.tags); if (!graveyard && tags.indexOf(tag) < 0) { if (opts.redirect) { var op = OPs[num]; return self.emit('redirect', op || num, tags[0]); } else exists = false; } if (!exists) { self.emit('nomatch'); return; } self.emit('begin', opPost); opPost.num = num; refine_post(opPost); var priv = self.y.ident.priv; if (opts.showDead || priv) { var m = r.multi(); // order is important! if (opts.showDead) { var deadKey = key + ':dels'; m.lrange(deadKey, -abbrev, -1); if (abbrev) m.llen(deadKey); } if (priv) { var privsKey = key + ':privs:' + priv; m.lrange(privsKey, -abbrev, -1); if (abbrev) m.llen(privsKey); } m.exec(prepared); } else prepared(); function prepared(err, rs) { if (err) return self.emit('error', err); // get results in the same order as before var deadNums, privNums; if (opts.showDead) { deadNums = rs.shift(); if (abbrev) total += parse_number(rs.shift()); } if (priv) { privNums = rs.shift(); if (abbrev) total += parse_number(rs.shift()); } if (deadNums) nums = merge_posts(nums, deadNums, abbrev); if (priv) { nums = merge_posts(nums, privNums, abbrev); if (self.showPrivs) self.privNums = privNums; } var omit = Math.max(total - abbrev, 0); self.emit('thread', opPost, omit); self._get_each_reply(0, nums, opts); } }); }; function merge_posts(nums, privNums, abbrev) { var i = nums.length - 1, pi = privNums.length - 1; if (pi < 0) return nums; var merged = []; while (!abbrev || merged.length < abbrev) { if (i >= 0 && pi >= 0) { var num = nums[i], pNum = privNums[pi]; if (parse_number(num) > parse_number(pNum)) { merged.unshift(num); i--; } else { merged.unshift(pNum); pi--; } } else if (i >= 0) merged.unshift(nums[i--]); else if (pi >= 0) merged.unshift(privNums[pi--]); else break; } return merged; } function can_see_priv(priv, ident) { if (!priv) return true; // not private if (!ident) return false; if (ident.showPriv) return true; return priv == ident.priv; } Reader.prototype._get_each_reply = function (ix, nums, opts) { var cache = this.postCache; var limit = 20; var privs = this.privNums; function apply_privs(post) { if (privs && post.num && _.contains(privs, post.num.toString())) post.priv = true; } // find a run of posts that need to be fetched var end; for (end = ix; end < nums.length && (end - ix) < limit; end++) { if (cache[nums[end]]) break; } if (ix < end) { // fetch posts in the ix..end range var range = []; for (var i = ix; i < end; i++) range.push(nums[i]); var self = this; this.get_posts('post', range, opts, function (err, posts) { if (err) return self.emit('error', err); for (var i = 0; i < posts.length; i++) { var post = posts[i]; apply_privs(post); self.emit('post', post); } process.nextTick(self._get_each_reply.bind(self, end, nums, opts)); }); return; } // otherwise read posts from cache for (; ix < nums.length; ix++) { var num = nums[ix]; var post = cache[num]; if (!post) break; if (this.is_visible(post, opts)) { post.num = num; refine_post(post); apply_privs(post); this.emit('post', post); } } if (ix < nums.length) { process.nextTick(this._get_each_reply.bind(this, ix, nums, opts)); } else { this.emit('endthread'); this.emit('end'); } }; /// fetch posts in bulk. it is assumed that none of them are currently being edited Reader.prototype.get_posts = function (kind, nums, opts, cb) { if (!nums.length) return cb(null, []); var r = this.y.connect(); var self = this; var m = r.multi(); for (var i = 0; i < nums.length; i++) { var key = kind + ':' + nums[i]; m.hgetall(key); } m.exec(function (err, results) { if (err) return cb(err); var posts = []; for (var i = 0; i < results.length; i++) { var post = results[i]; var num = nums[i]; post.num = num; if (!self.is_visible(post, opts)) continue; refine_post(post); if (post.editing && !post.body) { post.body = '[a bug ate this post]'; var key = kind + ':' + num + ':body'; r.exists(key, function (err, existed) { if (err) winston.warn(err); else if (existed) winston.warn(key + " was overlooked"); }); } posts.push(post); } cb(null, posts); }); }; Reader.prototype.is_visible = function (post, opts) { if (_.isEmpty(post)) return false; if (post.hide && !opts.showDead) return false; if (!can_see_priv(post.priv, this.ident)) return false; return true; }; /// turn a fresh-from-redis post hash into our expected format function refine_post(post) { post.time = parse_number(post.time); if (typeof post.op == 'string') post.op = parse_number(post.op); if (typeof post.tags == 'string') post.tags = parse_tags(post.tags); if (typeof post.body != 'string') post.body = ''; if (post.state) post.editing = true; // extract the image-specific keys (if any) to a sub-hash extract(post); } function parse_number(n) { return parseInt(n, 10); } /* Including hidden or private. Not in-order. */ function get_all_replies_and_privs(r, op, cb) { var key = 'thread:' + op; var m = r.multi(); m.lrange(key + ':posts', 0, -1); m.smembers(key + ':privs'); m.exec(function (err, rs) { if (err) return cb(err); var nums = rs[0], privs = rs[1]; if (!privs.length) return cb(null, nums, privs); var m = r.multi(); privs.forEach(function (priv) { m.lrange(key + ':privs:' + priv, 0, -1); }); m.exec(function (err, rs) { if (err) return cb(err); rs.forEach(function (ns) { nums.push.apply(nums, ns); }); cb(null, nums, privs); }); }); }; /* AUTHORITY */ function Filter(tag) { events.EventEmitter.call(this); this.tag = tag; }; util.inherits(Filter, events.EventEmitter); exports.Filter = Filter; var F = Filter.prototype; F.connect = function () { if (!this.r) { this.r = global.redis; } return this.r; }; F.get_all = function (limit) { var self = this; var r = this.connect(); r.zrange('tag:' + tag_key(this.tag) + ':threads', 0, -1, go); function go(err, threads) { if (err) return self.failure(err); async.forEach(threads, do_thread, self.check_done.bind(self)); } function do_thread(op, cb) { var key = 'thread:' + op; r.llen(key + ':posts', function (err, len) { if (err) cb(err); len = parse_number(len); if (len > limit) return cb(null); var thumbKeys = ['thumb', 'realthumb', 'src']; r.hmget(key, thumbKeys, function (err, rs) { if (err) cb(err); var thumb = rs[0] || rs[1] || rs[2]; self.emit('thread', {num: op, thumb: thumb}); cb(null); }); }); } }; F.check_done = function (err) { if (err) this.failure(err); else this.success(); }; F.success = function () { this.emit('end'); this.cleanup(); }; F.failure = function (err) { this.emit('error', err); this.cleanup(); }; F.cleanup = function () { this.removeAllListeners('error'); this.removeAllListeners('thread'); this.removeAllListeners('end'); }; /* AMUSEMENT */ Y.get_fun = function (op, callback) { if (cache.funThread && op == cache.funThread) { /* Don't cache, for extra fun */ fs.readFile('client/fun.js', 'UTF-8', callback); } else callback(null); }; Y.set_fun_thread = function (op, callback) { if (OPs[op] != op) return callback(Muggle("Thread not found.")); var self = this; fs.readFile('client/fun.js', 'UTF-8', function (err, funJs) { if (err) return callback(err); cache.funThread = op; var m = self.connect().multi(); self._log(m, op, common.EXECUTE_JS, [funJs]); m.exec(callback); }); }; Y.get_banner = function (cb) { var key = 'tag:' + tag_key(this.tag) + ':banner'; this.connect().hgetall(key, cb); }; Y.set_banner = function (op, message, cb) { var r = this.connect(); var key = 'tag:' + tag_key(this.tag) + ':banner'; var self = this; r.hgetall(key, function (err, old) { if (err) return cb(err); var m = r.multi(); if (old && old.op != op) { // clear previous thread's banner self._log(m, old.op, common.UPDATE_BANNER, [null]); } // write new banner m.hmset(key, {op: op, msg: message}); self._log(m, op, common.UPDATE_BANNER, [message]); m.exec(cb); }); }; Y.teardown = function (board, cb) { var m = this.connect().multi(); var filter = new Filter(board); var self = this; filter.get_all(NaN); // no length limit filter.on('thread', function (thread) { self._log(m, thread.num, common.TEARDOWN, []); }); filter.on('error', cb); filter.on('end', function () { m.exec(cb); }); }; Y.get_current_body = function (num, cb) { var key = (OPs[num] == num ? 'thread:' : 'post:') + num; var m = this.connect().multi(); m.hmget(key, 'hide', 'body'); m.get(key + ':body'); m.exec(function (err, rs) { if (err) return cb(err); var hide = rs[0][0], finalBody = rs[0][1]; var liveBody = rs[1]; if (hide) return cb(null); if (finalBody) return cb(null, finalBody, true); cb(null, liveBody || '', false); }); }; /* HELPERS */ function extract(post) { hooks.trigger_sync('extractPost', post); } function with_body(r, key, post, callback) { if (post.body !== undefined) callback(null, post); else r.get(key + ':body', function (err, body) { if (err) return callback(err); if (body !== null) { post.body = body; post.editing = true; return callback(null, post); } // Race condition between finishing posts r.hget(key, 'body', function (err, body) { if (err) return callback(err); post.body = body || ''; callback(null, post); }); }); }; function subject_val(op, subject) { return subject && (op + ':' + subject); } function tag_key(tag) { return tag.length + ':' + tag; } function parse_tags(input) { if (!input) { winston.warn('Blank tag!'); return []; } var tags = []; while (input.length) { var m = input.match(/^(\d+):/); if (!m) break; var len = parse_number(m[1]); var pre = m[1].length + 1; if (input.length < pre + len) break; tags.push(input.substr(pre, len)); input = input.slice(pre + len); } return tags; } exports.parse_tags = parse_tags; function hmget_obj(r, key, keys, cb) { r.hmget(key, keys, function (err, rs) { if (err) return cb(err); var result = {}; for (var i = 0; i < keys.length; i++) result[keys[i]] = rs[i]; cb(null, result); }); } /// converts a lua bulk response to a hash function unbulk(list) { if (!list) return null; if (list.length % 2) { console.warn('bad bulk:', list); throw new Error('bulk of odd len ' + list.length); } var hash = {}; for (var i = 0; i < list.length; i += 2) { var key = list[i]; if (key in hash) throw new Error('bulk repeated key ' + key); hash[key] = list[i+1]; } return hash; }