-
-
Save sktbrt/7102650bfc2c673f1030dcda3c31f812 to your computer and use it in GitHub Desktop.
ShareDB Redis Database Adapter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const Db = require('sharedb').DB; | |
| const Redis = require('redis'); | |
| module.exports = class ShareDbRedis extends Db { | |
| constructor (options) { | |
| super(options); | |
| this.client = options.client || Redis.createClient(options); | |
| this.ttl = options.ttl || 60 * 60 * 24; | |
| this.closed = false; | |
| } | |
| close (callback) { | |
| this.client.quit(); | |
| this.closed = true; | |
| if (callback) return callback(); | |
| } | |
| commit (collection, id, op, snapshot, options, callback) { | |
| const script = ` | |
| local op = redis.call("zrangebyscore", KEYS[1], ARGV[1], ARGV[1]) | |
| -- If a version (score) already exists, do not duplicate it | |
| if #op ~= 0 | |
| then | |
| return redis.status_reply("Duplicated score") | |
| end | |
| -- Add the op to the collection | |
| redis.call("zadd", KEYS[1], ARGV[1], ARGV[2]) | |
| -- Refresh the expire of the ops collection | |
| redis.call("expire", KEYS[1], ARGV[4]) | |
| -- Save the snapshot | |
| redis.call("set", KEYS[2], ARGV[3], "EX", ARGV[4]) | |
| return redis.status_reply("OK") | |
| `; | |
| this.client.eval( | |
| script, // The lua script above | |
| 2, // The following 2 keys | |
| `sharedb:ops:${collection}:${id}`, // KEYS[1] Ops collection key | |
| `sharedb:snapshot:${collection}:${id}`, // KEYS[2] Snapshot key | |
| op.v || 0, // ARGV[1] The op version (special case for the create) | |
| JSON.stringify(op), // ARGV[2] The op | |
| JSON.stringify(snapshot), // ARGV[3] The snapshot | |
| this.ttl, // ARGV[4] The TTL to apply to all the keys | |
| function (err, res) { | |
| if (err) return callback(err) | |
| if (res === 'Duplicated score') { | |
| return callback(null, false) | |
| } | |
| if (res === 'OK') { | |
| callback(null, true); | |
| } else { | |
| return callback(err); | |
| } | |
| } | |
| ); | |
| } | |
| getSnapshot (collection, id, fields, options, callback) { | |
| this._getSnapshot(collection, id, fields, options) | |
| .then(function (res) { callback(null, res); }) | |
| .catch(function (err) { callback(err); }); | |
| } | |
| getOps (collection, id, from, to, options, callback) { | |
| this._getOps(collection, id, from, to, options) | |
| .then(function (res) { callback(null, res); }) | |
| .catch(function (err) { callback(err); }); | |
| } | |
| _addOps (collection, id, snapshot, op) { | |
| return new Promise((resolve, reject) => { | |
| this.client.zadd(`sharedb:ops:${collection}:${id}`, op.v || 0, JSON.stringify(op), function (err, res) { | |
| if (err) return reject(err); | |
| return resolve(res); | |
| }); | |
| }); | |
| } | |
| _expireOps (collection, id) { | |
| return new Promise((resolve, reject) => { | |
| this.client.expire(`sharedb:ops:${collection}:${id}`, this.ttl, (err, res) => { | |
| if (err) return reject(err); | |
| return resolve(res); | |
| }); | |
| }); | |
| } | |
| _addSnapshot (collection, id, snapshot) { | |
| return new Promise((resolve, reject) => { | |
| this.client.set(`sharedb:snapshot:${collection}:${id}`, JSON.stringify(snapshot), 'EX', this.ttl, (err, res) => { | |
| if (err) return reject(err); | |
| return resolve(res); | |
| }); | |
| }); | |
| } | |
| _getSnapshot (collection, id, fields, options) { | |
| return new Promise((resolve, reject) => { | |
| this.client.get(`sharedb:snapshot:${collection}:${id}`, (err, res) => { | |
| if (err) return reject(err); | |
| // an empty document must be initialized with a single \n | |
| // see https://github.com/quilljs/quill/issues/1558#issuecomment-312715578 | |
| let snapshot = { | |
| id: id, | |
| v: 0, | |
| type: null, | |
| data: [{"insert":"\n"}], | |
| m: undefined | |
| }; | |
| try { | |
| if (res) snapshot = JSON.parse(res); | |
| } catch (err) { | |
| return reject(err); | |
| } | |
| return resolve(snapshot); | |
| }); | |
| }); | |
| } | |
| // Get operations between [from, to) noninclusively. (Ie, the range should | |
| // contain start but not end). | |
| // | |
| // If end is null, this function should return all operations from start onwards. | |
| // | |
| // The operations that getOps returns don't need to have a version: field. | |
| // The version will be inferred from the parameters if it is missing. | |
| // | |
| // Callback should be called as callback(error, [list of ops]); | |
| _getOps (collection, id, from, to, options) { | |
| return new Promise((resolve, reject) => { | |
| this.client.zrangebyscore( | |
| `sharedb:ops:${collection}:${id}`, | |
| from, | |
| '(' + (null === to ? '+inf' : to), | |
| (err, res) => { | |
| if (err) return reject(err); | |
| try { | |
| res = res.map((raw) => { return JSON.parse(raw); }); | |
| } catch (err) { | |
| return reject(err); | |
| } | |
| return resolve(res); | |
| }); | |
| }); | |
| } | |
| clear (collection, id) { | |
| return new Promise((resolve, reject) => { | |
| this.client.del(`sharedb:ops:${collection}:${id}`, (err, res) => { | |
| if (err) return reject(err); | |
| this.client.del(`sharedb:snapshot:${collection}:${id}`, (err, res) => { | |
| if (err) return reject(err); | |
| return resolve(res); | |
| }); | |
| }); | |
| }); | |
| } | |
| replace (collection, id, snapshot) { | |
| return this.clear(collection, id) | |
| .then(() => { return this._addSnapshot(collection, id, snapshot); }); | |
| } | |
| } | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment