This repository has been archived by the owner on Mar 10, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 300
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
465 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
'use strict' | ||
|
||
const promisify = require('promisify-es6') | ||
const EventEmitter = require('events') | ||
const eos = require('end-of-stream') | ||
const isNode = require('detect-node') | ||
const PubsubMessageStream = require('../pubsub-message-stream') | ||
const stringlistToArray = require('../stringlist-to-array') | ||
|
||
const NotSupportedError = () => new Error('pubsub is currently not supported when run in the browser') | ||
|
||
/* Public API */ | ||
module.exports = (send) => { | ||
/* Internal subscriptions state and functions */ | ||
const ps = new EventEmitter() | ||
const subscriptions = {} | ||
ps.id = Math.random() | ||
return { | ||
subscribe: (topic, options, handler, callback) => { | ||
const defaultOptions = { | ||
discover: false | ||
} | ||
|
||
if (typeof options === 'function') { | ||
callback = handler | ||
handler = options | ||
options = defaultOptions | ||
} | ||
|
||
if (!options) { | ||
options = defaultOptions | ||
} | ||
|
||
// Throw an error if ran in the browsers | ||
if (!isNode) { | ||
if (!callback) { | ||
return Promise.reject(NotSupportedError()) | ||
} | ||
return callback(NotSupportedError()) | ||
} | ||
|
||
// promisify doesn't work as we always pass a | ||
// function as last argument (`handler`) | ||
if (!callback) { | ||
return new Promise((resolve, reject) => { | ||
subscribe(topic, options, handler, (err) => { | ||
if (err) { | ||
return reject(err) | ||
} | ||
resolve() | ||
}) | ||
}) | ||
} | ||
|
||
subscribe(topic, options, handler, callback) | ||
}, | ||
unsubscribe: (topic, handler) => { | ||
if (!isNode) { | ||
throw NotSupportedError() | ||
} | ||
|
||
if (ps.listenerCount(topic) === 0 || !subscriptions[topic]) { | ||
throw new Error(`Not subscribed to '${topic}'`) | ||
} | ||
|
||
ps.removeListener(topic, handler) | ||
|
||
// Drop the request once we are actualy done | ||
if (ps.listenerCount(topic) === 0) { | ||
subscriptions[topic].abort() | ||
subscriptions[topic] = null | ||
} | ||
}, | ||
publish: promisify((topic, data, callback) => { | ||
if (!isNode) { | ||
return callback(NotSupportedError()) | ||
} | ||
|
||
if (!Buffer.isBuffer(data)) { | ||
return callback(new Error('data must be a Buffer')) | ||
} | ||
|
||
const request = { | ||
path: 'pubsub/pub', | ||
args: [topic, data] | ||
} | ||
|
||
send(request, callback) | ||
}), | ||
ls: promisify((callback) => { | ||
if (!isNode) { | ||
return callback(NotSupportedError()) | ||
} | ||
|
||
const request = { | ||
path: 'pubsub/ls' | ||
} | ||
|
||
send.andTransform(request, stringlistToArray, callback) | ||
}), | ||
peers: promisify((topic, callback) => { | ||
if (!isNode) { | ||
return callback(NotSupportedError()) | ||
} | ||
|
||
const request = { | ||
path: 'pubsub/peers', | ||
args: [topic] | ||
} | ||
|
||
send.andTransform(request, stringlistToArray, callback) | ||
}), | ||
setMaxListeners (n) { | ||
return ps.setMaxListeners(n) | ||
} | ||
} | ||
|
||
function subscribe (topic, options, handler, callback) { | ||
ps.on(topic, handler) | ||
if (subscriptions[topic]) { | ||
return callback() | ||
} | ||
|
||
// Request params | ||
const request = { | ||
path: 'pubsub/sub', | ||
args: [topic], | ||
qs: { | ||
discover: options.discover | ||
} | ||
} | ||
|
||
// Start the request and transform the response | ||
// stream to Pubsub messages stream | ||
subscriptions[topic] = send.andTransform(request, PubsubMessageStream.from, (err, stream) => { | ||
if (err) { | ||
subscriptions[topic] = null | ||
ps.removeListener(topic, handler) | ||
return callback(err) | ||
} | ||
|
||
stream.on('data', (msg) => { | ||
ps.emit(topic, msg) | ||
}) | ||
|
||
stream.on('error', (err) => { | ||
ps.emit('error', err) | ||
}) | ||
|
||
eos(stream, (err) => { | ||
if (err) { | ||
ps.emit('error', err) | ||
} | ||
|
||
subscriptions[topic] = null | ||
ps.removeListener(topic, handler) | ||
}) | ||
|
||
callback() | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
'use strict' | ||
|
||
const TransformStream = require('readable-stream').Transform | ||
const PubsubMessage = require('./pubsub-message-utils') | ||
|
||
class PubsubMessageStream extends TransformStream { | ||
constructor (options) { | ||
const opts = Object.assign(options || {}, { objectMode: true }) | ||
super(opts) | ||
} | ||
|
||
static from (inputStream, callback) { | ||
let outputStream = inputStream.pipe(new PubsubMessageStream()) | ||
inputStream.on('end', () => outputStream.emit('end')) | ||
callback(null, outputStream) | ||
} | ||
|
||
_transform (obj, enc, callback) { | ||
let msg | ||
try { | ||
msg = PubsubMessage.deserialize(obj, 'base64') | ||
} catch (err) { | ||
// Not a valid pubsub message | ||
// go-ipfs returns '{}' as the very first object atm, we skip that | ||
return callback() | ||
} | ||
|
||
this.push(msg) | ||
callback() | ||
} | ||
} | ||
|
||
module.exports = PubsubMessageStream |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
'use strict' | ||
|
||
const bs58 = require('bs58') | ||
|
||
module.exports = { | ||
deserialize (data, enc) { | ||
enc = enc ? enc.toLowerCase() : 'json' | ||
|
||
if (enc === 'json') { | ||
return deserializeFromJson(data) | ||
} else if (enc === 'base64') { | ||
return deserializeFromBase64(data) | ||
} | ||
|
||
throw new Error(`Unsupported encoding: '${enc}'`) | ||
} | ||
} | ||
|
||
function deserializeFromJson (data) { | ||
const json = JSON.parse(data) | ||
return deserializeFromBase64(json) | ||
} | ||
|
||
function deserializeFromBase64 (obj) { | ||
if (!isPubsubMessage(obj)) { | ||
throw new Error(`Not a pubsub message`) | ||
} | ||
|
||
return { | ||
from: bs58.encode(new Buffer(obj.from, 'base64')).toString(), | ||
seqno: new Buffer(obj.seqno, 'base64'), | ||
data: new Buffer(obj.data, 'base64'), | ||
topicCIDs: obj.topicIDs || obj.topicCIDs | ||
} | ||
} | ||
|
||
function isPubsubMessage (obj) { | ||
return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* eslint-env mocha */ | ||
|
||
'use strict' | ||
|
||
const test = require('interface-ipfs-core') | ||
const FactoryClient = require('../ipfs-factory/client') | ||
const isNode = require('detect-node') | ||
|
||
if (isNode) { | ||
let fc | ||
|
||
const common = { | ||
setup: function (callback) { | ||
fc = new FactoryClient() | ||
callback(null, fc) | ||
}, | ||
teardown: function (callback) { | ||
fc.dismantle(callback) | ||
} | ||
} | ||
|
||
test.pubsub(common) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.