Skip to content

Commit

Permalink
Initial implementation (#1)
Browse files Browse the repository at this point in the history
* initial commit

* add stream and batch support

* test: adding some test

* backend: make sure to slugify location for scylla table
  • Loading branch information
esatterwhite authored Feb 26, 2018
1 parent 581c20e commit 472ce38
Show file tree
Hide file tree
Showing 11 changed files with 5,142 additions and 1 deletion.
4 changes: 4 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
coverage/
.nyc_output/
*.save
*.tgz
35 changes: 34 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,35 @@
# skyring-scylladown
scylladb based leveldown backend for use with Skyring

> [`ScyllaDB`] backend store for [`levelup`] leveraging [`abstract-leveldown`]
[![level badge][level-badge]](https://github.com/level/awesome)
[![npm](https://img.shields.io/npm/v/@skyring-scylladown.svg?style=flat-square)](https://github.com/esatterwhite/skyring-scylladown)
[![npm](https://img.shields.io/npm/l/@skyring/scylladown.svg?style=flat-square)](https://github.com/esatterwhite/skyring-scylladown/blob/master/LICENSE)

Specialized leveldown backend for use with skyring. Not all level down features are supported.
Namely, iterator / stream ranges and explicit buffer casting - all keys and values are returned as strings

## Example

```javascript
const levelup = require('levelup')
const scylladown = require('@skyring/scylladown')

const opts = {
contactPoints: ['192.0.0.1:9042', '192.0.0.2:9042', '192.0.0.3:9042']
, keyspace: 'customkeyspace'
}

const db = levelup(scylladown('table_name'), opts)
```

### Options

* `contactPoints` - Array of scylla nodes
* `keyspace` - The scylla keyspace to operate int
* `replicas` - The number of keyspace replicas to create

[`ScyllaDB`]: https://github.com/Level/abstract-leveldown
[`abstract-leveldown`]: https://github.com/Level/levelup
[level-badge]: http://leveldb.org/img/badge.svg
[`levelup`]: https://github.com/Level/levelup
37 changes: 37 additions & 0 deletions compose/dev.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
version: "2.1"
services:
scylla-1:
image: scylladb/scylla
hostname: scylla-1
command: --broadcast-address scylla-1 --listen-address scylla-1 --overprovisioned 1 --cpuset 1,3
ports:
- 9160:9160
- 9042:9042
networks:
- scylla

scylla-2:
image: scylladb/scylla
hostname: scylla-2
command: --seeds scylla-1 --broadcast-address scylla-2 --listen-address scylla-2 --overprovisioned 1 --cpuset 2,4
networks:
- scylla
ports:
- 9043:9042
depends_on:
- scylla-1

scylla-3:
image: scylladb/scylla
hostname: scylla-3
command: --seeds scylla-1 --broadcast-address scylla-3 --listen-address scylla-3 --overprovisioned 1 --cpuset 5,6
networks:
- scylla
ports:
- 9044:9042
depends_on:
- scylla-1

networks:
scylla:
driver: bridge
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
'use stict'

module.exports = require('./lib')
243 changes: 243 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
'use strict'

/**
* ScyllaDB store for leveldown targeted as a backend for skyring
* @module @skyring/scylladown
* @author Eric Satterwhite
* @requires util
* @requires debug
* @requires cassandra-driver
* @requires abstract-leveldown
* @requires @skyring/scylladown/lib/iterator
**/

const {inherits, format} = require('util')
const {Client, types} = require('cassandra-driver')
const {AbstractLevelDOWN} = require('abstract-leveldown')
const ScyllaIterator = require('./iterator')
const slugify = require('./lang/string/slugify')
const debug = require('debug')('skyring:scylladown')

const ERR_NAME_NOTFOUND = 'NotFoundError'
const ERR_NOT_FOUND = 'ENOENT'
const kQuery = Symbol('queries')
const q_opts = { prepare: true }
const JSON_OBJECT = '{}'
const CREATE_KEYSPACE = `
CREATE KEYSPACE
IF NOT EXISTS %s
WITH REPLICATION = {
'class': 'SimpleStrategy'
, 'replication_factor': %d
}
`
const CREATE_TABLE = `
CREATE TABLE IF NOT EXISTS %s.%s (
id text PRIMARY KEY
, value BLOB
)
`

module.exports = ScyllaDown

/**
* ScyllaDB Leveldown backend for levelup
* @class ScyllaDown
* @extends AbstractLevelDOWN
* @alias module:@skyring/scylladown
* @params {String} location The name of a the database table
* the db instance is responsible for
**/
function ScyllaDown(location) {

if (!(this instanceof ScyllaDown)) return new ScyllaDown(location)

AbstractLevelDOWN.call(this, location)
this.keyspace = null
this.client = null
this.table = slugify(location)
this[kQuery] = {
insert: null
, update: null
, get: null
, del: null
}
}

inherits(ScyllaDown, AbstractLevelDOWN)

/**
* Called to open a connection to the scylla cluster
* @method module:@skyring/scylladown#_open
* @param {Object} [options] Additional options for scylla
* @param {String[]} [options.contactPoints=['127.0.0.1']] An array of known scylladb instances to connect to
* @param {String} [options.keyspace=skyring] Keyspace to operate under
* @param {Number} [options.replicas=1] Number of replicas per keysapce
**/
ScyllaDown.prototype._open = function _open(opts = {}, cb) {
const {
contactPoints = ['127.0.0.1']
, keyspace = 'skyring'
, replicas = 1
} = opts

debug('contact points: ', contactPoints)
debug('keyspace', keyspace)
debug('replicas', replicas)

this.client = new Client({
contactPoints: contactPoints
})

this.keyspace = keyspace

this[kQuery] = {
get: `
SELECT value FROM ${this.table}
WHERE id = ?
`
, put: `
UPDATE ${this.table}
SET value = ?
WHERE id = ?
`
, del: `
DELETE FROM ${this.table}
WHERE id = ?
`
, insert: `
INSERT INTO ${this.table} (
id, value
) VALUES (?, ?)
`
}

this.client.connect((err) => {
if (err) return cb(err)
this._keyspace(replicas, (err) => {
if (err) return cb(err)
this.client.keyspace = keyspace
return this._table((err) => {
if (err) return cb(err)
return cb(null, this)
})
})
})
}

/**
* Fetches a record by primary key
* @method module:@skyring/scylladown#_get
* @param {String} key The key to look up
* @param {Object} [options] query specific options
* @param {Function} callback function to be called when the query has finished
**/
ScyllaDown.prototype._get = function _get(key, options, cb) {
const query = this[kQuery].get
this.client.execute(query, [key], q_opts, (err, res) => {
if (err) return cb(err)
if (!res.rows.length) {
const error = new Error('Key Not Found')
error.name = ERR_NAME_NOTFOUND
error.code = ERR_NOT_FOUND
return cb(error, undefined)
}
const strigify = options.asBuffer === false
const value = res.rows[0].value
return cb(null, strigify ? value.toString('utf8') : value)
})
}

/**
* Inserts or updates a specific record
* @method module:@skyring/scylladown#_put
* @param {String} key The key to insert / operate
* @param {String} value The value to write
* @param {Object} [options] query specific options
* @param {Boolean} [options.insert=false] If true, the driver will issue an insert rather than update
* @param {Function} callback function to be called when the query has finished
**/
ScyllaDown.prototype._put = function _put(key, value, options, cb) {
if (options.insert) return this._insert(key, value, options, cb)

const _value = Buffer.isBuffer(value) ? value : Buffer.from(value)
const query = this[kQuery].put
this.client.execute(query, [_value, key], q_opts, cb)
}

ScyllaDown.prototype._insert = function _insert(key, value, options, cb) {
const query = this[kQuery].insert
const _value = Buffer.isBuffer(value) ? value : Buffer.from(value)
const values = [
key
, _value
]
debug('insert', query, values, value)
this.client.execute(query, values, q_opts, cb)
}


/**
* Removes a specific record
* @method module:@skyring/scylladown#del
* @param {String} key The key to insert / operate
* @param {Object} [options] query specific options
* @param {Boolean} [options.insert=false] If true, the driver will issue an insert rather than update
* @param {Function} callback function to be called when the query has finished
**/
ScyllaDown.prototype._del = function _del(key, options, cb) {
const query = this[kQuery].del
this.client.execute(query, [key], q_opts, cb)
}

/**
* Performs multiple updates or deletes as a single operaton
* @method module:@skyring/scylladown#_batch
* @param {Operation[]} operations to perform
* @param {Object} [options] query specific options
* @param {Function} callback function to be called when the query has finished
**/
ScyllaDown.prototype._batch = function _batch(arr, options, cb) {
const ops = arr.map((op) => {
switch(op.type) {
case 'del':
return {
query: this[kQuery].del
, params: [op.key]
}
case 'put':
return {
query: this[kQuery].put
, params: [op.value, op.key]
}
}
})

if (!ops.length) return setImmediate(cb)
this.client.batch(ops, cb)
}

ScyllaDown.prototype._iterator = function _iterator(options) {
return new ScyllaIterator(this, options)
}

ScyllaDown.prototype._keyspace = function _keyspace(replicas = 1, cb) {
const query = format(CREATE_KEYSPACE, this.keyspace, replicas)
debug('creating keyspace %s - replicas %s', this.keyspace, replicas)
this.client.execute(query, cb)
}

ScyllaDown.prototype._table = function _table(cb) {
const query = format(CREATE_TABLE, this.keyspace, this.table)
debug('creating data table', this.table)
this.client.execute(query, cb)
}

/**
* Represents a single operations inside of a batch
* @typedef {Object} Operation
* @property {String} type the operation to perform (`put`|`del`)
* @property {String} key The Key to perform the operation on
* @property {String} [value] A value for put operations
**/
51 changes: 51 additions & 0 deletions lib/iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
'use strict'

const {AbstractIterator} = require('abstract-leveldown')
const debug = require('debug')('skyring:ScyllaIterator')
const kCursor = Symbol('cursor')
const noop = () => {}

module.exports = class ScyllaIterator extends AbstractIterator {
constructor(db, options) {
super(db)
this.keyAsBuffer = options.keyAsBuffer === true
this.valueAsBuffer = options.valueAsBuffer === true
}

_next(cb) {
if (this[kCursor]) {
const item = this[kCursor].next()
debug('cursor item', item)
if(item.done) return cb()
const _key = this.keyAsBuffer ? Buffer.from(item.value.id) : item.value.id
const _value = this.valueAsBuffer ? Buffer.from(item.value.value) : item.value.value
cb(null, _key, _value)
return
}

this._cursor((err, cursor) => {
if (err) return cb(err)
const item = cursor.next()
debug('cursor item', item)
if(item.done) return cb()
const _key = this.keyAsBuffer ? Buffer.from(item.value.id) : item.value.id
const _value = this.valueAsBuffer ? Buffer.from(item.value.value) : item.value.value
cb(null, _key, _value)
})
}

_end(cb) {
this[kCursor] = null
setImmediate(cb, null)
}

_cursor(cb = noop) {
if (this[kCursor]) cb(null, this[kCursor])
const table = this.db.table
this.db.client.execute(`SELECT id, value from ${table}`, (err, results) => {
if (err) return cb(err)
this[kCursor] = results[Symbol.iterator]()
cb(null, this[kCursor])
})
}
}
14 changes: 14 additions & 0 deletions lib/lang/string/slugify.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
'use strict'
const NON_ALPHA_NUM_RE = /[^\w\s-]+/g
const EXTRA_HYPHEN_RE = /[\-\s]+/g

module.exports = function slugify(str) {
if (!str) return ''
const clean = str
.replace(NON_ALPHA_NUM_RE, '')
.trim()
.replace(EXTRA_HYPHEN_RE, '_')

return clean.toLowerCase()
}

Loading

0 comments on commit 472ce38

Please sign in to comment.