You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
doushio/db.js

1948 lines
46 KiB

var _ = require('./lib/underscore'),
async = require('async'),
cache = require('./server/state').dbCache,
caps = require('./server/caps'),
common = require('./common'),
config = require('./config'),
events = require('events'),
fs = require('fs'),
hooks = require('./hooks'),
ipUtils = require('ip'),
Muggle = require('./etc').Muggle,
tail = require('./tail'),
util = require('util'),
winston = require('winston');
var imager = require('./imager'); /* set up hooks */
var OPs = exports.OPs = cache.OPs;
var TAGS = exports.TAGS = cache.opTags;
var SUBS = exports.SUBS = cache.threadSubs;
var LUA = {};
function register_lua(name) {
var src = fs.readFileSync('lua/' + name + '.lua', 'UTF-8');
LUA[name] = {src: src};
}
function redis_client() {
var conn = require('redis').createClient(config.REDIS_PORT || undefined);
// ASYNC SETUP RACE!
for (var k in LUA)
load(LUA[k]);
function load(entry) {
conn.script('load', entry.src, function (err, sha) {
if (err)
throw err;
entry.sha = sha;
});
}
return conn;
}
exports.redis_client = redis_client;
// wait for the `register_lua` calls before connecting
process.nextTick(function () {
global.redis = redis_client();
});
/* REAL-TIME UPDATES */
function Subscription(targetInfo) {
events.EventEmitter.call(this);
this.setMaxListeners(0);
this.fullKey = targetInfo.key;
this.target = targetInfo.target;
this.channel = targetInfo.channel;
SUBS[this.fullKey] = this;
this.pending_subscriptions = [];
this.subscription_callbacks = [];
this.k = redis_client();
this.k.on('error', this.on_sub_error.bind(this));
this.k.on('subscribe', this.on_one_sub.bind(this));
this.k.subscribe(this.target);
this.subscriptions = [this.target];
this.pending_subscriptions.push(this.target);
if (this.target != this.fullKey) {
this.k.subscribe(this.fullKey);
this.pending_subscriptions.push(this.fullKey);
}
};
util.inherits(Subscription, events.EventEmitter);
var S = Subscription.prototype;
Subscription.full_key = function (target, ident) {
var channel;
if (ident && ident.priv)
channel = 'priv:' + ident.priv;
else if (caps.can_moderate(ident))
channel = 'auth';
var key = channel ? channel + ':' + target : target;
return {key: key, channel: channel, target: target};
};
Subscription.get = function (target, ident) {
var full = Subscription.full_key(target, ident);
var sub = SUBS[full.key];
if (!sub)
sub = new Subscription(full);
return sub;
};
S.when_ready = function (cb) {
if (this.subscription_callbacks)
this.subscription_callbacks.push(cb);
else
cb(null);
};
S.on_one_sub = function (name) {
var i = this.pending_subscriptions.indexOf(name);
if (i < 0)
throw "Obtained unasked-for subscription " + name + "?!";
this.pending_subscriptions.splice(i, 1);
if (this.pending_subscriptions.length == 0)
this.on_all_subs();
};
S.on_all_subs = function () {
var k = this.k;
k.removeAllListeners('subscribe');
k.on('message', this.on_message.bind(this));
k.removeAllListeners('error');
k.on('error', this.sink_sub.bind(this));
this.subscription_callbacks.forEach(function (cb) {
cb(null);
});
delete this.pending_subscriptions;
delete this.subscription_callbacks;
};
function parse_pub_message(msg) {
var m = msg.match(/^(\d+)\|/);
var prefixLen = m[0].length;
var bodyLen = parse_number(m[1]);
var info = {body: msg.substr(prefixLen, bodyLen)};
var suffixPos = prefixLen + bodyLen;
if (msg.length > suffixPos)
info.suffixPos = suffixPos;
return info;
}
S.on_message = function (chan, msg) {
/* Do we need to clarify whether this came from target or fullKey? */
var parsed = parse_pub_message(msg), extra;
if (this.channel && parsed.suffixPos) {
var suffix = JSON.parse(msg.slice(parsed.suffixPos));
extra = suffix[this.channel];
}
msg = parsed.body;
var m = msg.match(/^(\d+),(\d+)/);
if(m===null) return;
var op = parse_number(m[1]);
var kind = parse_number(m[2]);
if (extra && kind == common.INSERT_POST) {
// add ip to INSERT_POST
var m = msg.match(/^(\d+,2,\d+,{)(.+)$/);
if (m && extra.ip) {
if (/"ip":/.test(msg))
throw "`ip` in public pub " + chan;
msg = m[1] + '"ip":' + JSON.stringify(extra.ip) + ',' + m[2];
}
}
this.emit('update', op, kind, '[[' + msg + ']]');
};
S.on_sub_error = function (err) {
winston.error("Subscription error:", (err.stack || err));
this.commit_sudoku();
this.subscription_callbacks.forEach(function (cb) {
cb(err);
});
this.subscription_callbacks = null;
};
S.sink_sub = function (err) {
if (config.DEBUG)
throw err;
this.emit('error', this.target, err);
this.commit_sudoku();
};
S.commit_sudoku = function () {
var k = this.k;
k.removeAllListeners('error');
k.removeAllListeners('message');
k.removeAllListeners('subscribe');
k.quit();
if (SUBS[this.fullKey] === this)
delete SUBS[this.fullKey];
this.removeAllListeners('update');
this.removeAllListeners('error');
};
S.has_no_listeners = function () {
/* Possibly idle out after a while */
var self = this;
if (this.idleOutTimer)
clearTimeout(this.idleOutTimer);
this.idleOutTimer = setTimeout(function () {
self.idleOutTimer = null;
if (self.listeners('update').length == 0)
self.commit_sudoku();
}, 30 * 1000);
};
/* OP CACHE */
function add_OP_tag(tagIndex, op) {
var tags = TAGS[op];
if (tags === undefined)
TAGS[op] = tagIndex;
else if (typeof tags == 'number') {
if (tagIndex != tags)
TAGS[op] = [tags, tagIndex];
}
else if (tags.indexOf(tagIndex) < 0)
tags.push(tagIndex);
}
function set_OP_tag(tagIndex, op) {
TAGS[op] = tagIndex;
}
function OP_has_tag(tag, op) {
var index = config.BOARDS.indexOf(tag);
if (index < 0)
return false;
var tags = TAGS[op];
if (tags === undefined)
return false;
if (typeof tags == 'number')
return index == tags;
else
return tags.indexOf(index) >= 0;
};
exports.OP_has_tag = OP_has_tag;
exports.first_tag_of = function (op) {
var tags = TAGS[op];
if (tags === undefined)
return false;
else if (typeof tags == 'number')
return config.BOARDS[tags];
else
return config.BOARDS[tags[0]];
};
function tags_of(op) {
var tags = TAGS[op];
if (tags === undefined)
return false;
else if (typeof tags == 'number')
return [config.BOARDS[tags]];
else
return tags.map(function (i) { return config.BOARDS[i]; });
}
exports.tags_of = tags_of;
function update_cache(chan, msg) {
msg = JSON.parse(msg);
var op = msg.op, kind = msg.kind, tag = msg.tag;
if (kind == common.INSERT_POST) {
if (msg.num)
OPs[msg.num] = op;
else {
add_OP_tag(config.BOARDS.indexOf(tag), op);
OPs[op] = op;
}
}
else if (kind == common.MOVE_THREAD) {
set_OP_tag(config.BOARDS.indexOf(tag), op);
}
else if (kind == common.DELETE_POSTS) {
msg.nums.forEach(function (num) {
delete OPs[num];
});
}
else if (kind == common.DELETE_THREAD) {
msg.nums.forEach(function (num) {
delete OPs[num];
});
delete TAGS[op];
}
}
exports.track_OPs = function (callback) {
var k = redis_client();
k.subscribe('cache');
k.once('subscribe', function () {
load_OPs(callback);
});
k.on('message', update_cache);
/* k persists for the purpose of cache updates */
};
exports.on_pub = function (name, handler) {
// TODO: share redis connection
var k = redis_client();
k.subscribe(name);
k.on('message', handler);
/* k persists */
};
function load_OPs(callback) {
var r = global.redis;
var boards = config.BOARDS;
// Want consistent ordering in the TAGS entries for multi-tag threads
// (so do them in series)
tail.forEach(boards, scan_board, callback);
var threadsKey;
function scan_board(tag, cb) {
var tagIndex = boards.indexOf(tag);
threadsKey = 'tag:' + tag_key(tag) + ':threads';
r.zrange(threadsKey, 0, -1, function (err, threads) {
if (err)
return cb(err);
async.forEach(threads, function (op, cb) {
op = parse_number(op);
var ps = [scan_thread.bind(null,tagIndex,op)];
if (!config.READ_ONLY && config.THREAD_EXPIRY
&& tag != 'archive') {
ps.push(refresh_expiry.bind(null,
tag, op));
}
async.parallel(ps, cb);
}, cb);
});
}
function scan_thread(tagIndex, op, cb) {
op = parse_number(op);
add_OP_tag(tagIndex, op);
OPs[op] = op;
get_all_replies_and_privs(r, op, function (err, posts) {
if (err)
return cb(err);
posts.forEach(function (num) {
OPs[parse_number(num)] = op;
});
cb(null);
});
}
var expiryKey = expiry_queue_key();
function refresh_expiry(tag, op, cb) {
if (tag == config.STAFF_BOARD)
return cb(null);
var entry = op + ':' + tag_key(tag);
var queries = ['time', 'immortal'];
hmget_obj(r, 'thread:'+op, queries, function (err, thread) {
if (err)
return cb(err);
if (!thread.time) {
winston.warn('Thread '+op+" doesn't exist.");
var m = r.multi();
m.zrem(threadsKey, op);
m.zrem(expiryKey, entry);
m.exec(cb);
return;
}
if (thread.immortal)
return r.zrem(expiryKey, entry, cb);
var score = expiry_queue_score(thread.time);
r.zadd(expiryKey, score, entry, cb);
});
}
}
function expiry_queue_score(time) {
return Math.floor(parse_number(time)/1000 + config.THREAD_EXPIRY);
}
function expiry_queue_key() {
return 'expiry:' + config.THREAD_EXPIRY;
}
exports.expiry_queue_key = expiry_queue_key;
/* SOCIETY */
exports.is_board = function (board) {
return config.BOARDS.indexOf(board) >= 0;
};
exports.UPKEEP_IDENT = {auth: 'Upkeep', ip: '127.0.0.1'};
function Yakusoku(board, ident) {
events.EventEmitter.call(this);
this.id = ++(cache.YAKUMAN);
this.tag = board;
this.ident = ident;
this.subs = [];
}
util.inherits(Yakusoku, events.EventEmitter);
exports.Yakusoku = Yakusoku;
var Y = Yakusoku.prototype;
Y.connect = function () {
// multiple redis connections are pointless (without slaves)
return global.redis;
};
Y.disconnect = function () {
this.removeAllListeners('end');
};
function forEachInObject(obj, f, callback) {
var total = 0, complete = 0, done = false, errors = [];
function cb(err) {
complete++;
if (err)
errors.push(err);
if (done && complete == total)
callback(errors.length ? errors : null);
}
for (var k in obj) {
if (obj.hasOwnProperty(k)) {
total++;
f(k, cb);
}
}
done = true;
if (complete == total)
callback(errors.length ? errors : null);
}
Y.target_key = function (id) {
return (id == 'live') ? 'tag:' + this.tag : 'thread:' + id;
};
Y.kiku = function (targets, on_update, on_sink, callback) {
var self = this;
this.on_update = on_update;
this.on_sink = on_sink;
forEachInObject(targets, function (id, cb) {
var target = self.target_key(id);
var sub = Subscription.get(target, self.ident);
sub.on('update', on_update);
sub.on('error', on_sink);
self.subs.push(sub.fullKey);
sub.when_ready(cb);
}, callback);
};
Y.kikanai = function () {
var self = this;
this.subs.forEach(function (key) {
var sub = SUBS[key];
if (sub) {
sub.removeListener('update', self.on_update);
sub.removeListener('error', self.on_sink);
if (sub.listeners('update').length == 0)
sub.has_no_listeners();
}
});
this.subs = [];
};
function post_volume(view, body) {
return (body ? body.length : 0) +
(view ? (config.NEW_POST_WORTH || 0) : 0) +
((view && view.image) ? (config.IMAGE_WORTH || 0) : 0);
}
function update_throughput(m, ip, when, quant) {
var key = 'ip:' + ip + ':throttle:';
var shortKey = key + short_term_timeslot(when);
var longKey = key + long_term_timeslot(when);
m.incrby(shortKey, quant);
m.incrby(longKey, quant);
/* Don't want to use expireat in case of timezone trickery
or something dumb. (Really, UTC should be OK though...) */
// Conservative expirations
m.expire(shortKey, 10 * 60);
m.expire(longKey, 2 * 24 * 3600);
}
function short_term_timeslot(when) {
return Math.floor(when / (1000 * 60 * 5));
}
function long_term_timeslot(when) {
return Math.floor(when / (1000 * 60 * 60 * 24));
}
Y.reserve_post = function (op, ip, callback) {
if (this.ident.readOnly)
return callback(Muggle("Can't post right now."));
var r = this.connect();
if (ipUtils.isLoopback(ip))
return reserve();
var key = 'ip:' + ip + ':throttle:';
var now = Date.now();
var shortTerm = key + short_term_timeslot(now);
var longTerm = key + long_term_timeslot(now);
r.mget([shortTerm, longTerm], function (err, quants) {
if (err)
return callback(Muggle("Limiter failure.", err));
if (quants[0] > config.SHORT_TERM_LIMIT ||
quants[1] > config.LONG_TERM_LIMIT)
return callback(Muggle('Reduce your speed.'));
reserve();
});
function reserve() {
r.incr('postctr', function (err, num) {
if (err)
return callback(err);
OPs[num] = op || num;
callback(null, num);
});
}
};
var optPostFields = 'name trip email auth subject flavor'.split(' ');
Y.insert_post = function (msg, body, extra, callback) {
if (this.ident.readOnly)
return callback(Muggle("Can't post right now."));
var r = this.connect();
if (!this.tag)
return callback(Muggle("Can't retrieve board for posting."));
var ip = extra.ip, board = extra.board, num = msg.num, op = msg.op;
if (!num)
return callback(Muggle("No post num."));
else if (!ip)
return callback(Muggle("No IP."));
else if (op) {
if (OPs[op] != op || !OP_has_tag(board, op)) {
delete OPs[num];
return callback(Muggle('Thread does not exist.'));
}
}
var view = {time: msg.time, ip: ip, state: msg.state.join()};
optPostFields.forEach(function (field) {
if (msg[field])
view[field] = msg[field];
});
var tagKey = 'tag:' + tag_key(this.tag);
if (op)
view.op = op;
else {
view.tags = tag_key(board);
if (board == config.STAFF_BOARD)
view.immortal = 1;
}
if (extra.image_alloc) {
msg.image = extra.image_alloc.image;
if (!op == msg.image.pinky)
return callback(Muggle("Image is the wrong size."));
delete msg.image.pinky;
}
var key = (op ? 'post:' : 'thread:') + num;
var bump = !op || !common.is_sage(view.email);
var m = r.multi();
m.incr(tagKey + ':postctr'); // must be first
if (op)
m.hget('thread:' + op, 'subject'); // must be second
if (bump)
m.incr(tagKey + ':bumpctr');
m.sadd('liveposts', key);
hooks.trigger_sync('inlinePost', {src: msg, dest: view});
if (msg.image) {
if (op)
m.hincrby('thread:' + op, 'imgctr', 1);
else
view.imgctr = 1;
imager.note_hash(msg.image.hash, msg.num);
view.dims = view.dims.toString();
}
m.hmset(key, view);
m.set(key + ':body', body);
if (msg.links)
m.hmset(key + ':links', msg.links);
var etc = {cacheUpdate: {}};
var priv = this.ident.priv;
if (op) {
etc.ipNum = num;
etc.cacheUpdate.num = num;
var pre = 'thread:' + op;
if (priv) {
m.sadd(pre + ':privs', priv);
m.rpush(pre + ':privs:' + priv, num);
}
else
m.rpush(pre + ':posts', num);
}
else {
// TODO: Add to alternate thread list?
// set conditional hide?
op = num;
if (!view.immortal) {
var score = expiry_queue_score(msg.time);
var entry = num + ':' + tag_key(this.tag);
m.zadd(expiry_queue_key(), score, entry);
}
/* Rate-limit new threads */
if (ipUtils.isLoopback(ip))
m.setex('ip:'+ip+':throttle:thread',
config.THREAD_THROTTLE, op);
}
/* Denormalize for backlog */
view.nonce = msg.nonce;
view.body = body;
if (msg.links)
view.links = msg.links;
extract(view);
delete view.ip;
var self = this;
async.waterfall([
function (next) {
if (!msg.image)
return next(null);
imager.commit_image_alloc(extra.image_alloc, next);
},
function (next) {
if (ip) {
var n = post_volume(view, body);
if (n > 0)
update_throughput(m, ip, view.time, n);
etc.ip = ip;
}
self._log(m, op, common.INSERT_POST, [num, view], etc);
m.exec(next);
},
function (results, next) {
if (!bump)
return next();
var postctr = results[0];
var subject = subject_val(op,
op==num ? view.subject : results[1]);
var m = r.multi();
m.zadd(tagKey + ':threads', postctr, op);
if (subject)
m.zadd(tagKey + ':subjects', postctr, subject);
m.exec(next);
}],
function (err) {
if (err) {
delete OPs[num];
return callback(err);
}
callback(null);
});
};
Y.remove_post = function (from_thread, num, callback) {
num = parse_number(num);
var op = OPs[num];
if (!op)
return callback(Muggle('No such post.'));
if (op == num) {
if (!from_thread)
return callback('Deletion loop?!');
return this.remove_thread(num, callback);
}
var r = this.connect();
var self = this;
if (from_thread) {
var key = 'thread:' + op;
r.lrem(key + ':posts', -1, num, function (err, delCount) {
if (err)
return callback(err);
/* did someone else already delete this? */
if (delCount != 1)
return callback(null, -num);
/* record deletion */
r.rpush(key + ':dels', num, function (err) {
if (err)
winston.warn(err);
gone_from_thread();
});
});
}
else
gone_from_thread();
function gone_from_thread() {
var key = 'post:' + num;
r.hset(key, 'hide', '1', function (err) {
if (err) {
/* Difficult to recover. Whatever. */
winston.warn("Couldn't hide: " + err);
}
/* TODO push cache update? */
delete OPs[num];
callback(null, [op, num]);
/* In the background, try to finish the post */
self.finish_quietly(key, warn);
self.hide_image(key, warn);
});
}
};
Y.remove_posts = function (nums, callback) {
var self = this;
tail.map(nums, this.remove_post.bind(this, true), all_gone);
function all_gone(err, dels) {
if (err)
return callback(err);
var threads = {}, already_gone = [];
dels.forEach(function (del) {
if (Array.isArray(del)) {
var op = del[0];
if (!(op in threads))
threads[op] = [];
threads[op].push(del[1]);
}
else if (del < 0)
already_gone.push(-del);
else if (del)
winston.warn('Unknown del: ' + del);
});
if (already_gone.length)
winston.warn("Tried to delete missing posts: " +
already_gone);
if (_.isEmpty(threads))
return callback(null);
var m = self.connect().multi();
for (var op in threads) {
var nums = threads[op];
nums.sort();
var opts = {cacheUpdate: {nums: nums}};
self._log(m, op, common.DELETE_POSTS, nums, opts);
}
m.exec(callback);
}
};
Y.remove_thread = function (op, callback) {
if (OPs[op] != op)
return callback(Muggle('Thread does not exist.'));
var r = this.connect();
var key = 'thread:' + op, dead_key = 'dead:' + op;
var graveyardKey = 'tag:' + tag_key('graveyard');
var privs = null;
var etc = {cacheUpdate: {}};
var self = this;
async.waterfall([
function (next) {
get_all_replies_and_privs(r, op, next);
},
function (nums, threadPrivs, next) {
etc.cacheUpdate.nums = nums;
privs = threadPrivs;
if (!nums || !nums.length)
return next(null, []);
tail.map(nums, self.remove_post.bind(self, false), next);
},
function (dels, next) {
var m = r.multi();
m.incr(graveyardKey + ':bumpctr');
m.hgetall(key);
m.exec(next);
},
function (rs, next) {
var deadCtr = rs[0], post = rs[1];
var tags = parse_tags(post.tags);
var subject = subject_val(op, post.subject);
/* Rename thread keys, move to graveyard */
var m = r.multi();
var expiryKey = expiry_queue_key();
tags.forEach(function (tag) {
var tagKey = tag_key(tag);
m.zrem(expiryKey, op + ':' + tagKey);
m.zrem('tag:' + tagKey + ':threads', op);
if (subject)
m.zrem('tag:' + tagKey + ':subjects', subject);
});
m.zadd(graveyardKey + ':threads', deadCtr, op);
etc.tags = tags;
self._log(m, op, common.DELETE_THREAD, [], etc);
m.hset(key, 'hide', 1);
/* Next two vals are checked */
m.renamenx(key, dead_key);
m.renamenx(key + ':history', dead_key + ':history');
m.renamenx(key + ':ips', dead_key + ':ips');
m.exec(next);
},
function (results, done) {
var dels = results.slice(-2);
if (dels.some(function (x) { return x === 0; }))
return done("Already deleted?!");
delete OPs[op];
delete TAGS[op];
/* Extra renames now that we have renamenx exclusivity */
var m = r.multi();
m.rename(key + ':posts', dead_key + ':posts');
m.rename(key + ':links', dead_key + ':links');
if (privs.length) {
m.rename(key + ':privs', dead_key + ':privs');
privs.forEach(function (priv) {
var suff = ':privs:' + priv;
m.rename(key + suff, dead_key + suff);
});
}
m.exec(function (err) {
done(err, null); /* second arg is remove_posts dels */
});
/* Background, might not even be there */
self.finish_quietly(dead_key, warn);
self.hide_image(dead_key, warn);
}], callback);
};
Y.archive_thread = function (op, callback) {
var r = this.connect();
var key = 'thread:' + op, archiveKey = 'tag:' + tag_key('archive');
var self = this;
async.waterfall([
function (next) {
var m = r.multi();
m.exists(key);
m.hget(key, 'immortal');
m.zscore('tag:' + tag_key('graveyard') + ':threads', op);
m.exec(next);
},
function (rs, next) {
if (!rs[0])
return callback(Muggle(key + ' does not exist.'));
if (parse_number(rs[1]))
return callback(Muggle(key + ' is immortal.'));
if (rs[2])
return callback(Muggle(key + ' is already deleted.'));
var m = r.multi();
// order counts
m.hgetall(key);
m.hgetall(key + ':links');
m.llen(key + ':posts');
m.smembers(key + ':privs');
m.lrange(key + ':dels', 0, -1);
m.exec(next);
},
function (rs, next) {
var view = rs[0], links = rs[1], replyCount = rs[2],
privs = rs[3], dels = rs[4];
var subject = subject_val(op, view.subject);
var tags = view.tags;
var m = r.multi();
// move to archive tag only
m.hset(key, 'origTags', tags);
m.hset(key, 'tags', tag_key('archive'));
tags = parse_tags(tags);
tags.forEach(function (tag) {
var tagKey = 'tag:' + tag_key(tag);
m.zrem(tagKey + ':threads', op);
if (subject)
m.zrem(tagKey + ':subjects', subject);
});
m.zadd(archiveKey + ':threads', op, op);
self._log(m, op, common.DELETE_THREAD, [], {tags: tags});
// shallow thread insertion message in archive
if (!_.isEmpty(links))
view.links = links;
extract(view);
delete view.ip;
view.replyctr = replyCount;
view.hctr = 0;
var etc = {tags: ['archive'], cacheUpdate: {}};
self._log(m, op, common.MOVE_THREAD, [view], etc);
// clear history; note new history could be added
// for deletion in the archive
// (a bit silly right after adding a new entry)
m.hdel(key, 'hctr');
m.del(key + ':history');
m.del(key + ':ips');
// delete hidden posts
dels.forEach(function (num) {
m.del('post:' + num);
m.del('post:' + num + ':links');
});
m.del(key + ':dels');
if (privs.length) {
m.del(key + ':privs');
privs.forEach(function (priv) {
m.del(key + ':privs:' + priv);
});
}
m.exec(next);
},
function (results, done) {
set_OP_tag(config.BOARDS.indexOf('archive'), op);
done();
}], callback);
};
/* BOILERPLATE CITY */
Y.remove_images = function (nums, callback) {
if (this.ident.readOnly)
return callback(Muggle("Read-only right now."));
var threads = {};
var rem = this.remove_image.bind(this, threads);
var self = this;
tail.forEach(nums, rem, function (err) {
if (err)
return callback(err);
var m = self.connect().multi();
for (var op in threads)
self._log(m, op, common.DELETE_IMAGES, threads[op],
{tags: tags_of(op)});
m.exec(callback);
});
};
Y.remove_image = function (threads, num, callback) {
if (this.ident.readOnly)
return callback(Muggle("Read-only right now."));
var r = this.connect();
var op = OPs[num];
if (!op)
callback(null, false);
var key = (op == num ? 'thread:' : 'post:') + num;
var self = this;
r.hexists(key, 'src', function (err, exists) {
if (err)
return callback(err);
if (!exists)
return callback(null);
self.hide_image(key, function (err) {
if (err)
return callback(err);
r.hset(key, 'hideimg', 1, function (err, affected) {
if (err)
return callback(err);
if (!affected)
return callback(null);
if (threads[op])
threads[op].push(num);
else
threads[op] = [num];
r.hincrby('thread:' + op, 'imgctr', -1,
callback);
});
});
});
};
Y.hide_image = function (key, callback) {
if (this.ident.readOnly)
return callback(Muggle("Read-only right now."));
var r = this.connect();
var hash;
var imgKeys = ['hideimg', 'hash', 'src', 'thumb', 'realthumb', 'mid'];
r.hmget(key, imgKeys, move_dead);
function move_dead(err, rs) {
if (err)
return callback(err);
if (!rs)
return callback(null);
var info = {};
for (var i = 0; i < rs.length; i++)
info[imgKeys[i]] = rs[i];
if (info.hideimg) /* already gone */
return callback(null);
hooks.trigger("buryImage", info, callback);
}
};
Y.force_image_spoilers = function (nums, callback) {
if (this.ident.readOnly)
return callback(Muggle("Read-only right now."));
var threads = {};
var rem = this.spoiler_image.bind(this, threads);
var self = this;
tail.forEach(nums, rem, function (err) {
if (err)
return callback(err);
var m = self.connect().multi();
for (var op in threads)
self._log(m, op, common.SPOILER_IMAGES, threads[op],
{tags: tags_of(op)});
m.exec(callback);
});
};
Y.spoiler_image = function (threads, num, callback) {
if (this.ident.readOnly)
return callback(Muggle("Read-only right now."));
var r = this.connect();
var op = OPs[num];
if (!op)
callback(null, false);
var key = (op == num ? 'thread:' : 'post:') + num;
var self = this;
var spoilerKeys = ['src', 'spoiler', 'realthumb'];
r.hmget(key, spoilerKeys, function (err, info) {
if (err)
return callback(err);
/* no image or already spoilt */
if (!info[0] || info[1] || info[2])
return callback(null);
var index = common.pick_spoiler(-1).index;
r.hmset(key, 'spoiler', index, function (err) {
if (err)
return callback(err);
if (threads[op])
threads[op].push([num, index]);
else
threads[op] = [[num, index]];
callback(null);
});
});
};
Y.toggle_thread_lock = function (op, callback) {
if (this.ident.readOnly)
return callback(Muggle("Read-only right now."));
if (OPs[op] != op)
return callback(Muggle('Thread does not exist.'));
var r = this.connect();
var key = 'thread:' + op;
var self = this;
r.hexists(key, 'locked', function (err, locked) {
if (err)
return callback(err);
var m = r.multi();
if (locked)
m.hdel(key, 'locked');
else
m.hset(key, 'locked', '1');
var act = locked ? common.UNLOCK_THREAD : common.LOCK_THREAD;
self._log(m, op, act, []);
m.exec(callback);
});
};
/* END BOILERPLATE CITY */
function warn(err) {
if (err)
winston.warn('Warning: ' + err);
}
Y.check_thread_locked = function (op, callback) {
this.connect().hexists('thread:' + op, 'locked', function (err, lock) {
if (err)
callback(err);
else
callback(lock ? Muggle('Thread is locked.') : null);
});
};
Y.check_throttle = function (ip, callback) {
var key = 'ip:' + ip + ':throttle:thread';
this.connect().exists(key, function (err, exists) {
if (err)
callback(err);
else
callback(exists ? Muggle('Too soon.') : null);
});
};
Y.add_image = function (post, alloc, ip, callback) {
var r = this.connect();
var num = post.num, op = post.op;
if (!op)
return callback(Muggle("Can't add another image to an OP."));
var image = alloc.image;
if (!image.pinky)
return callback(Muggle("Image is wrong size."));
delete image.pinky;
var key = 'post:' + num;
r.exists(key, function (err, exists) {
if (err)
return callback(err);
if (!exists)
return callback(Muggle("Post does not exist."));
imager.commit_image_alloc(alloc, function (err) {
if (err)
return callback(err);
add_it();
});
});
var self = this;
function add_it() {
var m = r.multi();
imager.note_hash(image.hash, post.num);
// HACK: hmset doesn't like our array, but we need it
var orig_dims = image.dims;
image.dims = orig_dims.toString();
m.hmset(key, image);
image.dims = orig_dims;
m.hincrby('thread:' + op, 'imgctr', 1);
delete image.hash;
self._log(m, op, common.INSERT_IMAGE, [num, image]);
var now = Date.now();
var n = post_volume({image: true});
update_throughput(m, ip, now, post_volume({image: true}));
m.exec(callback);
}
};
Y.append_post = function (post, tail, old_state, extra, cb) {
var m = this.connect().multi();
var key = (post.op ? 'post:' : 'thread:') + post.num;
/* Don't need to check .exists() thanks to client state */
m.append(key + ':body', tail);
/* XXX: fragile */
if (old_state[0] != post.state[0] || old_state[1] != post.state[1])
m.hset(key, 'state', post.state.join());
if (extra.ip) {
var now = Date.now();
update_throughput(m, extra.ip, now, post_volume(null, tail));
}
if (!_.isEmpty(extra.new_links))
m.hmset(key + ':links', extra.new_links);
// possibly attach data for dice rolls etc. to the update
var attached = {post: post, extra: extra, writeKeys: {}, attach: {}};
var self = this;
hooks.trigger("attachToPost", attached, function (err, attached) {
if (err)
return cb(err);
for (var h in attached.writeKeys)
m.hset(key, h, attached.writeKeys[h]);
var msg = [post.num, tail];
var links = extra.links || {};
var a = old_state[0], b = old_state[1];
// message tail is [... a, b, links, attachment]
// default values [... 0, 0, {}, {}] don't need to be sent
// to minimize log output
if (!_.isEmpty(attached.attach))
msg.push(a, b, links, attached.attach);
else if (!_.isEmpty(links))
msg.push(a, b, links);
else if (b)
msg.push(a, b);
else if (a)
msg.push(a);
self._log(m, post.op || post.num, common.UPDATE_POST, msg);
m.exec(cb);
});
};
register_lua('finish');
function finish_off(m, key) {
var body_key = key.replace('dead', 'thread') + ':body';
m.evalsha(LUA.finish.sha, 3, key, body_key, 'liveposts');
}
Y.finish_post = function (post, callback) {
var m = this.connect().multi();
var key = (post.op ? 'post:' : 'thread:') + post.num;
/* Don't need to check .exists() thanks to client state */
finish_off(m, key);
this._log(m, post.op || post.num, common.FINISH_POST, [post.num]);
m.exec(callback);
};
Y.finish_quietly = function (key, callback) {
var m = this.connect().multi();
finish_off(m, key);
m.exec(callback);
};
Y.finish_all = function (callback) {
var r = this.connect();
var self = this;
r.smembers('liveposts', function (err, keys) {
if (err)
return callback(err);
async.forEach(keys, function (key, cb) {
var isPost = /^post:(\d+)$/.test(key);
if (isPost)
r.hget(key, 'op', fini);
else
fini();
function fini(err, op) {
if (err)
return cb(err);
var m = r.multi();
finish_off(m, key);
var n = parse_number(key.match(/:(\d+)$/)[1]);
op = isPost ? parse_number(op) : n;
self._log(m, op, common.FINISH_POST, [n]);
m.srem('liveposts', key);
m.exec(cb);
}
}, callback);
});
};
Y._log = function (m, op, kind, msg, opts) {
opts = opts || {};
msg = JSON.stringify(msg).slice(1, -1);
msg = msg.length ? (kind + ',' + msg) : ('' + kind);
winston.info("Log: " + msg);
if (!op)
throw new Error('No OP.');
var priv = this.ident.priv;
var prefix = priv ? ('priv:' + priv + ':') : '';
var key = prefix + 'thread:' + op;
if (common.is_pubsub(kind)) {
m.rpush(key + ':history', msg);
m.hincrby(key, 'hctr', 1);
}
if (opts.ipNum)
m.hset(key + ':ips', opts.ipNum, opts.ip);
var opBit = op + ',';
var len = opBit.length + msg.length;
msg = len + '|' + opBit + msg;
// we can add an extra trailing message for secret info
if (opts.ip)
msg += JSON.stringify({auth: {ip: opts.ip}});
m.publish(key, msg);
var tags = opts.tags || (this.tag ? [this.tag] : []);
tags.forEach(function (tag) {
m.publish(prefix + 'tag:' + tag, msg);
});
if (opts.cacheUpdate) {
var info = {kind: kind, tag: tags[0], op: op};
_.extend(info, opts.cacheUpdate);
m.publish('cache', JSON.stringify(info));
}
};
Y.fetch_backlogs = function (watching, callback) {
var r = this.connect();
var combined = [];
var inject_ips = caps.can_moderate(this.ident);
forEachInObject(watching, function (thread, cb) {
if (thread == 'live')
return cb(null);
var key = 'thread:' + thread;
var sync = watching[thread];
var m = r.multi();
m.lrange(key + ':history', sync, -1);
if (inject_ips) {
// would be nice to fetch only the relevant ips...?
m.hgetall(key + ':ips');
}
m.exec(function (err, rs) {
if (err)
return cb(err);
var prefix = thread + ',';
var ips = inject_ips && rs[1];
// construct full messages from history entries
rs[0].forEach(function (entry) {
// attempt to inject ip to INSERT_POST log
var m = ips && entry.match(/^2,(\d+),{(.+)$/);
var ip = m && ips[m[1]];
if (ip) {
var inject = '"ip":' + JSON.stringify(ip) + ',';
entry = '2,' + m[1] + ',{' + inject + m[2];
}
combined.push(prefix + entry);
});
cb(null);
});
}, function (errs) {
callback(errs, combined);
});
};
Y.get_post_op = function (num, callback) {
var r = this.connect();
r.hget('post:' + num, 'op', function (err, op) {
if (err)
return callback(err);
else if (op)
return callback(null, num, op);
r.exists('thread:' + num, function (err, exists) {
if (err)
callback(err);
else if (!exists)
callback(null, null, null);
else
callback(null, num, num);
});
});
}
Y.get_tag = function (page) {
var r = this.connect();
var self = this;
var key = 'tag:' + tag_key(this.tag) + ':threads';
var reverseOrder = this.tag == 'archive';
if (page < 0 && !reverseOrder)
page = 0;
var start = page * config.THREADS_PER_PAGE;
var end = start + config.THREADS_PER_PAGE - 1;
var m = r.multi();
if (reverseOrder)
m.zrange(key, start, end);
else
m.zrevrange(key, start, end);
m.zcard(key);
m.exec(function (err, res) {
if (err)
return self.emit('error', err);
var nums = res[0];
if (page > 0 && !nums.length)
return self.emit('nomatch');
if (reverseOrder)
nums.reverse();
self.emit('begin', res[1]);
var reader = new Reader(self);
reader.on('error', self.emit.bind(self, 'error'));
reader.on('thread', self.emit.bind(self, 'thread'));
reader.on('post', self.emit.bind(self, 'post'));
reader.on('endthread', self.emit.bind(self, 'endthread'));
self._get_each_thread(reader, 0, nums);
});
};
Y._get_each_thread = function (reader, ix, nums) {
if (!nums || ix >= nums.length) {
this.emit('end');
reader.removeAllListeners('endthread');
reader.removeAllListeners('end');
return;
}
var self = this;
var next_please = function () {
reader.removeListener('end', next_please);
reader.removeListener('nomatch', next_please);
self._get_each_thread(reader, ix+1, nums);
};
reader.on('end', next_please);
reader.on('nomatch', next_please);
reader.get_thread(this.tag, nums[ix], {
abbrev: config.ABBREVIATED_REPLIES || 5
});
};
/* LURKERS */
register_lua('get_thread');
function lua_get_thread(r, key, abbrev, cb) {
var body_key = key.replace('dead', 'thread') + ':body';
var posts_key = key + ':posts';
r.evalsha(LUA.get_thread.sha, 4, key, body_key, posts_key, 'liveposts', abbrev,
function (err, rs) {
if (err)
return cb(err);
if (!rs)
return cb(null);
if (rs.length != 4)
throw new Error('get_thread.lua unexpected output');
// activePosts is a hash of hashes; needs to be unbulked on two levels
var activeBulk = rs[2];
for (var i = 1; i < activeBulk.length; i += 2)
activeBulk[i] = unbulk(activeBulk[i]);
var active = unbulk(activeBulk);
cb(null, {
pre: unbulk(rs[0]),
replies: rs[1].map(parse_number),
active: active,
total: rs[3],
});
});
}
function Reader(yakusoku) {
events.EventEmitter.call(this);
this.y = yakusoku;
this.postCache = {};
if (caps.can_administrate(yakusoku.ident))
this.showPrivs = true;
}
util.inherits(Reader, events.EventEmitter);
exports.Reader = Reader;
Reader.prototype.get_thread = function (tag, num, opts) {
var r = this.y.connect();
var graveyard = (tag == 'graveyard');
if (graveyard)
opts.showDead = true;
var key = (graveyard ? 'dead:' : 'thread:') + num;
var abbrev = opts.abbrev || 0;
var self = this;
lua_get_thread(r, key, abbrev, function (err, result) {
if (err)
return self.emit('error', err);
if (!result || !result.pre || !result.pre.time) {
if (!opts.redirect)
return self.emit('nomatch');
r.hget('post:' + num, 'op',
function (err, op) {
if (err)
self.emit('error', err);
else if (!op)
self.emit('nomatch');
else
self.emit('redirect', op);
});
return;
}
var opPost = result.pre;
var total = result.total;
var nums = result.replies;
self.postCache = result.active;
var exists = self.is_visible(opPost, opts);
var tags = parse_tags(opPost.tags);
if (!graveyard && tags.indexOf(tag) < 0) {
if (opts.redirect) {
var op = OPs[num];
return self.emit('redirect', op || num, tags[0]);
}
else
exists = false;
}
if (!exists) {
self.emit('nomatch');
return;
}
self.emit('begin', opPost);
opPost.num = num;
refine_post(opPost);
var priv = self.y.ident.priv;
if (opts.showDead || priv) {
var m = r.multi();
// order is important!
if (opts.showDead) {
var deadKey = key + ':dels';
m.lrange(deadKey, -abbrev, -1);
if (abbrev)
m.llen(deadKey);
}
if (priv) {
var privsKey = key + ':privs:' + priv;
m.lrange(privsKey, -abbrev, -1);
if (abbrev)
m.llen(privsKey);
}
m.exec(prepared);
}
else
prepared();
function prepared(err, rs) {
if (err)
return self.emit('error', err);
// get results in the same order as before
var deadNums, privNums;
if (opts.showDead) {
deadNums = rs.shift();
if (abbrev)
total += parse_number(rs.shift());
}
if (priv) {
privNums = rs.shift();
if (abbrev)
total += parse_number(rs.shift());
}
if (deadNums)
nums = merge_posts(nums, deadNums, abbrev);
if (priv) {
nums = merge_posts(nums, privNums, abbrev);
if (self.showPrivs)
self.privNums = privNums;
}
var omit = Math.max(total - abbrev, 0);
self.emit('thread', opPost, omit);
self._get_each_reply(0, nums, opts);
}
});
};
function merge_posts(nums, privNums, abbrev) {
var i = nums.length - 1, pi = privNums.length - 1;
if (pi < 0)
return nums;
var merged = [];
while (!abbrev || merged.length < abbrev) {
if (i >= 0 && pi >= 0) {
var num = nums[i], pNum = privNums[pi];
if (parse_number(num) > parse_number(pNum)) {
merged.unshift(num);
i--;
}
else {
merged.unshift(pNum);
pi--;
}
}
else if (i >= 0)
merged.unshift(nums[i--]);
else if (pi >= 0)
merged.unshift(privNums[pi--]);
else
break;
}
return merged;
}
function can_see_priv(priv, ident) {
if (!priv)
return true; // not private
if (!ident)
return false;
if (ident.showPriv)
return true;
return priv == ident.priv;
}
Reader.prototype._get_each_reply = function (ix, nums, opts) {
var cache = this.postCache;
var limit = 20;
var privs = this.privNums;
function apply_privs(post) {
if (privs && post.num && _.contains(privs, post.num.toString()))
post.priv = true;
}
// find a run of posts that need to be fetched
var end;
for (end = ix; end < nums.length && (end - ix) < limit; end++) {
if (cache[nums[end]])
break;
}
if (ix < end) {
// fetch posts in the ix..end range
var range = [];
for (var i = ix; i < end; i++)
range.push(nums[i]);
var self = this;
this.get_posts('post', range, opts, function (err, posts) {
if (err)
return self.emit('error', err);
for (var i = 0; i < posts.length; i++) {
var post = posts[i];
apply_privs(post);
self.emit('post', post);
}
process.nextTick(self._get_each_reply.bind(self, end, nums, opts));
});
return;
}
// otherwise read posts from cache
for (; ix < nums.length; ix++) {
var num = nums[ix];
var post = cache[num];
if (!post)
break;
if (this.is_visible(post, opts)) {
post.num = num;
refine_post(post);
apply_privs(post);
this.emit('post', post);
}
}
if (ix < nums.length) {
process.nextTick(this._get_each_reply.bind(this, ix, nums, opts));
} else {
this.emit('endthread');
this.emit('end');
}
};
/// fetch posts in bulk. it is assumed that none of them are currently being edited
Reader.prototype.get_posts = function (kind, nums, opts, cb) {
if (!nums.length)
return cb(null, []);
var r = this.y.connect();
var self = this;
var m = r.multi();
for (var i = 0; i < nums.length; i++) {
var key = kind + ':' + nums[i];
m.hgetall(key);
}
m.exec(function (err, results) {
if (err)
return cb(err);
var posts = [];
for (var i = 0; i < results.length; i++) {
var post = results[i];
var num = nums[i];
post.num = num;
if (!self.is_visible(post, opts))
continue;
refine_post(post);
if (post.editing && !post.body) {
post.body = '[a bug ate this post]';
var key = kind + ':' + num + ':body';
r.exists(key, function (err, existed) {
if (err)
winston.warn(err);
else if (existed)
winston.warn(key + " was overlooked");
});
}
posts.push(post);
}
cb(null, posts);
});
};
Reader.prototype.is_visible = function (post, opts) {
if (_.isEmpty(post))
return false;
if (post.hide && !opts.showDead)
return false;
if (!can_see_priv(post.priv, this.ident))
return false;
return true;
};
/// turn a fresh-from-redis post hash into our expected format
function refine_post(post) {
post.time = parse_number(post.time);
if (typeof post.op == 'string')
post.op = parse_number(post.op);
if (typeof post.tags == 'string')
post.tags = parse_tags(post.tags);
if (typeof post.body != 'string')
post.body = '';
if (post.state)
post.editing = true;
// extract the image-specific keys (if any) to a sub-hash
extract(post);
}
function parse_number(n) {
return parseInt(n, 10);
}
/* Including hidden or private. Not in-order. */
function get_all_replies_and_privs(r, op, cb) {
var key = 'thread:' + op;
var m = r.multi();
m.lrange(key + ':posts', 0, -1);
m.smembers(key + ':privs');
m.exec(function (err, rs) {
if (err)
return cb(err);
var nums = rs[0], privs = rs[1];
if (!privs.length)
return cb(null, nums, privs);
var m = r.multi();
privs.forEach(function (priv) {
m.lrange(key + ':privs:' + priv, 0, -1);
});
m.exec(function (err, rs) {
if (err)
return cb(err);
rs.forEach(function (ns) {
nums.push.apply(nums, ns);
});
cb(null, nums, privs);
});
});
};
/* AUTHORITY */
function Filter(tag) {
events.EventEmitter.call(this);
this.tag = tag;
};
util.inherits(Filter, events.EventEmitter);
exports.Filter = Filter;
var F = Filter.prototype;
F.connect = function () {
if (!this.r) {
this.r = global.redis;
}
return this.r;
};
F.get_all = function (limit) {
var self = this;
var r = this.connect();
r.zrange('tag:' + tag_key(this.tag) + ':threads', 0, -1, go);
function go(err, threads) {
if (err)
return self.failure(err);
async.forEach(threads, do_thread, self.check_done.bind(self));
}
function do_thread(op, cb) {
var key = 'thread:' + op;
r.llen(key + ':posts', function (err, len) {
if (err)
cb(err);
len = parse_number(len);
if (len > limit)
return cb(null);
var thumbKeys = ['thumb', 'realthumb', 'src'];
r.hmget(key, thumbKeys, function (err, rs) {
if (err)
cb(err);
var thumb = rs[0] || rs[1] || rs[2];
self.emit('thread', {num: op, thumb: thumb});
cb(null);
});
});
}
};
F.check_done = function (err) {
if (err)
this.failure(err);
else
this.success();
};
F.success = function () {
this.emit('end');
this.cleanup();
};
F.failure = function (err) {
this.emit('error', err);
this.cleanup();
};
F.cleanup = function () {
this.removeAllListeners('error');
this.removeAllListeners('thread');
this.removeAllListeners('end');
};
/* AMUSEMENT */
Y.get_fun = function (op, callback) {
if (cache.funThread && op == cache.funThread) {
/* Don't cache, for extra fun */
fs.readFile('client/fun.js', 'UTF-8', callback);
}
else
callback(null);
};
Y.set_fun_thread = function (op, callback) {
if (OPs[op] != op)
return callback(Muggle("Thread not found."));
var self = this;
fs.readFile('client/fun.js', 'UTF-8', function (err, funJs) {
if (err)
return callback(err);
cache.funThread = op;
var m = self.connect().multi();
self._log(m, op, common.EXECUTE_JS, [funJs]);
m.exec(callback);
});
};
Y.get_banner = function (cb) {
var key = 'tag:' + tag_key(this.tag) + ':banner';
this.connect().hgetall(key, cb);
};
Y.set_banner = function (op, message, cb) {
var r = this.connect();
var key = 'tag:' + tag_key(this.tag) + ':banner';
var self = this;
r.hgetall(key, function (err, old) {
if (err)
return cb(err);
var m = r.multi();
if (old && old.op != op) {
// clear previous thread's banner
self._log(m, old.op, common.UPDATE_BANNER, [null]);
}
// write new banner
m.hmset(key, {op: op, msg: message});
self._log(m, op, common.UPDATE_BANNER, [message]);
m.exec(cb);
});
};
Y.teardown = function (board, cb) {
var m = this.connect().multi();
var filter = new Filter(board);
var self = this;
filter.get_all(NaN); // no length limit
filter.on('thread', function (thread) {
self._log(m, thread.num, common.TEARDOWN, []);
});
filter.on('error', cb);
filter.on('end', function () {
m.exec(cb);
});
};
Y.get_current_body = function (num, cb) {
var key = (OPs[num] == num ? 'thread:' : 'post:') + num;
var m = this.connect().multi();
m.hmget(key, 'hide', 'body');
m.get(key + ':body');
m.exec(function (err, rs) {
if (err)
return cb(err);
var hide = rs[0][0], finalBody = rs[0][1];
var liveBody = rs[1];
if (hide)
return cb(null);
if (finalBody)
return cb(null, finalBody, true);
cb(null, liveBody || '', false);
});
};
/* HELPERS */
function extract(post) {
hooks.trigger_sync('extractPost', post);
}
function with_body(r, key, post, callback) {
if (post.body !== undefined)
callback(null, post);
else
r.get(key + ':body', function (err, body) {
if (err)
return callback(err);
if (body !== null) {
post.body = body;
post.editing = true;
return callback(null, post);
}
// Race condition between finishing posts
r.hget(key, 'body', function (err, body) {
if (err)
return callback(err);
post.body = body || '';
callback(null, post);
});
});
};
function subject_val(op, subject) {
return subject && (op + ':' + subject);
}
function tag_key(tag) {
return tag.length + ':' + tag;
}
function parse_tags(input) {
if (!input) {
winston.warn('Blank tag!');
return [];
}
var tags = [];
while (input.length) {
var m = input.match(/^(\d+):/);
if (!m)
break;
var len = parse_number(m[1]);
var pre = m[1].length + 1;
if (input.length < pre + len)
break;
tags.push(input.substr(pre, len));
input = input.slice(pre + len);
}
return tags;
}
exports.parse_tags = parse_tags;
function hmget_obj(r, key, keys, cb) {
r.hmget(key, keys, function (err, rs) {
if (err)
return cb(err);
var result = {};
for (var i = 0; i < keys.length; i++)
result[keys[i]] = rs[i];
cb(null, result);
});
}
/// converts a lua bulk response to a hash
function unbulk(list) {
if (!list)
return null;
if (list.length % 2) {
console.warn('bad bulk:', list);
throw new Error('bulk of odd len ' + list.length);
}
var hash = {};
for (var i = 0; i < list.length; i += 2) {
var key = list[i];
if (key in hash)
throw new Error('bulk repeated key ' + key);
hash[key] = list[i+1];
}
return hash;
}