Skip to content

Instantly share code, notes, and snippets.

@sktbrt
Forked from rgazelot/redis.js
Last active March 22, 2019 03:22
Show Gist options
  • Select an option

  • Save sktbrt/7102650bfc2c673f1030dcda3c31f812 to your computer and use it in GitHub Desktop.

Select an option

Save sktbrt/7102650bfc2c673f1030dcda3c31f812 to your computer and use it in GitHub Desktop.
ShareDB Redis Database Adapter
const Db = require('sharedb').DB;
const Redis = require('redis');
module.exports = class ShareDbRedis extends Db {
constructor (options) {
super(options);
this.client = options.client || Redis.createClient(options);
this.ttl = options.ttl || 60 * 60 * 24;
this.closed = false;
}
close (callback) {
this.client.quit();
this.closed = true;
if (callback) return callback();
}
commit (collection, id, op, snapshot, options, callback) {
const script = `
local op = redis.call("zrangebyscore", KEYS[1], ARGV[1], ARGV[1])
-- If a version (score) already exists, do not duplicate it
if #op ~= 0
then
return redis.status_reply("Duplicated score")
end
-- Add the op to the collection
redis.call("zadd", KEYS[1], ARGV[1], ARGV[2])
-- Refresh the expire of the ops collection
redis.call("expire", KEYS[1], ARGV[4])
-- Save the snapshot
redis.call("set", KEYS[2], ARGV[3], "EX", ARGV[4])
return redis.status_reply("OK")
`;
this.client.eval(
script, // The lua script above
2, // The following 2 keys
`sharedb:ops:${collection}:${id}`, // KEYS[1] Ops collection key
`sharedb:snapshot:${collection}:${id}`, // KEYS[2] Snapshot key
op.v || 0, // ARGV[1] The op version (special case for the create)
JSON.stringify(op), // ARGV[2] The op
JSON.stringify(snapshot), // ARGV[3] The snapshot
this.ttl, // ARGV[4] The TTL to apply to all the keys
function (err, res) {
if (err) return callback(err)
if (res === 'Duplicated score') {
return callback(null, false)
}
if (res === 'OK') {
callback(null, true);
} else {
return callback(err);
}
}
);
}
getSnapshot (collection, id, fields, options, callback) {
this._getSnapshot(collection, id, fields, options)
.then(function (res) { callback(null, res); })
.catch(function (err) { callback(err); });
}
getOps (collection, id, from, to, options, callback) {
this._getOps(collection, id, from, to, options)
.then(function (res) { callback(null, res); })
.catch(function (err) { callback(err); });
}
_addOps (collection, id, snapshot, op) {
return new Promise((resolve, reject) => {
this.client.zadd(`sharedb:ops:${collection}:${id}`, op.v || 0, JSON.stringify(op), function (err, res) {
if (err) return reject(err);
return resolve(res);
});
});
}
_expireOps (collection, id) {
return new Promise((resolve, reject) => {
this.client.expire(`sharedb:ops:${collection}:${id}`, this.ttl, (err, res) => {
if (err) return reject(err);
return resolve(res);
});
});
}
_addSnapshot (collection, id, snapshot) {
return new Promise((resolve, reject) => {
this.client.set(`sharedb:snapshot:${collection}:${id}`, JSON.stringify(snapshot), 'EX', this.ttl, (err, res) => {
if (err) return reject(err);
return resolve(res);
});
});
}
_getSnapshot (collection, id, fields, options) {
return new Promise((resolve, reject) => {
this.client.get(`sharedb:snapshot:${collection}:${id}`, (err, res) => {
if (err) return reject(err);
// an empty document must be initialized with a single \n
// see https://github.com/quilljs/quill/issues/1558#issuecomment-312715578
let snapshot = {
id: id,
v: 0,
type: null,
data: [{"insert":"\n"}],
m: undefined
};
try {
if (res) snapshot = JSON.parse(res);
} catch (err) {
return reject(err);
}
return resolve(snapshot);
});
});
}
// Get operations between [from, to) noninclusively. (Ie, the range should
// contain start but not end).
//
// If end is null, this function should return all operations from start onwards.
//
// The operations that getOps returns don't need to have a version: field.
// The version will be inferred from the parameters if it is missing.
//
// Callback should be called as callback(error, [list of ops]);
_getOps (collection, id, from, to, options) {
return new Promise((resolve, reject) => {
this.client.zrangebyscore(
`sharedb:ops:${collection}:${id}`,
from,
'(' + (null === to ? '+inf' : to),
(err, res) => {
if (err) return reject(err);
try {
res = res.map((raw) => { return JSON.parse(raw); });
} catch (err) {
return reject(err);
}
return resolve(res);
});
});
}
clear (collection, id) {
return new Promise((resolve, reject) => {
this.client.del(`sharedb:ops:${collection}:${id}`, (err, res) => {
if (err) return reject(err);
this.client.del(`sharedb:snapshot:${collection}:${id}`, (err, res) => {
if (err) return reject(err);
return resolve(res);
});
});
});
}
replace (collection, id, snapshot) {
return this.clear(collection, id)
.then(() => { return this._addSnapshot(collection, id, snapshot); });
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment