diff --git a/packages/ipfs-core-types/src/pubsub/index.d.ts b/packages/ipfs-core-types/src/pubsub/index.d.ts index 2fb26a06fc..8062dc9a4a 100644 --- a/packages/ipfs-core-types/src/pubsub/index.d.ts +++ b/packages/ipfs-core-types/src/pubsub/index.d.ts @@ -13,7 +13,7 @@ export interface API { * console.log(`subscribed to ${topic}`) * ``` */ - subscribe: (topic: string, handler: MessageHandlerFn, options?: AbortOptions & OptionExtension) => Promise + subscribe: (topic: string, handler: MessageHandlerFn, options?: SubscribeOptions & OptionExtension) => Promise /** * Unsubscribes from a pubsub topic @@ -81,4 +81,12 @@ export interface Message { topicIDs: string[] } +export interface SubscribeOptions extends AbortOptions { + /** + * A callback to receive an error if one occurs during processing + * subscription messages. Only supported by ipfs-http-client. + */ + onError?: (err: Error) => void +} + export type MessageHandlerFn = (message: Message) => void diff --git a/packages/ipfs-http-client/package.json b/packages/ipfs-http-client/package.json index 17eca3b0f5..c2b3ef4b5a 100644 --- a/packages/ipfs-http-client/package.json +++ b/packages/ipfs-http-client/package.json @@ -81,6 +81,7 @@ "it-concat": "^2.0.0", "it-first": "^1.0.4", "nock": "^13.0.2", + "p-defer": "^3.0.0", "rimraf": "^3.0.2" }, "engines": { diff --git a/packages/ipfs-http-client/test/commands.spec.js b/packages/ipfs-http-client/test/commands.spec.js index 9c7de185c1..c0474e8d0e 100644 --- a/packages/ipfs-http-client/test/commands.spec.js +++ b/packages/ipfs-http-client/test/commands.spec.js @@ -7,6 +7,7 @@ const f = require('./utils/factory')() describe('.commands', function () { this.timeout(60 * 1000) + /** @type {import('ipfs-core-types').IPFS} */ let ipfs before(async () => { diff --git a/packages/ipfs-http-client/test/node/agent.js b/packages/ipfs-http-client/test/node/agent.js index 00f62fc93d..fd2ddd257b 100644 --- a/packages/ipfs-http-client/test/node/agent.js +++ b/packages/ipfs-http-client/test/node/agent.js @@ -5,6 +5,11 @@ const { expect } = require('aegir/utils/chai') const ipfsClient = require('../../src').create const delay = require('delay') +/** + * @typedef {import('http').IncomingMessage} IncomingMessage + * + * @param {(message: IncomingMessage) => Promise} handler + */ function startServer (handler) { return new Promise((resolve) => { // spin up a test http server to inspect the requests made by the library @@ -20,8 +25,10 @@ function startServer (handler) { }) server.listen(0, () => { + const addressInfo = server.address() + resolve({ - port: server.address().port, + port: addressInfo && (typeof addressInfo === 'string' ? addressInfo : addressInfo.port), close: () => server.close() }) }) @@ -29,6 +36,7 @@ function startServer (handler) { } describe('agent', function () { + /** @type {import('http').Agent} */ let agent before(() => { @@ -40,6 +48,7 @@ describe('agent', function () { }) it('restricts the number of concurrent connections', async () => { + /** @type {((arg: any) => void)[]} */ const responses = [] const server = await startServer(() => { diff --git a/packages/ipfs-http-client/test/pubsub.spec.js b/packages/ipfs-http-client/test/pubsub.spec.js index 8e6320fe99..bf47216e0e 100644 --- a/packages/ipfs-http-client/test/pubsub.spec.js +++ b/packages/ipfs-http-client/test/pubsub.spec.js @@ -3,13 +3,17 @@ const { expect } = require('aegir/utils/chai') const { AbortController } = require('native-abort-controller') +const uint8ArrayFromString = require('uint8arrays/from-string') +const defer = require('p-defer') const f = require('./utils/factory')() describe('.pubsub', function () { this.timeout(20 * 1000) describe('.subscribe', () => { + /** @type {import('ipfs-core-types').IPFS} */ let ipfs + /** @type {any} */ let ctl beforeEach(async function () { @@ -27,8 +31,7 @@ describe('.pubsub', function () { it('.onError when connection is closed', async () => { const topic = 'gossipboom' let messageCount = 0 - let onError - const error = new Promise(resolve => { onError = resolve }) + const onError = defer() await ipfs.pubsub.subscribe(topic, message => { messageCount++ @@ -38,47 +41,44 @@ describe('.pubsub', function () { ctl.stop().catch() } }, { - onError + onError: onError.resolve }) - await ipfs.pubsub.publish(topic, 'hello') - await ipfs.pubsub.publish(topic, 'bye') + await ipfs.pubsub.publish(topic, uint8ArrayFromString('hello')) + await ipfs.pubsub.publish(topic, uint8ArrayFromString('bye')) - await expect(error).to.eventually.be.fulfilled().and.to.be.instanceOf(Error) + await expect(onError.promise).to.eventually.be.fulfilled().and.to.be.instanceOf(Error) }) it('does not call onError when aborted', async () => { const controller = new AbortController() const topic = 'gossipabort' const messages = [] - let onError - let onReceived - - const received = new Promise(resolve => { onReceived = resolve }) - const error = new Promise(resolve => { onError = resolve }) + const onError = defer() + const onReceived = defer() await ipfs.pubsub.subscribe(topic, message => { messages.push(message) if (messages.length === 2) { - onReceived() + onReceived.resolve() } }, { - onError, + onError: onError.resolve, signal: controller.signal }) - await ipfs.pubsub.publish(topic, 'hello') - await ipfs.pubsub.publish(topic, 'bye') + await ipfs.pubsub.publish(topic, uint8ArrayFromString('hello')) + await ipfs.pubsub.publish(topic, uint8ArrayFromString('bye')) - await received + await onReceived.promise controller.abort() // Stop the daemon await ctl.stop() // Just to make sure no error is caused by above line - setTimeout(onError, 200, 'aborted') + setTimeout(onError.resolve, 200, 'aborted') - await expect(error).to.eventually.be.fulfilled().and.to.equal('aborted') + await expect(onError.promise).to.eventually.be.fulfilled().and.to.equal('aborted') }) }) }) diff --git a/packages/ipfs-http-client/test/utils/factory.js b/packages/ipfs-http-client/test/utils/factory.js index e7f1096a0e..3a0bd3961b 100644 --- a/packages/ipfs-http-client/test/utils/factory.js +++ b/packages/ipfs-http-client/test/utils/factory.js @@ -1,5 +1,6 @@ 'use strict' +// @ts-ignore no types const { createFactory } = require('ipfsd-ctl') const merge = require('merge-options') const { isNode } = require('ipfs-utils/src/env') @@ -13,6 +14,7 @@ const commonOptions = { const commonOverrides = { go: { + // @ts-ignore go-ipfs has no types ipfsBin: isNode ? require('go-ipfs').path() : undefined } } diff --git a/packages/ipfs-http-client/tsconfig.json b/packages/ipfs-http-client/tsconfig.json index 3a4cc05655..195a5d1399 100644 --- a/packages/ipfs-http-client/tsconfig.json +++ b/packages/ipfs-http-client/tsconfig.json @@ -4,7 +4,9 @@ "outDir": "dist" }, "include": [ - "src" + "src", + "test/utils/factory.js", + "test/pubsub.spec.js" ], "references": [ {