Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: encode record-store keys in pubsub #9

Merged
merged 2 commits into from
Nov 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
"homepage": "https://github.com/ipfs/js-datastore-pubsub#readme",
"dependencies": {
"assert": "^1.4.1",
"base32.js": "~0.1.0",
"debug": "^4.1.0",
"err-code": "^1.1.2",
"interface-datastore": "~0.6.0"
"interface-datastore": "~0.6.0",
"multibase": "~0.6.0"
},
"devDependencies": {
"aegir": "^17.1.0",
Expand Down
34 changes: 20 additions & 14 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const { Key } = require('interface-datastore')
const { encodeBase32 } = require('./utils')
const { encodeBase32, keyToTopic, topicToKey } = require('./utils')

const errcode = require('err-code')
const assert = require('assert')
Expand All @@ -24,19 +24,19 @@ class DatastorePubsub {
* @memberof DatastorePubsub
*/
constructor (pubsub, datastore, peerId, validator, subscriptionKeyFn) {
assert.equal(typeof validator, 'object', 'missing validator')
assert.equal(typeof validator.validate, 'function', 'missing validate function')
assert.equal(typeof validator.select, 'function', 'missing select function')
subscriptionKeyFn && assert.equal(typeof subscriptionKeyFn, 'function', 'invalid subscriptionKeyFn received')
assert.strictEqual(typeof validator, 'object', 'missing validator')
assert.strictEqual(typeof validator.validate, 'function', 'missing validate function')
assert.strictEqual(typeof validator.select, 'function', 'missing select function')
subscriptionKeyFn && assert.strictEqual(typeof subscriptionKeyFn, 'function', 'invalid subscriptionKeyFn received')

this._pubsub = pubsub
this._datastore = datastore
this._peerId = peerId
this._validator = validator
this._handleSubscriptionKeyFn = subscriptionKeyFn

// Bind _handleSubscription function, which is called by pubsub.
this._handleSubscription = this._handleSubscription.bind(this)
// Bind _onMessage function, which is called by pubsub.
this._onMessage = this._onMessage.bind(this)
}

/**
Expand All @@ -61,7 +61,7 @@ class DatastorePubsub {
return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED'))
}

const stringifiedTopic = key.toString()
const stringifiedTopic = keyToTopic(key)

log(`publish value for topic ${stringifiedTopic}`)

Expand All @@ -83,7 +83,7 @@ class DatastorePubsub {
return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY'))
}

const stringifiedTopic = key.toString()
const stringifiedTopic = keyToTopic(key)

this._pubsub.ls((err, res) => {
if (err) {
Expand All @@ -96,7 +96,7 @@ class DatastorePubsub {
}

// Subscribe
this._pubsub.subscribe(stringifiedTopic, this._handleSubscription, (err) => {
this._pubsub.subscribe(stringifiedTopic, this._onMessage, (err) => {
if (err) {
const errMsg = `cannot subscribe topic ${stringifiedTopic}`

Expand All @@ -116,9 +116,9 @@ class DatastorePubsub {
* @returns {void}
*/
unsubscribe (key) {
const stringifiedTopic = key.toString()
const stringifiedTopic = keyToTopic(key)

this._pubsub.unsubscribe(stringifiedTopic, this._handleSubscription)
this._pubsub.unsubscribe(stringifiedTopic, this._onMessage)
}

// Get record from local datastore
Expand Down Expand Up @@ -152,9 +152,15 @@ class DatastorePubsub {
}

// handles pubsub subscription messages
_handleSubscription (msg) {
_onMessage (msg) {
const { data, from, topicIDs } = msg
const key = topicIDs[0]
let key
try {
key = topicToKey(topicIDs[0])
} catch (err) {
log.error(err)
return
}

log(`message received for ${key} topic`)

Expand Down
29 changes: 26 additions & 3 deletions src/utils.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,31 @@
'use strict'

const base32 = require('base32.js')
const multibase = require('multibase')
const errcode = require('err-code')

const namespace = '/record/'
const base64urlCode = 'u' // base64url code from multibase

module.exports.encodeBase32 = (buf) => {
const enc = new base32.Encoder()
return enc.write(buf).finalize()
return multibase.encode('base32', buf).slice(1) // slice off multibase codec
}

// converts a binary record key to a pubsub topic key.
module.exports.keyToTopic = (key) => {
// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs
// Encodes to "/record/base64url(key)"
const b64url = multibase.encode('base64url', key).slice(1).toString()

return `${namespace}${b64url}`
}

// converts a pubsub topic key to a binary record key.
module.exports.topicToKey = (topic) => {
vasco-santos marked this conversation as resolved.
Show resolved Hide resolved
if (topic.substring(0, namespace.length) !== namespace) {
throw errcode(new Error('topic received is not from a record'), 'ERR_TOPIC_IS_NOT_FROM_RECORD_NAMESPACE')
}

const key = `${base64urlCode}${topic.substring(namespace.length)}`

return multibase.decode(key).toString()
}
61 changes: 32 additions & 29 deletions test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const { Key } = require('interface-datastore')
const { Record } = require('libp2p-record')

const DatastorePubsub = require('../src')
const { keyToTopic } = require('../src/utils')
const { connect, waitFor, waitForPeerToSubscribe, spawnDaemon, stopDaemon } = require('./utils')

// Always returning the expected values
Expand Down Expand Up @@ -115,11 +116,12 @@ describe('datastore-pubsub', function () {

it('should subscribe the topic, but receive error as no entry is stored locally', function (done) {
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const subsTopic = keyToTopic(`/${keyRef}`)

pubsubA.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(`/${keyRef}`) // not subscribed key reference yet
expect(res).to.not.include(subsTopic) // not subscribed key reference yet

dsPubsubA.get(key, (err) => {
expect(err).to.exist() // not locally stored record
Expand All @@ -128,7 +130,7 @@ describe('datastore-pubsub', function () {
pubsubA.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.include(`/${keyRef}`) // subscribed key reference
expect(res).to.include(subsTopic) // subscribed key reference
done()
})
})
Expand All @@ -138,11 +140,12 @@ describe('datastore-pubsub', function () {
it('should put correctly to daemon A and daemon B should not receive it without subscribing', function (done) {
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
const subsTopic = keyToTopic(`/${keyRef}`)

pubsubB.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(`/${keyRef}`) // not subscribed
expect(res).to.not.include(subsTopic) // not subscribed

dsPubsubA.put(key, serializedRecord, (err) => {
expect(err).to.not.exist()
Expand All @@ -169,7 +172,7 @@ describe('datastore-pubsub', function () {
}
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -181,9 +184,9 @@ describe('datastore-pubsub', function () {
expect(res).to.not.exist() // no value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand All @@ -200,7 +203,7 @@ describe('datastore-pubsub', function () {
it('should put correctly to daemon A and daemon B should receive it as it tried to get it first and subscribed it', function (done) {
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -210,16 +213,16 @@ describe('datastore-pubsub', function () {
pubsubB.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(topic) // not subscribed
expect(res).to.not.include(subsTopic) // not subscribed

dsPubsubB.get(key, (err, res) => {
expect(err).to.exist()
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand Down Expand Up @@ -300,7 +303,7 @@ describe('datastore-pubsub', function () {
}
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -312,9 +315,9 @@ describe('datastore-pubsub', function () {
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand Down Expand Up @@ -345,7 +348,7 @@ describe('datastore-pubsub', function () {

const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -357,9 +360,9 @@ describe('datastore-pubsub', function () {
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand Down Expand Up @@ -396,7 +399,7 @@ describe('datastore-pubsub', function () {

const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -408,9 +411,9 @@ describe('datastore-pubsub', function () {
expect(res).to.not.exist() // not value available, but it is subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand All @@ -431,14 +434,14 @@ describe('datastore-pubsub', function () {
})
})

it('should subscribe the topic and after a message being received, discarde it using the subscriptionKeyFn', function (done) {
it('should subscribe the topic and after a message being received, discard it using the subscriptionKeyFn', function (done) {
const subscriptionKeyFn = (topic, callback) => {
expect(topic).to.equal(key.toString())
expect(topic).to.equal(`/${keyRef}`)
callback(new Error('DISCARD MESSAGE'))
}
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
let receivedMessage = false

function messageHandler () {
Expand All @@ -448,16 +451,16 @@ describe('datastore-pubsub', function () {
pubsubB.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(topic) // not subscribed
expect(res).to.not.include(subsTopic) // not subscribed

dsPubsubB.get(key, (err, res) => {
expect(err).to.exist()
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand All @@ -478,7 +481,7 @@ describe('datastore-pubsub', function () {
}
const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator)
const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn)
const topic = `/${keyRef}`
const subsTopic = keyToTopic(`/${keyRef}`)
const keyNew = Buffer.from(`${key.toString()}new`)
let receivedMessage = false

Expand All @@ -489,16 +492,16 @@ describe('datastore-pubsub', function () {
pubsubB.ls((err, res) => {
expect(err).to.not.exist()
expect(res).to.exist()
expect(res).to.not.include(topic) // not subscribed
expect(res).to.not.include(subsTopic) // not subscribed

dsPubsubB.get(key, (err, res) => {
expect(err).to.exist()
expect(res).to.not.exist() // not value available, but subscribed now

series([
(cb) => waitForPeerToSubscribe(topic, ipfsdBId, ipfsdA, cb),
(cb) => waitForPeerToSubscribe(subsTopic, ipfsdBId, ipfsdA, cb),
// subscribe in order to understand when the message arrive to the node
(cb) => pubsubB.subscribe(topic, messageHandler, cb),
(cb) => pubsubB.subscribe(subsTopic, messageHandler, cb),
(cb) => dsPubsubA.put(key, serializedRecord, cb),
// wait until message arrives
(cb) => waitFor(() => receivedMessage === true, cb),
Expand Down