diff --git a/lib/common/ReadableStream.ts b/lib/common/ReadableStream.ts index 0c00c12fc..3bd8819ec 100644 --- a/lib/common/ReadableStream.ts +++ b/lib/common/ReadableStream.ts @@ -1,22 +1,43 @@ +import SidetreeError from './SidetreeError'; +import ErrorCode from '../common/SharedErrorCode'; + /** * ReadableStream utilities */ export default class ReadableStream { /** - * Given a readable stream, reads all data until end or error - * @param stream Fetch readable stream to read + * Given a readable stream, reads all data only if the content does not exceed given max size. + * Throws error if content exceeds give max size. + * @param stream Readable stream to read. + * @param maxAllowedSizeInBytes The maximum allowed size limit of the content. * @returns a Buffer of the readable stream data */ - public static async readAll (stream: NodeJS.ReadableStream): Promise { + public static async readAll (stream: NodeJS.ReadableStream, maxAllowedSizeInBytes?: number): Promise { // Set callback for the 'readable' event to concatenate chunks of the readable stream. let content: Buffer = Buffer.alloc(0); + let currentSizeInBytes = 0; stream.on('readable', () => { // NOTE: Cast to any is to work-around incorrect TS definition for read() where // `null` should be a possible return type but is not defined in @types/node: 10.12.18. let chunk = stream.read() as any; while (chunk !== null) { + currentSizeInBytes += chunk.length; + + // Monitor on read size only if `maxAllowedSizeInBytes` is set. + if (maxAllowedSizeInBytes !== undefined && + currentSizeInBytes > maxAllowedSizeInBytes) { + + const error = new SidetreeError( + ErrorCode.ReadableStreamMaxAllowedDataSizeExceeded, + `Max data size allowed: ${maxAllowedSizeInBytes} bytes, aborted reading at ${currentSizeInBytes} bytes.` + ); + + // NOTE: Cast to any is to work-around incorrect TS definition where `destroy()` is missing. 
+ (stream as any).destroy(error); + } + content = Buffer.concat([content, chunk]); chunk = stream.read(); } diff --git a/lib/common/SharedErrorCode.ts b/lib/common/SharedErrorCode.ts index cf89490ef..3adfc2d1f 100644 --- a/lib/common/SharedErrorCode.ts +++ b/lib/common/SharedErrorCode.ts @@ -4,7 +4,8 @@ export default { BlockchainTimeOutOfRange: 'blockchain_time_out_of_range', InvalidTransactionNumberOrTimeHash: 'invalid_transaction_number_or_time_hash', - NotEnoughBalanceForWrite: 'not_enough_balace_for_write', + NotEnoughBalanceForWrite: 'not_enough_balance_for_write', + ReadableStreamMaxAllowedDataSizeExceeded: 'readable_stream_max_allowed_data_size_exceeded', SpendingCapPerPeriodReached: 'spending_cap_per_period_reached', ValueTimeLockNotFound: 'value_time_lock_not_found' }; diff --git a/lib/common/enums/FetchResultCode.ts b/lib/common/enums/FetchResultCode.ts index 4c2a563f5..5171234d3 100644 --- a/lib/common/enums/FetchResultCode.ts +++ b/lib/common/enums/FetchResultCode.ts @@ -5,7 +5,6 @@ enum FetchResultCode { CasNotReachable = 'cas_not_reachable', InvalidHash = 'content_hash_invalid', MaxSizeExceeded = 'content_exceeds_maximum_allowed_size', - MaxSizeNotSpecified = 'content_max_size_not_specified', NotAFile = 'content_not_a_file', NotFound = 'content_not_found', Success = 'success' diff --git a/lib/core/Cas.ts b/lib/core/Cas.ts deleted file mode 100644 index f4e871170..000000000 --- a/lib/core/Cas.ts +++ /dev/null @@ -1,94 +0,0 @@ -import * as HttpStatus from 'http-status'; -import FetchResult from '../common/models/FetchResult'; -import FetchResultCode from '../common/enums/FetchResultCode'; -import ICas from './interfaces/ICas'; -import nodeFetch from 'node-fetch'; -import ReadableStream from '../common/ReadableStream'; -import ServiceVersionFetcher from './ServiceVersionFetcher'; -import ServiceVersionModel from '../common/models/ServiceVersionModel'; - -/** - * Class that communicates with the underlying CAS using REST API defined by the protocol 
document. - */ -export default class Cas implements ICas { - - private fetch = nodeFetch; - private serviceVersionFetcher: ServiceVersionFetcher; - - public constructor (public uri: string) { - this.serviceVersionFetcher = new ServiceVersionFetcher(uri); - } - - public async write (content: Buffer): Promise { - const requestParameters = { - method: 'post', - body: content, - headers: { 'Content-Type': 'application/octet-stream' } - }; - const response = await this.fetch(this.uri, requestParameters); - if (response.status !== HttpStatus.OK) { - console.error(`CAS write error response status: ${response.status}`); - - if (response.body) { - const errorBody = await ReadableStream.readAll(response.body); - console.error(`CAS write error body: ${errorBody}`); - } - - throw new Error('Encountered an error writing content to CAS.'); - } - - const body = await ReadableStream.readAll(response.body); - const hash = JSON.parse(body.toString()).hash; - - return hash; - } - - public async read (address: string, maxSizeInBytes: number): Promise { - try { - // Fetch the resource. 
- const queryUri = `${this.uri}/${address}?max-size=${maxSizeInBytes}`; - const response = await this.fetch(queryUri); - if (response.status === HttpStatus.NOT_FOUND) { - return { code: FetchResultCode.NotFound }; - } - - if (response.status === HttpStatus.BAD_REQUEST) { - const errorBody = await ReadableStream.readAll(response.body); - return JSON.parse(errorBody.toString()); - } - - if (response.status !== HttpStatus.OK) { - console.error(`CAS '${address}' read response status: ${response.status}`); - - if (response.body) { - const errorBody = await ReadableStream.readAll(response.body); - console.error(`CAS '${address}' read error body: ${errorBody}`); - } - - console.error(`Treating '${address}' read as not-found, but should investigate.`); - return { code: FetchResultCode.NotFound }; - } - - const content = await ReadableStream.readAll(response.body); - - return { - code: FetchResultCode.Success, - content: content - }; - } catch (error) { - if (error.code === 'ECONNREFUSED') { - return { code: FetchResultCode.CasNotReachable }; - } - - // Else throw - throw error; - } - } - - /** - * Gets the service version. 
- */ - public async getServiceVersion (): Promise { - return this.serviceVersionFetcher.getVersion(); - } -} diff --git a/lib/core/Core.ts b/lib/core/Core.ts index 01a0f60e2..3cb21e898 100644 --- a/lib/core/Core.ts +++ b/lib/core/Core.ts @@ -1,8 +1,8 @@ import BatchScheduler from './BatchScheduler'; import Blockchain from './Blockchain'; -import Cas from './Cas'; -import DownloadManager from './DownloadManager'; import Config from './models/Config'; +import DownloadManager from './DownloadManager'; +import ICas from './interfaces/ICas'; import MongoDbOperationStore from './MongoDbOperationStore'; import MongoDbTransactionStore from '../common/MongoDbTransactionStore'; import MongoDbUnresolvableTransactionStore from './MongoDbUnresolvableTransactionStore'; @@ -23,7 +23,6 @@ export default class Core { private operationStore: MongoDbOperationStore; private versionManager: VersionManager; private blockchain: Blockchain; - private cas: Cas; private downloadManager: DownloadManager; private observer: Observer; private batchScheduler: BatchScheduler; @@ -33,12 +32,11 @@ export default class Core { /** * Core constructor. */ - public constructor (config: Config, versionModels: VersionModel[]) { + public constructor (config: Config, versionModels: VersionModel[], private cas: ICas) { // Component dependency construction & injection. this.versionManager = new VersionManager(config, versionModels); // `VersionManager` is first constructed component. 
this.operationStore = new MongoDbOperationStore(config.mongoDbConnectionString, config.databaseName); this.blockchain = new Blockchain(config.blockchainServiceUri); - this.cas = new Cas(config.contentAddressableStoreServiceUri); this.downloadManager = new DownloadManager(config.maxConcurrentDownloads, this.cas); this.resolver = new Resolver(this.versionManager, this.operationStore); this.batchScheduler = new BatchScheduler(this.versionManager, this.blockchain, config.batchingIntervalInSeconds); @@ -112,8 +110,7 @@ export default class Core { public async handleGetVersionRequest (): Promise { const responses = [ this.serviceInfo.getServiceVersion(), - await this.blockchain.getServiceVersion(), - await this.cas.getServiceVersion() + await this.blockchain.getServiceVersion() ]; return { diff --git a/lib/core/models/Config.ts b/lib/core/models/Config.ts index c1117fc13..122641c44 100644 --- a/lib/core/models/Config.ts +++ b/lib/core/models/Config.ts @@ -4,7 +4,6 @@ export default interface Config { batchingIntervalInSeconds: number; blockchainServiceUri: string; - contentAddressableStoreServiceUri: string; didMethodName: string; maxConcurrentDownloads: number; observingIntervalInSeconds: number; diff --git a/lib/index.ts b/lib/index.ts index dd9ce82ed..bf7cb8428 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,6 +1,7 @@ // NOTE: Aliases to classes and interfaces are used for external consumption. // Core service exports. +import ISidetreeCas from './core/interfaces/ICas'; import SidetreeCore from './core/Core'; import SidetreeConfig from './core/models/Config'; import SidetreeResponse from './common/Response'; @@ -8,6 +9,7 @@ import SidetreeResponseModel from './common/models/ResponseModel'; import SidetreeVersionModel from './common/models/VersionModel'; export { + ISidetreeCas, SidetreeConfig, SidetreeCore, SidetreeResponse, @@ -25,10 +27,3 @@ export { ISidetreeBitcoinWallet, SidetreeBitcoinProcessor }; - -// IPFS service exports. 
-import SidetreeIpfsService from './ipfs/RequestHandler'; - -export { - SidetreeIpfsService -}; diff --git a/lib/ipfs/@types/ipfs/index.d.ts b/lib/ipfs/@types/ipfs/index.d.ts deleted file mode 100644 index 93ee206e2..000000000 --- a/lib/ipfs/@types/ipfs/index.d.ts +++ /dev/null @@ -1,267 +0,0 @@ -declare module 'ipfs' { - export = IPFS; -} - -type Callback = (error: Error, result?: T) => void; - -declare class IPFS { - constructor (options?: IPFS.Options); - - types: IPFS.Types; - - init (options: IPFS.InitOptions, callback: Callback): void; - init (callback: Callback): void; - - preStart (callback: Callback): void; - start (callback?: Callback): void; - stop (callback?: (error?: Error) => void): Promise; - isOnline (): boolean; - - version (options: any, callback: (error: Error, version: IPFS.Version) => void): void ; - version (options?: any): Promise; - version (callback: (error: Error, version: IPFS.Version) => void): void ; - - id (options: any, callback: (error: Error, version: IPFS.Id) => void): void ; - id (options?: any): Promise; - id (callback: (error: Error, version: IPFS.Id) => void): void ; - - repo: IPFS.RepoAPI; - bootstrap: any; - config: any; - block: any; - object: IPFS.ObjectAPI; - dag: IPFS.DagAPI; - libp2p: any; - swarm: IPFS.SwarmAPI; - bitswap: any; - pin: IPFS.PinAPI; - - ping (callback: (error: Error) => void): void; - ping (): Promise; - - pubsub: any; - - on (event: string, callback: () => void): IPFS; - on (event: 'error', callback: (error: { message: any }) => void): IPFS; - once (event: string, callback: () => void): IPFS; - - add (data: IPFS.FileContent, options?: any): AsyncIterator; - - cat (hash: IPFS.Multihash, options?: any): AsyncIterator; - - get (hash: IPFS.Multihash): AsyncIterator; -} - -declare namespace IPFS { - - export interface Options { - init?: boolean; - start?: boolean; - EXPERIMENTAL?: any; - repo?: any; - config?: any; - } - - export interface InitOptions { - emptyRepo?: boolean; - bits?: number; - log?: 
Function; - } - - export interface Multiaddr { - buffer: Uint8Array; - } - - export type Multihash = any | string; - export type CID = any; - - export interface Types { - Buffer: any; - PeerId: any; - PeerInfo: any; - multiaddr: Multiaddr; - multihash: Multihash; - CID: CID; - } - - export interface Version { - version: string; - repo: string; - commit: string; - } - - export interface Id { - id: string; - publicKey: string; - addresses: Multiaddr[]; - agentVersion: string; - protocolVersion: string; - } - - export interface RepoAPI { - init (bits: number, empty: boolean, callback: Callback): void; - - version (options: any, callback: Callback): void; - version (callback: Callback): void; - - gc (): void; - path (): string; - } - - export type FileContent = Buffer | NodeJS.ReadableStream | Files[]; - - export interface Files { - path: string; - content?: Buffer | NodeJS.ReadableStream; - } - - export interface IPFSFile { - path: string; - hash: string; - size: number; - content?: FileContent; - } - - export interface PinAPI { - add (hash: Multihash, options: any, callback: Callback): void; - add (hash: Multihash, options: any): Promise; - add (hash: Multihash, callback: Callback): void; - add (hash: Multihash): Promise; - - ls (): void; - rm (): void; - } - - export interface PeersOptions { - v?: boolean; - verbose?: boolean; - } - - export type PeerId = any; - - export interface PeerInfo { - id: PeerId; - multiaddr: Multiaddr; - multiaddrs: Multiaddr[]; - distinctMultiaddr (): Multiaddr[]; - } - - export interface Peer { - addr: Multiaddr; - peer: PeerInfo; - } - - export interface SwarmAPI { - peers (options: PeersOptions, callback: Callback): void; - peers (options: PeersOptions): Promise; - peers (callback: Callback): void; - peers (): Promise; - - addrs (callback: Callback): void; - addrs (): Promise; - - localAddrs (callback: Callback): void; - localAddrs (): Promise; - - connect (maddr: Multiaddr | string, callback: Callback): void; - connect (maddr: 
Multiaddr | string): Promise; - - disconnect (maddr: Multiaddr | string, callback: Callback): void; - disconnect (maddr: Multiaddr | string): Promise; - - filters (callback: Callback): never; - } - - export type DAGNode = any; - export type DAGLink = any; - export type DAGLinkRef = DAGLink | any; - export type Obj = Buffer | Object | DAGNode; - - export interface Object { - Data: Buffer; - Link: DAGLink; - } - - export interface ObjectStat { - Hash: Multihash; - NumLinks: number; - BlockSize: number; - LinksSize: number; - DataSize: number; - CumulativeSize: number; - } - - export interface PutObjectOptions { - enc?: any; - } - - export interface GetObjectOptions { - enc?: any; - } - - export interface ObjectPatchAPI { - addLink (multihash: Multihash, link: DAGLink, options: GetObjectOptions, callback: Callback): void; - addLink (multihash: Multihash, link: DAGLink, options?: GetObjectOptions): Promise; - addLink (multihash: Multihash, link: DAGLink, callback: Callback): void; - - rmLink (multihash: Multihash, linkRef: DAGLinkRef, options: GetObjectOptions, callback: Callback): void; - rmLink (multihash: Multihash, linkRef: DAGLinkRef, options?: GetObjectOptions): Promise; - rmLink (multihash: Multihash, linkRef: DAGLinkRef, callback: Callback): void; - - appendData (multihash: Multihash, data: any, options: GetObjectOptions, callback: Callback): void; - appendData (multihash: Multihash, data: any, options?: GetObjectOptions): Promise; - appendData (multihash: Multihash, data: any, callback: Callback): void; - - setData (multihash: Multihash, data: any, options: GetObjectOptions, callback: Callback): void; - setData (multihash: Multihash, data: any, options?: GetObjectOptions): Promise; - setData (multihash: Multihash, data: any, callback: Callback): void; - } - - export interface ObjectAPI { - 'new' (template: 'unixfs-dir', callback: Callback): void; - 'new' (callback: Callback): void; - 'new' (): Promise; - - put (obj: Obj, options: PutObjectOptions, callback: 
Callback): void; - put (obj: Obj, options?: PutObjectOptions): Promise; - put (obj: Obj, callback: Callback): void; - - get (multihash: Multihash, options: GetObjectOptions, callback: Callback): void; - get (multihash: Multihash, options?: GetObjectOptions): Promise; - get (multihash: Multihash, callback: Callback): void; - - data (multihash: Multihash, options: GetObjectOptions, callback: Callback): void; - data (multihash: Multihash, options?: GetObjectOptions): Promise; - data (multihash: Multihash, callback: Callback): void; - - links (multihash: Multihash, options: GetObjectOptions, callback: Callback): void; - links (multihash: Multihash, options?: GetObjectOptions): Promise; - links (multihash: Multihash, callback: Callback): void; - - stat (multihash: Multihash, options: GetObjectOptions, callback: Callback): void; - stat (multihash: Multihash, options: GetObjectOptions): Promise; - stat (multihash: Multihash, callback: Callback): void; - stat (multihash: Multihash): Promise; - - patch: ObjectPatchAPI; - } - - export interface DagAPI { - put (dagNode: any, options: any, callback: Callback): void; - put (dagNode: any, options: any): Promise; - - get (cid: string | CID, path: string, options: any, callback: Callback): void; - get (cid: string | CID, path?: string, options?: any): Promise; - get (cid: string | CID, path: string, callback: Callback): void; - get (cid: string | CID, callback: Callback): void; - - tree (cid: string | CID, path: string, options: any, callback: Callback): void; - tree (cid: string | CID, path?: string, options?: any): Promise; - tree (cid: string | CID, path: string, callback: Callback): void; - tree (cid: string | CID, options: any, callback: Callback): void; - tree (cid: string | CID, options: any): Promise; - tree (cid: string | CID, callback: Callback): void; - } - - export function create (options: Options): Promise; -} diff --git a/lib/ipfs/Ipfs.ts b/lib/ipfs/Ipfs.ts new file mode 100644 index 000000000..7638643df --- 
/dev/null +++ b/lib/ipfs/Ipfs.ts @@ -0,0 +1,175 @@ +import * as crypto from 'crypto'; +import * as HttpStatus from 'http-status'; +import * as url from 'url'; +import base64url from 'base64url'; +import FetchResult from '../common/models/FetchResult'; +import FetchResultCode from '../common/enums/FetchResultCode'; +import ICas from '../core/interfaces/ICas'; +import IpfsErrorCode from '../ipfs/IpfsErrorCode'; +import nodeFetch from 'node-fetch'; +import ReadableStream from '../common/ReadableStream'; +import SharedErrorCode from '../common/SharedErrorCode'; +import SidetreeError from '../common/SidetreeError'; +import Timeout from './Util/Timeout'; + +const multihashes = require('multihashes'); + +/** + * Class that implements the `ICas` interface by communicating with IPFS. + */ +export default class Ipfs implements ICas { + private fetch = nodeFetch; + + public constructor (private uri: string, private fetchTimeoutInSeconds: number) { } + + public async write (content: Buffer): Promise { + // A string that is cryptographically impossible to repeat as the boundary string. + const multipartBoundaryString = crypto.randomBytes(32).toString('hex'); + + // See https://tools.ietf.org/html/rfc7578#section-4.1 + // An example of multipart form data: + // + // --ABoundaryString + // + // Content of the first part. + // --ABoundaryString + // Content-Type: application/octet-stream + // + // Binary content of second part. 
+ // --ABoundaryString-- + const beginBoundary = Buffer.from(`--${multipartBoundaryString}\n`); + const firstPartContentType = Buffer.from(`Content-Type: application/octet-stream\n\n`); + const endBoundary = Buffer.from(`\n--${multipartBoundaryString}--`); + const requestBody = Buffer.concat([beginBoundary, firstPartContentType, content, endBoundary]); + + const requestParameters = { + method: 'POST', + headers: { 'Content-Type': `multipart/form-data; boundary=${multipartBoundaryString}` }, + body: requestBody + }; + + const addUrl = url.resolve(this.uri, '/api/v0/add'); // e.g. 'http://127.0.0.1:5001/api/v0/add' + const response = await this.fetch(addUrl, requestParameters); + + if (response.status !== HttpStatus.OK) { + console.error(`IPFS write error response status: ${response.status}`); + + if (response.body) { + const errorBody = await ReadableStream.readAll(response.body); + console.error(`IPFS write error body: ${errorBody}`); + } + + throw new SidetreeError(IpfsErrorCode.IpfsFailedWritingContent, `Failed writing content of ${content.length} bytes.`); + } + + const body = await ReadableStream.readAll(response.body); + const base58EncodedMultihashString = JSON.parse(body.toString()).Hash; + + // Convert base58 to base64url multihash. + const multihashBuffer = multihashes.fromB58String(base58EncodedMultihashString); + const base64urlEncodedMultihash = base64url.encode(multihashBuffer); + + console.log(`Wrote ${content.length} byte content as IPFS CID: ${base58EncodedMultihashString}, base64url ID: ${base64urlEncodedMultihash}`); + return base64urlEncodedMultihash; + } + + public async read (base64urlEncodedMultihash: string, maxSizeInBytes: number): Promise { + // Convert base64url to base58 multihash. 
+ let base58EncodedMultihashString; + try { + const multihashBuffer = base64url.toBuffer(base64urlEncodedMultihash); + multihashes.validate(multihashBuffer); + base58EncodedMultihashString = multihashes.toB58String(multihashBuffer); + } catch (error) { + console.log(`'${base64urlEncodedMultihash}' is not a valid hash: ${SidetreeError.stringify(error)}`); + return { code: FetchResultCode.InvalidHash }; + } + + // Fetch the content. + let fetchResult; + try { + const fetchContentPromise = this.fetchContent(base58EncodedMultihashString, maxSizeInBytes); + fetchResult = await Timeout.timeout(fetchContentPromise, this.fetchTimeoutInSeconds * 1000); + } catch (error) { + // Log appropriately based on error. + if (error.code === IpfsErrorCode.TimeoutPromiseTimedOut) { + console.log(`Timed out fetching CID '${base58EncodedMultihashString}', base64url ID: ${base64urlEncodedMultihash}.`); + } else { + // Log any unexpected error for investigation. + const errorMessage = + `Unexpected error while fetching CID '${base58EncodedMultihashString}', base64url ID: ${base64urlEncodedMultihash}. ` + + `Investigate and fix: ${SidetreeError.stringify(error)}`; + console.error(errorMessage); + } + + // Mark content as `not found` if any error is thrown while fetching. + return { code: FetchResultCode.NotFound }; + } + + // "Pin" (store permanently in local repo) content if fetch is successful. Re-pinning already existing object does not create a duplicate. + if (fetchResult.code === FetchResultCode.Success) { + await this.pinContent(base58EncodedMultihashString); + console.log(`Read and pinned ${fetchResult.content!.length} bytes for CID: ${base58EncodedMultihashString}, base64url ID: ${base64urlEncodedMultihash}.`); + } + + return fetchResult; + } + + /** + * Fetch the content from IPFS. + * This method also allows easy mocking in tests. + */ + private async fetchContent (base58Multihash: string, maxSizeInBytes: number): Promise { + // Go-IPFS HTTP API call. 
+ let response; + try { + // e.g. 'http://127.0.0.1:5001/api/v0/cat?arg=QmPPsg8BeJdqK2TnRHx5L2BFyjmFr9FK6giyznNjdL93NL&length=100000' + // NOTE: we pass max size + 1 to the API because the API will return up to the size given, + // so if we give the exact max size, we would not know when the content of the exact max size is returned, + // whether the content is truncated or not; with the +1, if the content returned has size of max size + 1, + // we can safely discard the content (in the stream read below) and return size exceeded as the fetch result. + // Alternatively, we could choose not to supply this optional `length` parameter, but we do so such that + // IPFS is given the opportunity to optimize its download logic. (e.g. not needing to download the entire content). + const catUrl = url.resolve(this.uri, `/api/v0/cat?arg=${base58Multihash}&length=${maxSizeInBytes + 1}`); + response = await this.fetch(catUrl, { method: 'POST' }); + } catch (error) { + if (error.code === 'ECONNREFUSED') { + return { code: FetchResultCode.CasNotReachable }; + } + + throw error; + } + + // Handle non-OK response. + if (response.status !== HttpStatus.OK) { + const body = await ReadableStream.readAll(response.body); + const json = JSON.parse(body.toString()); + + if (json.Message === 'this dag node is a directory') { + return { code: FetchResultCode.NotAFile }; + } + + console.info(`Received response code ${response.status} from IPFS for CID ${base58Multihash}: ${json})}`); + return { code: FetchResultCode.NotFound }; + } + + // Handle OK response. + const fetchResult: FetchResult = { code: FetchResultCode.Success }; + try { + fetchResult.content = await ReadableStream.readAll(response.body, maxSizeInBytes); + return fetchResult; + } catch (error) { + if (error.code === SharedErrorCode.ReadableStreamMaxAllowedDataSizeExceeded) { + return { code: FetchResultCode.MaxSizeExceeded }; + } + + throw error; + } + } + + private async pinContent (hash: string) { + // e.g. 
'http://127.0.0.1:5001/api/v0/pin?arg=QmPPsg8BeJdqK2TnRHx5L2BFyjmFr9FK6giyznNjdL93NL' + const pinUrl = url.resolve(this.uri, `/api/v0/pin?arg=${hash}`); + await this.fetch(pinUrl, { method: 'POST' }); + } +} diff --git a/lib/ipfs/IpfsErrorCode.ts b/lib/ipfs/IpfsErrorCode.ts new file mode 100644 index 000000000..28f78d155 --- /dev/null +++ b/lib/ipfs/IpfsErrorCode.ts @@ -0,0 +1,7 @@ +/** + * Common error codes used across services. + */ +export default { + IpfsFailedWritingContent: 'ipfs_failed_writing_content', + TimeoutPromiseTimedOut: 'timeout_promise_timed_out' +}; diff --git a/lib/ipfs/IpfsStorage.ts b/lib/ipfs/IpfsStorage.ts deleted file mode 100644 index d326d8fec..000000000 --- a/lib/ipfs/IpfsStorage.ts +++ /dev/null @@ -1,181 +0,0 @@ -import * as IPFS from 'ipfs'; -import FetchResult from '../common/models/FetchResult'; -import FetchResultCode from '../common/enums/FetchResultCode'; - -/** - * Class that implements the IPFS Storage functionality. - */ -export default class IpfsStorage { - - /** IPFS node instance */ - private node: IPFS | undefined; - private repo: any; - private healthy: boolean; - private healthCheckInternalInSeconds: number; - - /** - * the constructor itself is not functional, the initialize function needs to be called to be healthy - * @param repo the repo to store ipfs data in - */ - public constructor (repo?: any) { - this.repo = repo; - this.healthy = false; // need to initialize to be healthy - this.healthCheckInternalInSeconds = 60; - } - - private async getNode (): Promise { - const localRepoName = 'sidetree-ipfs'; - const options = { - repo: this.repo !== undefined ? 
this.repo : localRepoName - }; - const node = await IPFS.create(options); - return node; - } - - /** - * Start periodic health check and start up ipfs node - */ - public initialize () { - setImmediate(async () => this.healthCheck()); - } - - private async healthCheck () { - try { - if (!this.healthy) { - console.log('Unhealthy, restarting IPFS node...'); - await this.restart(); - this.healthy = true; - } - } catch (e) { - console.error(`unknown error thrown by healthCheck: ${e}`); - } finally { - setTimeout(async () => this.healthCheck(), this.healthCheckInternalInSeconds * 1000); - } - } - - /** - * restarts the IPFS node - */ - private async restart () { - if (this.node !== undefined) { - await this.node.stop(); - console.log('old node stopped, starting a new one'); - } - this.node = await this.getNode(); - } - - /** - * Reads the stored content of the content identifier. - * @param hash Content identifier to fetch the content. - * @param maxSizeInBytes The maximum allowed size limit of the content. - * @returns The fetch result containg the content buffer if found. - * The result `code` is set to `FetchResultCode.NotFound` if the content is not found. - * The result `code` is set to `FetchResultCode.MaxSizeExceeded` if the content exceeds the specified max size. - * The result `code` is set to `FetchResultCode.NotAFile` if the content being downloaded is not a file (e.g. a directory). - */ - public async read (hash: string, maxSizeInBytes: number): Promise { - try { - const fetchResult = await this.fetchContent(hash, maxSizeInBytes); - - // "Pin" (store permanently in local repo) content if fetch is successful. Re-pinning already existing object does not create a duplicate. - if (fetchResult.code === FetchResultCode.Success) { - await this.node!.pin.add(hash); - } - return fetchResult; - } catch { - this.healthy = false; - return { - code: FetchResultCode.CasNotReachable - }; - } - } - - /** - * Fetch the content from IPFS. 
- * This method also allows easy mocking in tests. - */ - private async fetchContent (hash: string, maxSizeInBytes: number): Promise { - - const fetchResult: FetchResult = { code: FetchResultCode.Success }; - let bufferChunks: Buffer[] = []; - let currentContentSize = 0; - let iterator: AsyncIterator; - try { - iterator = this.node!.cat(hash); - } catch (e) { - // when an error is thrown, certain error message denote that the CID is not a file, anything else is unexpected error from ipfs - console.debug(`Error thrown while downloading content from IPFS for CID ${hash}: ${JSON.stringify(e, Object.getOwnPropertyNames(e))}`); - if (IpfsStorage.isIpfsErrorNotAFileError(e.message)) { - return { code: FetchResultCode.NotAFile }; - } else { - return { code: FetchResultCode.NotFound }; - } - } - - let result: IteratorResult; - try { - do { - result = await iterator.next(); - // the linter cannot detect that result.value can be undefined, so we disable it. The code should still compile - /* tslint:disable-next-line */ - if (result.value !== undefined) { - const chunk = result.value; - currentContentSize += chunk.byteLength; - if (maxSizeInBytes < currentContentSize) { - console.info(`Max size of ${maxSizeInBytes} bytes exceeded by CID ${hash}`); - return { code: FetchResultCode.MaxSizeExceeded }; - } - bufferChunks.push(chunk); - } - // done will always be true if it is the last element. When it is not, it can be false or undefined, which in js !undefined === true - } while (!result.done); - } catch (e) { - console.error(`unexpected error thrown for CID ${hash}, please investigate and fix: ${JSON.stringify(e, Object.getOwnPropertyNames(e))}`); - throw e; - } - - if (bufferChunks.length === 0) { - return { code: FetchResultCode.NotFound }; - } - - fetchResult.content = Buffer.concat(bufferChunks); - - return fetchResult; - } - - /** - * Writes the passed content to the IPFS storage. - * @param content Sidetree content to write to IPFS storage. 
- * @returns The multihash content identifier of the stored content. - */ - public async write (content: Buffer): Promise { - try { - const file = await this.node!.add(content).next(); - return file.value.cid.toString(); - } catch (e) { - console.log(`Error thrown while writing: ${e}`); - this.healthy = false; - return undefined; - } - } - - /** - * Stops this IPFS store. - */ - public async stop () { - if (this.node !== undefined) { - await this.node.stop(); - } - } - - /** - * Checks if a certain error message corresponds to the not a file error from ipfs - * @param errorText the error text that matches the ipfs implementation of not a file error - */ - private static isIpfsErrorNotAFileError (errorText: string) { - // a set of error texts ipfs use to denote not a file - const notAFileErrorTextSet = new Set(['this dag node is a directory', 'this dag node has no content']); - return notAFileErrorTextSet.has(errorText); - } - -} diff --git a/lib/ipfs/README.md b/lib/ipfs/README.md deleted file mode 100644 index c5f1015a0..000000000 --- a/lib/ipfs/README.md +++ /dev/null @@ -1,4 +0,0 @@ -# sidetree-ipfs -IPFS module for storing and accessing Sidetree entity operation data via content addressable storage - -[![Build status](https://decentralized-identity.visualstudio.com/Core/_apis/build/status/sidetree-ipfs)](https://decentralized-identity.visualstudio.com/Core/_build/latest?definitionId=13) diff --git a/lib/ipfs/RequestHandler.ts b/lib/ipfs/RequestHandler.ts deleted file mode 100644 index 7eb3d3b0d..000000000 --- a/lib/ipfs/RequestHandler.ts +++ /dev/null @@ -1,145 +0,0 @@ -import base64url from 'base64url'; -import FetchResultCode from '../common/enums/FetchResultCode'; -import IpfsStorage from './IpfsStorage'; -import ResponseModel from '../common/models/ResponseModel'; -import ResponseStatus from '../common/enums/ResponseStatus'; -import ServiceInfo from '../common/ServiceInfoProvider'; -import Timeout from './Util/Timeout'; - -const multihashes = 
require('multihashes'); - -/** - * Sidetree IPFS request handler class. - */ -export default class RequestHandler { - /** - * Instance of IpfsStorage. - */ - public ipfsStorage: IpfsStorage; - private serviceInfo: ServiceInfo; - - /** - * Creates and return an instance of Sidetree IPFS request handler. - * @param fetchTimeoutInSeconds Timeout for fetch request. Fetch request will return `not-found` when timed-out. - * @param repo Optional IPFS datastore implementation. - */ - public static create (fetchTimeoutInSeconds: number, repo?: any): RequestHandler { - return new RequestHandler(fetchTimeoutInSeconds, repo); - } - - private constructor (private fetchTimeoutInSeconds: number, repo?: any) { - this.ipfsStorage = new IpfsStorage(repo); - this.ipfsStorage.initialize(); - this.serviceInfo = new ServiceInfo('ipfs'); - } - - /** - * Handles read request - * @param base64urlEncodedMultihash Content Identifier Hash. - */ - public async handleFetchRequest (base64urlEncodedMultihash: string, maxSizeInBytes?: number): Promise { - console.log(`Handling fetch request for '${base64urlEncodedMultihash}'...`); - - if (maxSizeInBytes === undefined) { - return { - status: ResponseStatus.BadRequest, - body: { code: FetchResultCode.MaxSizeNotSpecified } - }; - } - - const multihashBuffer = base64url.toBuffer(base64urlEncodedMultihash); - try { - multihashes.validate(multihashBuffer); - } catch { - return { - status: ResponseStatus.BadRequest, - body: { code: FetchResultCode.InvalidHash } - }; - } - - try { - const base58EncodedMultihashString = multihashes.toB58String(multihashBuffer); - const fetchPromise = this.ipfsStorage.read(base58EncodedMultihashString, maxSizeInBytes); - - const fetchResult = await Timeout.timeout(fetchPromise, this.fetchTimeoutInSeconds * 1000); - - // Return not-found if fetch timed. 
- if (fetchResult instanceof Error) { - console.warn(`'${base64urlEncodedMultihash}' not found on IPFS.`); - return { status: ResponseStatus.NotFound }; - } - - if (fetchResult.code === FetchResultCode.MaxSizeExceeded || - fetchResult.code === FetchResultCode.NotAFile) { - return { - status: ResponseStatus.BadRequest, - body: { code: fetchResult.code } - }; - } - - if (fetchResult.code === FetchResultCode.NotFound) { - return { - status: ResponseStatus.NotFound - }; - } - - // Else fetch was successful. - console.log(`Fetched '${base64urlEncodedMultihash}' of size ${fetchResult.content!.length} bytes.`); - return { - status: ResponseStatus.Succeeded, - body: fetchResult.content - }; - } catch (error) { - console.error(`Hit unexpected error fetching '${base64urlEncodedMultihash}, investigate and fix: ${error}`); - return { - status: ResponseStatus.ServerError, - body: error.message - }; - } - } - - /** - * Handles sidetree content write request - * @param content Sidetree content to write into CAS storage - */ - public async handleWriteRequest (content: Buffer): Promise { - console.log(`Writing content of ${content.length} bytes...`); - - let base64urlEncodedMultihash; - try { - const base58EncodedMultihashString = await this.ipfsStorage.write(content); - if (base58EncodedMultihashString === undefined) { - return { - status: ResponseStatus.ServerError, - body: 'ipfs write failed' - }; - } - const multihashBuffer = multihashes.fromB58String(base58EncodedMultihashString); - base64urlEncodedMultihash = base64url.encode(multihashBuffer); - - console.info(`Wrote content '${base64urlEncodedMultihash}'.`); - return { - status: ResponseStatus.Succeeded, - body: { hash: base64urlEncodedMultihash } - }; - } catch (err) { - console.error(`Hit unexpected error writing '${base64urlEncodedMultihash}, investigate and fix: ${err}`); - return { - status: ResponseStatus.ServerError, - body: err.message - }; - } - } - - /** - * Handles the get version request. 
- */ - public async handleGetVersionRequest (): Promise { - const body = JSON.stringify(this.serviceInfo.getServiceVersion()); - - return { - status : ResponseStatus.Succeeded, - body : body - }; - } -} diff --git a/lib/ipfs/Util/Timeout.ts b/lib/ipfs/Util/Timeout.ts index 2ab054cec..23637b292 100644 --- a/lib/ipfs/Util/Timeout.ts +++ b/lib/ipfs/Util/Timeout.ts @@ -1,3 +1,6 @@ +import IpfsErrorCode from '../IpfsErrorCode'; +import SidetreeError from '../../common/SidetreeError'; + /** * Class containing code execution timeout/timing utilities. */ @@ -6,16 +9,19 @@ export default class Timeout { /** * Monitors the given promise to see if it runs to completion within the specified timeout duration. * @param task Promise to apply a timeout to. - * @returns The given promise if it completed execution within the timeout duration, a promise containing an Error otherwise. + * @returns The value that the task returns if the task completed execution within the timeout duration. + * @throws `TimeoutPromiseTimedOut` Error task timed out. Rethrows the error that the given task throws. 
*/ - public static async timeout (task: Promise, timeoutInMilliseconds: number): Promise { - const timeoutPromise = new Promise((resolve, _reject) => { - setTimeout(() => { - resolve(new Error(`The given promised timed out after ${timeoutInMilliseconds} milliseconds.`)); - }, timeoutInMilliseconds); + public static async timeout (task: Promise, timeoutInMilliseconds: number): Promise { + const timeoutPromise = new Promise((_resolve, reject) => { + setTimeout( + () => { reject(new SidetreeError(IpfsErrorCode.TimeoutPromiseTimedOut, `Promise timed out after ${timeoutInMilliseconds} milliseconds.`)); }, + timeoutInMilliseconds + ); }); const content = await Promise.race([task, timeoutPromise]); + return content; } } diff --git a/package.json b/package.json index 6a1f494c4..65ceace2d 100644 --- a/package.json +++ b/package.json @@ -81,6 +81,7 @@ "lib/**" ], "exclude": [ + "lib/bitcoin/versions/[0-9]**/**", "lib/core/versions/[0-9]**/**", "lib/core/versions/**/VersionMetadata.ts", "lib/**/**ErrorCode.ts" diff --git a/tests/common/ReadableStream.spec.ts b/tests/common/ReadableStream.spec.ts index b289aa78e..a2dd40283 100644 --- a/tests/common/ReadableStream.spec.ts +++ b/tests/common/ReadableStream.spec.ts @@ -1,5 +1,7 @@ import * as fs from 'fs'; +import JasmineSidetreeErrorValidator from '../JasmineSidetreeErrorValidator'; import ReadableStream from '../../lib/common/ReadableStream'; +import SharedErrorCode from '../../lib/common/SharedErrorCode'; describe('ReadableStream', () => { it('should read all content using readAll().', async () => { @@ -20,4 +22,17 @@ describe('ReadableStream', () => { expect(content).toEqual(expectedContent); }); + + it('should throw error if stream exceeds the max allowed size.', async (done) => { + const inputFilePath = './tests/bitcoin/testData/bitcoinTwoBlocksRawDataHex.txt'; + const stream = fs.createReadStream(inputFilePath); + const maxAllowedContentSize = 100; + + await JasmineSidetreeErrorValidator.expectSidetreeErrorToBeThrownAsync( 
+ () => ReadableStream.readAll(stream, maxAllowedContentSize), + SharedErrorCode.ReadableStreamMaxAllowedDataSizeExceeded + ); + + done(); + }); }); diff --git a/tests/core/Blockchain.spec.ts b/tests/core/Blockchain.spec.ts index d4da692f4..c60ebb65a 100644 --- a/tests/core/Blockchain.spec.ts +++ b/tests/core/Blockchain.spec.ts @@ -361,14 +361,14 @@ describe('Blockchain', async () => { }; const fetchSpy = spyOn(blockchainClient as any, 'fetch').and.returnValue(Promise.resolve(mockFetchResponse)); - const readStreamSpy = spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from(mockFetchResponse.body))); + const readAllSpy = spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from(mockFetchResponse.body))); - const identifierInput = 'indentifier input'; + const identifierInput = 'identifier input'; const actual = await blockchainClient.getValueTimeLock(identifierInput); expect(actual).toEqual(mockValueTimeLock); expect(fetchSpy).toHaveBeenCalledWith(`${blockchainClient['locksUri']}/${identifierInput}`); - expect(readStreamSpy).toHaveBeenCalledWith(mockFetchResponse.body); + expect(readAllSpy).toHaveBeenCalledWith(mockFetchResponse.body); done(); }); @@ -427,13 +427,13 @@ describe('Blockchain', async () => { }; const fetchSpy = spyOn(blockchainClient as any, 'fetch').and.returnValue(Promise.resolve(mockFetchResponse)); - const readStreamSpy = spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from(mockFetchResponse.body))); + const readAllSpy = spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from(mockFetchResponse.body))); const actual = await blockchainClient.getWriterValueTimeLock(); expect(actual).toEqual(mockValueTimeLock); expect(fetchSpy).toHaveBeenCalledWith(`${blockchainClient['writerLockUri']}`); - expect(readStreamSpy).toHaveBeenCalledWith(mockFetchResponse.body); + expect(readAllSpy).toHaveBeenCalledWith(mockFetchResponse.body); done(); }); diff --git 
a/tests/core/Cas.spec.ts b/tests/core/Cas.spec.ts deleted file mode 100644 index 8a426bc44..000000000 --- a/tests/core/Cas.spec.ts +++ /dev/null @@ -1,95 +0,0 @@ -import Cas from '../../lib/core/Cas'; -import FetchResultCode from '../../lib/common/enums/FetchResultCode'; -import ReadableStream from '../../lib/common/ReadableStream'; -import ServiceVersionModel from '../../lib/common/models/ServiceVersionModel'; - -describe('Cas', async () => { - it('should return file hash of the content written.', async () => { - const casClient = new Cas('unused'); - const fetchSpy = spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve({ status: 200, body: 'unused' })); - const readStreamSpy = spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from('{"hash":"abc"}'))); - const hash = await casClient.write(Buffer.from('unused')); - - expect(fetchSpy).toHaveBeenCalled(); - expect(readStreamSpy).toHaveBeenCalled(); - expect(hash).toEqual('abc'); - }); - - it('should throw if content writing returned with an error.', async () => { - const casClient = new Cas('unused'); - spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve({ status: 500, body: 'unused' })); - spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from('abc'))); - - try { - await casClient.write(Buffer.from('unused')); - } catch { - // Throwing error is the expected case. 
- return; - } - - fail(); - }); - - it('should set fetch result as not-found when fetch result in an unexpected error.', async () => { - const casClient = new Cas('unused'); - const fetchSpy = spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve({ status: 200, body: 'unused' })); - const readStreamSpy = spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from('abc'))); - const fetchResult = await casClient.read('anyAddress', 1); - - expect(fetchSpy).toHaveBeenCalled(); - expect(readStreamSpy).toHaveBeenCalled(); - expect(fetchResult.code).toEqual(FetchResultCode.Success); - expect(fetchResult.content!.toString()).toEqual('abc'); - }); - - it('should set fetch result as not-found when fetch result in an unexpected error.', async () => { - const casClient = new Cas('unused'); - const fetchSpy = spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve({ status: 500, body: 'unused' })); - const readStreamSpy = spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from(JSON.stringify({ - code: 'unused' - })))); - const fetchResult = await casClient.read('anyAddress', 1); - - expect(fetchSpy).toHaveBeenCalled(); - expect(readStreamSpy).toHaveBeenCalled(); - expect(fetchResult.code).toEqual(FetchResultCode.NotFound); - }); - - it('should set fetch result correctly when fetch responds with a not-found.', async () => { - const casClient = new Cas('unused'); - const fetchSpy = spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve({ status: 404 })); - - const fetchResult = await casClient.read('anyAddress', 1); - - expect(fetchSpy).toHaveBeenCalled(); - expect(fetchResult.code).toEqual(FetchResultCode.NotFound); - }); - - it('should set fetch result correctly when fetch responds with a bad-request.', async () => { - const casClient = new Cas('unused'); - const fetchSpy = spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve({ status: 400 })); - const readStreamSpy = spyOn(ReadableStream, 
'readAll').and.returnValue(Promise.resolve(Buffer.from(JSON.stringify({ - code: FetchResultCode.InvalidHash - })))); - - const fetchResult = await casClient.read('anyAddress', 1); - - expect(fetchSpy).toHaveBeenCalled(); - expect(readStreamSpy).toHaveBeenCalled(); - expect(fetchResult.code).toEqual(FetchResultCode.InvalidHash); - }); - - describe('getServiceVersion', async () => { - it('should get the version from the service version fetcher', async () => { - const casClient = new Cas('unused'); - const expectedServiceVersion: ServiceVersionModel = { name: 'test-service', version: 'x.y.z' }; - - const serviceVersionSpy = spyOn(casClient['serviceVersionFetcher'], 'getVersion').and.returnValue(Promise.resolve(expectedServiceVersion)); - - const fetchedServiceVersion = await casClient.getServiceVersion(); - - expect(serviceVersionSpy).toHaveBeenCalled(); - expect(fetchedServiceVersion).toEqual(expectedServiceVersion); - }); - }); -}); diff --git a/tests/core/Core.spec.ts b/tests/core/Core.spec.ts index 688c202aa..6f82e6623 100644 --- a/tests/core/Core.spec.ts +++ b/tests/core/Core.spec.ts @@ -1,5 +1,6 @@ import Core from '../../lib/core/Core'; import IRequestHandler from '../../lib/core/interfaces/IRequestHandler'; +import MockCas from '../mocks/MockCas'; import ResponseModel from '../../lib/common/models/ResponseModel'; import ResponseStatus from '../../lib/common/enums/ResponseStatus'; import ServiceVersionModel from '../../lib/common/models/ServiceVersionModel'; @@ -9,6 +10,8 @@ describe('Core', async () => { const testConfig = require('../json/bitcoin-config-test.json'); const testVersionConfig = require('../json/core-protocol-versioning-test.json'); + const mockCas = new MockCas(); + const resolvedRequest: Promise = new Promise(resolve => { const responseModel: ResponseModel = { status: ResponseStatus.Succeeded, body: null }; resolve(responseModel); @@ -19,7 +22,7 @@ describe('Core', async () => { // remove the optional parameter "databaseName" const 
minimalConfig = Object.assign({}, testConfig); delete minimalConfig.databaseName; - const core = new Core(minimalConfig, testVersionConfig); + const core = new Core(minimalConfig, testVersionConfig, mockCas); expect(core).toBeDefined(); // test default database name expect(core['operationStore']['databaseName']).toEqual('sidetree'); @@ -28,7 +31,7 @@ describe('Core', async () => { it('should construct MongoDBOperationStore with database if passed in config', () => { const databaseName = 'mongoDbTestDatabase'; const databaseIncludedConfig = Object.assign({}, testConfig, { databaseName }); - const core = new Core(databaseIncludedConfig, testVersionConfig); + const core = new Core(databaseIncludedConfig, testVersionConfig, mockCas); expect(core['operationStore']['databaseName']).toEqual(databaseName); expect(core['transactionStore']['databaseName']).toEqual(databaseName); expect(core['unresolvableTransactionStore']['databaseName']).toEqual(databaseName); @@ -37,7 +40,7 @@ describe('Core', async () => { describe('initialize', async () => { it('should initialize all required dependencies', async () => { - const core = new Core(testConfig, testVersionConfig); + const core = new Core(testConfig, testVersionConfig, mockCas); const transactionStoreInitSpy = spyOn(core['transactionStore'], 'initialize'); const unresolvableTransactionStoreInitSpy = spyOn(core['unresolvableTransactionStore'], 'initialize'); const operationStoreInitSpy = spyOn(core['operationStore'], 'initialize'); @@ -68,19 +71,16 @@ describe('Core', async () => { // the values alphabetically to validate the response later on. 
const expectedCoreVersion: ServiceVersionModel = { name: 'a-service', version: 'x.y.z' }; const expectedBlockchainVersion: ServiceVersionModel = { name: 'b-service', version: 'a.b.c' }; - const expectedCasVersion: ServiceVersionModel = { name: 'c-service', version: '1.x.c' }; - const core = new Core(testConfig, testVersionConfig); + const core = new Core(testConfig, testVersionConfig, mockCas); const serviceInfoSpy = spyOn(core['serviceInfo'], 'getServiceVersion').and.returnValue(expectedCoreVersion); const blockchainSpy = spyOn(core['blockchain'], 'getServiceVersion').and.returnValue(Promise.resolve(expectedBlockchainVersion)); - const casSpy = spyOn(core['cas'], 'getServiceVersion').and.returnValue(Promise.resolve(expectedCasVersion)); const fetchedResponse = await core.handleGetVersionRequest(); expect(serviceInfoSpy).toHaveBeenCalled(); expect(blockchainSpy).toHaveBeenCalled(); - expect(casSpy).toHaveBeenCalled(); expect(fetchedResponse.status).toEqual(ResponseStatus.Succeeded); // Sort the output to make it easier to validate @@ -89,13 +89,12 @@ describe('Core', async () => { expect(fetchedVersions[0]).toEqual(expectedCoreVersion); expect(fetchedVersions[1]).toEqual(expectedBlockchainVersion); - expect(fetchedVersions[2]).toEqual(expectedCasVersion); }); }); describe('handleResolveRequest', () => { it('should call the needed functions and return a response', async () => { - const core = new Core(testConfig, testVersionConfig); + const core = new Core(testConfig, testVersionConfig, mockCas); const mockRequestHandler = jasmine.createSpyObj('versionManagerSpy', ['handleResolveRequest']); mockRequestHandler.handleResolveRequest.and.callFake(() => { return resolvedRequest; }); core['versionManager']['getRequestHandler'] = () => { return mockRequestHandler; }; @@ -108,7 +107,7 @@ describe('Core', async () => { describe('handleOperationRequest', () => { it('should call the needed functions and return a response', async () => { - const core = new Core(testConfig, 
testVersionConfig); + const core = new Core(testConfig, testVersionConfig, mockCas); const mockRequestHandler = jasmine.createSpyObj('versionManagerSpy', ['handleOperationRequest']); mockRequestHandler.handleOperationRequest.and.callFake(() => { return resolvedRequest; }); core['versionManager']['getRequestHandler'] = () => { return mockRequestHandler; }; diff --git a/tests/core/Observer.spec.ts b/tests/core/Observer.spec.ts index b368e387d..11479ee84 100644 --- a/tests/core/Observer.spec.ts +++ b/tests/core/Observer.spec.ts @@ -2,7 +2,6 @@ import * as retry from 'async-retry'; import AnchoredDataSerializer from '../../lib/core/versions/latest/AnchoredDataSerializer'; import AnchorFile from '../../lib/core/versions/latest/AnchorFile'; import Blockchain from '../../lib/core/Blockchain'; -import Cas from '../../lib/core/Cas'; import ChunkFile from '../../lib/core/versions/latest/ChunkFile'; import DownloadManager from '../../lib/core/DownloadManager'; import Encoder from '../../lib/core/versions/latest/Encoder'; @@ -10,6 +9,7 @@ import ErrorCode from '../../lib/common/SharedErrorCode'; import FetchResult from '../../lib/common/models/FetchResult'; import FetchResultCode from '../../lib/common/enums/FetchResultCode'; import IOperationStore from '../../lib/core/interfaces/IOperationStore'; +import Ipfs from '../../lib/ipfs/Ipfs'; import IVersionManager from '../../lib/core/interfaces/IVersionManager'; import MockBlockchain from '../mocks/MockBlockchain'; import MapFile from '../../lib/core/versions/latest/MapFile'; @@ -39,7 +39,8 @@ describe('Observer', async () => { beforeAll(async () => { jasmine.DEFAULT_TIMEOUT_INTERVAL = 20000; // These asynchronous tests can take a bit longer than normal. - casClient = new Cas(config.contentAddressableStoreServiceUri); + const fetchTimeoutInSeconds = 1; + casClient = new Ipfs('unusedUri', fetchTimeoutInSeconds); // Setting the CAS to always return 404. 
spyOn(casClient, 'read').and.returnValue(Promise.resolve({ code: FetchResultCode.NotFound })); diff --git a/tests/core/ServiceVersionFetcher.spec.ts b/tests/core/ServiceVersionFetcher.spec.ts index 6fe5531b9..1c144159c 100644 --- a/tests/core/ServiceVersionFetcher.spec.ts +++ b/tests/core/ServiceVersionFetcher.spec.ts @@ -10,12 +10,12 @@ describe('ServiceVersionFetcher', async () => { const serviceVersionFetcher = new ServiceVersionFetcher('someURI'); const fetchSpy = spyOn(serviceVersionFetcher as any, 'fetch').and.returnValue(Promise.resolve({ status: 200 })); - const readStreamSpy = spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from(JSON.stringify(expectedServiceVersion)))); + const readAllSpy = spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from(JSON.stringify(expectedServiceVersion)))); const version = await serviceVersionFetcher.getVersion(); expect(fetchSpy).toHaveBeenCalled(); - expect(readStreamSpy).toHaveBeenCalled(); + expect(readAllSpy).toHaveBeenCalled(); expect(version).toEqual(expectedServiceVersion); }); @@ -54,13 +54,13 @@ describe('ServiceVersionFetcher', async () => { const serviceVersionFetcher = new ServiceVersionFetcher('someURI'); const fetchSpy = spyOn(serviceVersionFetcher as any, 'fetch').and.returnValue(Promise.resolve({ status: 200 })); - const readStreamSpy = spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from(JSON.stringify(expectedServiceVersion)))); + const readAllSpy = spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from(JSON.stringify(expectedServiceVersion)))); const tryGetServiceVersionSpy = spyOn(serviceVersionFetcher as any, 'tryGetServiceVersion').and.callThrough(); await serviceVersionFetcher.getVersion(); expect(fetchSpy).toHaveBeenCalled(); - expect(readStreamSpy).toHaveBeenCalled(); + expect(readAllSpy).toHaveBeenCalled(); expect(tryGetServiceVersionSpy).toHaveBeenCalled(); // Update the last fetch time to ensure another 
network call diff --git a/tests/core/TransactionProcessor.spec.ts b/tests/core/TransactionProcessor.spec.ts index 922e65d93..fe5034095 100644 --- a/tests/core/TransactionProcessor.spec.ts +++ b/tests/core/TransactionProcessor.spec.ts @@ -1,6 +1,5 @@ import AnchoredDataSerializer from '../../lib/core/versions/latest/AnchoredDataSerializer'; import AnchorFile from '../../lib/core/versions/latest/AnchorFile'; -import Cas from '../../lib/core/Cas'; import ChunkFile from '../../lib/core/versions/latest/ChunkFile'; import Compressor from '../../lib/core/versions/latest/util/Compressor'; import DownloadManager from '../../lib/core/DownloadManager'; @@ -8,6 +7,7 @@ import ErrorCode from '../../lib/core/versions/latest/ErrorCode'; import FetchResult from '../../lib/common/models/FetchResult'; import FetchResultCode from '../../lib/common/enums/FetchResultCode'; import IBlockchain from '../../lib/core/interfaces/IBlockchain'; +import Ipfs from '../../lib/ipfs/Ipfs'; import JasmineSidetreeErrorValidator from '../JasmineSidetreeErrorValidator'; import MapFile from '../../lib/core/versions/latest/MapFile'; import MockBlockchain from '../mocks/MockBlockchain'; @@ -19,9 +19,9 @@ import TransactionProcessor from '../../lib/core/versions/latest/TransactionProc import ValueTimeLockModel from '../../lib/common/models/ValueTimeLockModel'; import ValueTimeLockVerifier from '../../lib/core/versions/latest/ValueTimeLockVerifier'; -describe('TransactionProcessor', () => { - const config = require('../json/config-test.json'); - let casClient: Cas; +describe('TransactionProcessor', async () => { + const config = await import('../json/config-test.json'); + let casClient: Ipfs; let operationStore: MockOperationStore; let downloadManager: DownloadManager; let blockchain: IBlockchain; @@ -37,7 +37,8 @@ describe('TransactionProcessor', () => { }; beforeEach(() => { - casClient = new Cas(config.contentAddressableStoreServiceUri); + const fetchTimeoutInSeconds = 1; + casClient = new 
Ipfs('unusedUri', fetchTimeoutInSeconds); operationStore = new MockOperationStore(); downloadManager = new DownloadManager(config.maxConcurrentDownloads, casClient); downloadManager.start(); @@ -45,7 +46,7 @@ describe('TransactionProcessor', () => { transactionProcessor = new TransactionProcessor(downloadManager, operationStore, blockchain, versionMetadataFetcher); }); - describe('prcoessTransaction', () => { + describe('processTransaction', () => { it('should ignore error and return true when AnchoredDataSerializer throws a sidetree error', async () => { const anchoredData = 'Bad Format'; const mockTransaction: TransactionModel = { diff --git a/tests/ipfs/Ipfs.spec.ts b/tests/ipfs/Ipfs.spec.ts new file mode 100644 index 000000000..2b76108f1 --- /dev/null +++ b/tests/ipfs/Ipfs.spec.ts @@ -0,0 +1,162 @@ +import FetchResultCode from '../../lib/common/enums/FetchResultCode'; +import ReadableStream from '../../lib/common/ReadableStream'; +import ICas from '../../lib/core/interfaces/ICas'; +import Ipfs from '../../lib/ipfs/Ipfs'; +import IpfsErrorCode from '../../lib/ipfs/IpfsErrorCode'; +import JasmineSidetreeErrorValidator from '../JasmineSidetreeErrorValidator'; +import SharedErrorCode from '../../lib/common/SharedErrorCode'; +import SidetreeError from '../../lib/common/SidetreeError'; +import Timeout from '../../lib/ipfs/Util/Timeout'; + +describe('Ipfs', async () => { + let casClient: ICas; + + beforeEach(() => { + const fetchTimeoutInSeconds = 1; + casClient = new Ipfs('unused', fetchTimeoutInSeconds); + }); + + describe('read()', async () => { + it('should return file hash of the content written.', async () => { + const fetchSpy = spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve({ status: 200, body: 'unused' })); + const readAllSpy = spyOn(ReadableStream, 'readAll') + .and.returnValue(Promise.resolve(Buffer.from(JSON.stringify({ Hash: 'QmWCcaE2iTRnJxqaC4VGFhD6ARsqRNPe2D2eYJTWgeP7ko' })))); + const hash = await 
casClient.write(Buffer.from('unused')); + + expect(fetchSpy).toHaveBeenCalled(); + expect(readAllSpy).toHaveBeenCalled(); + expect(hash).toEqual('EiB0zm8TToaK5Z97V43iIwfJJzgx25SgMOhLwOerD3KgJA'); // hash here is based64 encoded string. `readAll()` returns base58 encoded string. + }); + + it('should throw if content writing IPFS HTTP API returned a non-OK status with or without body', async () => { + spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve({ status: 500, body: 'unused' })); + spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from('abc'))); + + await JasmineSidetreeErrorValidator.expectSidetreeErrorToBeThrownAsync( + () => casClient.write(Buffer.from('unused')), + IpfsErrorCode.IpfsFailedWritingContent + ); + }); + + it('should throw if content writing IPFS HTTP API returned a non-OK status without body', async () => { + spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve({ status: 500 })); + spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from('abc'))); + + await JasmineSidetreeErrorValidator.expectSidetreeErrorToBeThrownAsync( + () => casClient.write(Buffer.from('unused')), + IpfsErrorCode.IpfsFailedWritingContent + ); + }); + }); + + describe('read()', async () => { + it('should set fetch result as success when fetch is successful.', async () => { + const fetchSpy = spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve({ status: 200, body: 'unused' })); + const readAllSpy = spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from('abc'))); + const fetchResult = await casClient.read('EiCGEBPkUOwS6vKY0NXkrhSFj1obfNhlWfFcIUFhczR02w', 1); + + expect(fetchSpy).toHaveBeenCalled(); + expect(readAllSpy).toHaveBeenCalled(); + expect(fetchResult.code).toEqual(FetchResultCode.Success); + expect(fetchResult.content!.toString()).toEqual('abc'); + }); + + it('should set fetch result as not-found when IPFS HTTP API returns non OK status.', async () => { + const 
fetchSpy = spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve({ status: 500, body: 'unused' })); + const readAllSpy = spyOn(ReadableStream, 'readAll').and.returnValue(Promise.resolve(Buffer.from(JSON.stringify({ + code: 'unused' + })))); + const fetchResult = await casClient.read('EiCGEBPkUOwS6vKY0NXkrhSFj1obfNhlWfFcIUFhczR02w', 1); + + expect(fetchSpy).toHaveBeenCalled(); + expect(readAllSpy).toHaveBeenCalled(); + expect(fetchResult.code).toEqual(FetchResultCode.NotFound); + }); + + it('should set fetch result as not-found when `timeout()` throws an unexpected error.', async () => { + const fetchContentSpy = spyOn(casClient as any, 'fetchContent'); + const timeoutSpy = spyOn(Timeout, 'timeout').and.throwError('any unexpected error'); + const fetchResult = await casClient.read('EiCGEBPkUOwS6vKY0NXkrhSFj1obfNhlWfFcIUFhczR02w', 1); + + expect(fetchContentSpy).toHaveBeenCalled(); + expect(timeoutSpy).toHaveBeenCalled(); + expect(fetchResult.code).toEqual(FetchResultCode.NotFound); + }); + + it('should set fetch result as not-found when `timeout()` throws a timeout error.', async () => { + const fetchContentSpy = spyOn(casClient as any, 'fetchContent'); + const timeoutSpy = spyOn(Timeout, 'timeout').and.callFake(() => { throw new SidetreeError(IpfsErrorCode.TimeoutPromiseTimedOut); }); + const fetchResult = await casClient.read('EiCGEBPkUOwS6vKY0NXkrhSFj1obfNhlWfFcIUFhczR02X', 1); + + expect(fetchContentSpy).toHaveBeenCalled(); + expect(timeoutSpy).toHaveBeenCalled(); + expect(fetchResult.code).toEqual(FetchResultCode.NotFound); + }); + + it('should set fetch result correctly when given hash is invalid.', async () => { + const fetchResult = await casClient.read('anyInvalidHash', 1); + expect(fetchResult.code).toEqual(FetchResultCode.InvalidHash); + }); + + it('should return correct fetch result code if IPFS service is not reachable.', async () => { + // Simulate IPFS not reachable. 
+ const fetchContentSpy = spyOn(casClient as any, 'fetch').and.callFake(() => { + const error = new Error('any error message'); + (error as any).code = 'ECONNREFUSED'; + throw error; + }); + const fetchResult = await casClient.read('EiBIRxuYXzo1wChnyefwXx5TCSIBKjvHDi9eG20iDzp_Vw', 1); + + expect(fetchContentSpy).toHaveBeenCalled(); + expect(fetchResult.code).toEqual(FetchResultCode.CasNotReachable); + }); + + it('should return as content not found if `fetch()` throws unexpected error.', async () => { + // Simulate IPFS not reachable. + const fetchContentSpy = spyOn(casClient as any, 'fetch').and.throwError('any unexpected error'); + const fetchResult = await casClient.read('EiBIRxuYXzo1wChnyefwXx5TCSIBKjvHDi9eG20iDzp_Vw', 1); + + expect(fetchContentSpy).toHaveBeenCalled(); + expect(fetchResult.code).toEqual(FetchResultCode.NotFound); + }); + + it('should return as content not found if unexpected error occurred while reading the content stream.', async () => { + const mockFetchResponse = { status: 200 }; + spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve(mockFetchResponse)); + + spyOn(ReadableStream, 'readAll').and.throwError('any unexpected error'); + + const fetchResult = await casClient.read('EiBIRxuYXzo1wChnyefwXx5TCSIBKjvHDi9eG20iDzp_Vw', 1); + expect(fetchResult.code).toEqual(FetchResultCode.NotFound); + }); + + it('should return correct fetch result code if content found is not a file.', async () => { + const mockFetchResponse = { status: 500 }; + const fetchSpy = spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve(mockFetchResponse)); + + const readAllSpy = spyOn(ReadableStream, 'readAll') + .and.returnValue(Promise.resolve(Buffer.from(JSON.stringify({ Message: 'this dag node is a directory' })))); + + const fetchResult = await casClient.read('EiCGEBPkUOwS6vKY0NXkrhSFj1obfNhlWfFcIUFhczR02w', 1); + + expect(fetchSpy).toHaveBeenCalled(); + expect(readAllSpy).toHaveBeenCalled(); + 
expect(fetchResult.code).toEqual(FetchResultCode.NotAFile); + }); + + it('should return correct fetch result code if content max size is exceeded.', async () => { + const mockFetchResponse = { status: 200 }; + const fetchSpy = spyOn(casClient as any, 'fetch').and.returnValue(Promise.resolve(mockFetchResponse)); + + const readAllSpy = spyOn(ReadableStream, 'readAll').and.callFake(() => { + throw new SidetreeError(SharedErrorCode.ReadableStreamMaxAllowedDataSizeExceeded); + }); + + const fetchResult = await casClient.read('EiCGEBPkUOwS6vKY0NXkrhSFj1obfNhlWfFcIUFhczR02w', 1); + + expect(fetchSpy).toHaveBeenCalled(); + expect(readAllSpy).toHaveBeenCalled(); + expect(fetchResult.code).toEqual(FetchResultCode.MaxSizeExceeded); + }); + }); +}); diff --git a/tests/ipfs/IpfsStorage.spec.ts b/tests/ipfs/IpfsStorage.spec.ts deleted file mode 100644 index 5e97eaad2..000000000 --- a/tests/ipfs/IpfsStorage.spec.ts +++ /dev/null @@ -1,204 +0,0 @@ -import * as IPFS from 'ipfs'; -import FetchResultCode from '../../lib/common/enums/FetchResultCode'; -import IpfsStorage from '../../lib/ipfs/IpfsStorage'; -import MockAsyncIterable from '../mocks/MockAsyncIterable'; -import { randomBytes } from 'crypto'; - -describe('IpfsStorage', () => { - let ipfsStorage: IpfsStorage; - let maxFileSize: number; - let createSpy: any; - let nodeMock: any; - - beforeAll(async () => { - - ipfsStorage = new IpfsStorage(); - nodeMock = { - add: () => { return; }, - cat: () => { return; }, - pin: { - add: () => { return; } - }, - stop: () => { return; } - } as any; - - createSpy = spyOn(IPFS, 'create').and.returnValue(nodeMock); - // ipfsStorage.initialize(); - ipfsStorage['node'] = nodeMock; - - maxFileSize = 20000000; // 20MB - }); - - describe('read', () => { - it('should return the pinned content for the given hash.', async () => { - const mockContent = Buffer.from('ipfs'); - spyOn(ipfsStorage['node']!.pin, 'add').and.returnValue(Promise.resolve([true])); - spyOn(ipfsStorage['node']!, 
'cat').and.returnValue(new MockAsyncIterable(mockContent, mockContent, 1)); - - const expectedContent = mockContent; - - const fetchedContent = await ipfsStorage.read('abc123', maxFileSize); - expect(expectedContent).toEqual(fetchedContent.content!); - }); - - it('should return not a file if cat throws is a directory error', async () => { - spyOn(ipfsStorage['node']!.pin, 'add').and.returnValue(Promise.resolve([true])); - spyOn(ipfsStorage['node']!, 'cat').and.throwError('this dag node is a directory'); - - const expectedErrorCode = FetchResultCode.NotAFile; - const fetchedContent = await ipfsStorage.read('abc123', maxFileSize); - expect(expectedErrorCode).toEqual(fetchedContent.code); - }); - - it('should return not a file if cat throws no content error', async () => { - spyOn(ipfsStorage['node']!.pin, 'add').and.returnValue(Promise.resolve([true])); - spyOn(ipfsStorage['node']!, 'cat').and.throwError('this dag node has no content'); - - const expectedErrorCode = FetchResultCode.NotAFile; - const fetchedContent = await ipfsStorage.read('abc123', maxFileSize); - expect(expectedErrorCode).toEqual(fetchedContent.code); - }); - - it('should return not found for any other unexpected error', async () => { - spyOn(ipfsStorage['node']!.pin, 'add').and.returnValue(Promise.resolve([true])); - spyOn(ipfsStorage['node']!, 'cat').and.throwError('some unexpected error'); - - const expectedErrorCode = FetchResultCode.NotFound; - const fetchedContent = await ipfsStorage.read('abc123', maxFileSize); - expect(expectedErrorCode).toEqual(fetchedContent.code); - }); - - it('should return error code when cat.next throws an unexpected error', async () => { - spyOn(ipfsStorage['node']!.pin, 'add').and.returnValue(Promise.resolve([true])); - - const mockIterable = new MockAsyncIterable(); - mockIterable.next = () => { - throw new Error('A test error thrown by unit test'); - }; - - spyOn(ipfsStorage['node']!, 'cat').and.returnValue(mockIterable); - const result = await 
ipfsStorage.read('abc123', maxFileSize); - expect(result).toEqual({ code: FetchResultCode.CasNotReachable }); - }); - - it('should return size exceeded if content size exceeds maxFileSize during download.', async () => { - spyOn(ipfsStorage['node']!.pin, 'add').and.returnValue(Promise.resolve([true])); - - // this creates a buffer with size 1 bigger than max allowed - const mockCatValue = randomBytes(maxFileSize + 1); - - spyOn(ipfsStorage['node']!, 'cat').and.returnValue(new MockAsyncIterable(mockCatValue, mockCatValue)); - - const expectedErrorCode = FetchResultCode.MaxSizeExceeded; - const fetchedContent = await ipfsStorage.read('abc123', maxFileSize); - expect(expectedErrorCode).toEqual(fetchedContent.code); - }); - - it('should return not found if cat next result is undefined', async () => { - spyOn(ipfsStorage['node']!.pin, 'add').and.returnValue(Promise.resolve([true])); - - const mockIterator = new MockAsyncIterable(undefined, undefined); - spyOn(ipfsStorage['node']!, 'cat').and.returnValue(mockIterator); - - const expectedErrorCode = FetchResultCode.NotFound; - const fetchedContent = await ipfsStorage.read('abc123', maxFileSize); - expect(expectedErrorCode).toEqual(fetchedContent.code); - }); - }); - - describe('write', () => { - it('should write the content to IPFS and return the multihash.', async () => { - const expectedHash = 'Qm12345abc'; - const mockSidetreeContent = { - path: 'path.txt', - cid: Buffer.from(expectedHash), - size: 5493356, - mode: 420, - mtime: undefined - }; - - const mockAdd = (_data: IPFS.FileContent, _options?: any) => { - return new MockAsyncIterable(mockSidetreeContent, mockSidetreeContent); - }; - - spyOn(ipfsStorage['node']!, 'add').and.callFake(mockAdd); - - const bufferContent = Buffer.from('ipfs'); - - const fetchedHash = await ipfsStorage.write(bufferContent); - expect(expectedHash).toEqual(fetchedHash!); - }); - - it('should set healthy to false and return undefined if an error is thrown', async () => { - 
spyOn(ipfsStorage['node']!, 'add').and.throwError('test error'); - - const bufferContent = Buffer.from('ipfs'); - - const fetchedHash = await ipfsStorage.write(bufferContent); - expect(fetchedHash).toEqual(undefined); - expect(ipfsStorage['healthy']).toEqual(false); - }); - }); - - describe('stop', () => { - it('should call node stop', async () => { - const stopSpy = spyOn(ipfsStorage['node']!, 'stop').and.returnValue(Promise.resolve(undefined)); - await ipfsStorage.stop(); - expect(stopSpy).toHaveBeenCalledTimes(1); - }); - }); - - describe('getNode', () => { - it('should get node with default repo if not argument supplied', async () => { - const result = await ipfsStorage['getNode'](); - expect(createSpy).toHaveBeenCalledWith({ repo: 'sidetree-ipfs' }); - expect(result).toEqual(nodeMock); - }); - - it('should get node with passed in repo if argument supplied', async () => { - const repoHolder = ipfsStorage['repo']; - ipfsStorage['repo'] = 'something'; - const result = await ipfsStorage['getNode'](); - expect(createSpy).toHaveBeenCalledWith({ repo: 'something' }); - expect(result).toEqual(nodeMock); - ipfsStorage['repo'] = repoHolder; - }); - }); - - describe('restart', () => { - it('should restart the ipfs node and continue to be functional', async () => { - const stopSpy = spyOn(ipfsStorage['node']!, 'stop'); - const getNodeSpy = spyOn(ipfsStorage as any, 'getNode'); - await ipfsStorage['restart'](); - expect(stopSpy).toHaveBeenCalled(); - expect(getNodeSpy).toHaveBeenCalled(); - }); - }); - - describe('healthCheck', () => { - it('should not restart if is healthy', async () => { - ipfsStorage['healthy'] = true; - const restartSpy = spyOn(ipfsStorage as any, 'restart'); - await ipfsStorage['healthCheck'](); - expect(restartSpy).toHaveBeenCalledTimes(0); - }); - - it('should call restart if is not healthy', async () => { - const restartSpy = spyOn(ipfsStorage as any, 'restart').and.callFake(async () => { console.log('fake message from test'); }); - 
ipfsStorage['healthCheckInternalInSeconds'] = 0; - ipfsStorage['healthy'] = false; - await ipfsStorage['healthCheck'](); - expect(restartSpy).toHaveBeenCalledTimes(1); - expect(ipfsStorage['healthy']).toEqual(true); - }); - - it('should go into the next loop even if an error is thrown', async () => { - const restartSpy = spyOn(ipfsStorage as any, 'restart').and.throwError('error thrown by test'); - ipfsStorage['healthCheckInternalInSeconds'] = 0; - ipfsStorage['healthy'] = false; - await ipfsStorage['healthCheck'](); - expect(restartSpy).toHaveBeenCalled(); - expect(ipfsStorage['healthy']).toEqual(false); - }); - }); -}); diff --git a/tests/ipfs/RequestHandler.spec.ts b/tests/ipfs/RequestHandler.spec.ts deleted file mode 100644 index 35edd1a0b..000000000 --- a/tests/ipfs/RequestHandler.spec.ts +++ /dev/null @@ -1,77 +0,0 @@ -import FetchResultCode from '../../lib/common/enums/FetchResultCode'; -import ResponseModel from '../../lib/common/models/ResponseModel'; -import RequestHandler from '../../lib/ipfs/RequestHandler'; -import ResponseStatus from '../../lib/common/enums/ResponseStatus'; -import ServiceVersionModel from '../../lib/common/models/ServiceVersionModel'; - -describe('RequestHandler', async () => { - let maxFileSize: number; - let fetchTimeoutInSeconds: number; - let requestHandler: RequestHandler; - - beforeAll(async (done) => { - maxFileSize = 20000000; // 20MB - fetchTimeoutInSeconds = 1; - requestHandler = RequestHandler.create(fetchTimeoutInSeconds); - done(); - }); - - it('should return the correct response object for invalid multihash for fetch request.', async () => { - const expectedResponse: ResponseModel = { - status: ResponseStatus.BadRequest, - body: { code: FetchResultCode.InvalidHash } - }; - - const testSidetreeHash: string = '123abc'; - const fetchedResponse = await requestHandler.handleFetchRequest(testSidetreeHash, maxFileSize); - - expect(expectedResponse).toEqual(fetchedResponse); - }); - - it('should return the correct response body 
with content for fetch request', async () => { - const expectedResponse: ResponseModel = { - status: ResponseStatus.Succeeded, - body: Buffer.from('dummyContent') - }; - const testSidetreeHash: string = 'EiCcvAfD-ZFyWDajqipYHKICkZiqQgudmbwOEx2fPiy-Rw'; - spyOn(requestHandler.ipfsStorage, 'read').and.returnValue(Promise.resolve({ code: FetchResultCode.Success, content: Buffer.from('dummyContent') })); - - const fetchedResponse = await requestHandler.handleFetchRequest(testSidetreeHash, maxFileSize); - - expect(expectedResponse).toEqual(fetchedResponse); - }); - - it('should return the correct response body with content for write request', async () => { - const expectedResponse: ResponseModel = { - status: ResponseStatus.Succeeded, - body: { hash: 'EiCcvAfD-ZFyWDajqipYHKICkZiqQgudmbwOEx2fPiy-Rw' } - }; - - // Mock the IPFS storage layer to return a Base58 encoded multihash regardless of content written. - spyOn(requestHandler.ipfsStorage, 'write').and.returnValue(Promise.resolve('QmYtUc4iTCbbfVSDNKvtQqrfyezPPnFvE33wFmutw9PBBk')); - const mockSidetreeContent: Buffer = Buffer.from('dummyContent'); - - const fetchedResponse = await requestHandler.handleWriteRequest(mockSidetreeContent); - - expect(expectedResponse).toEqual(fetchedResponse); - }); - - it('should return the correct response body for the version request', async () => { - const expectedVersion: ServiceVersionModel = { - name: 'test-service', - version: 'x.y.z' - }; - - const expectedResponse = { - status: ResponseStatus.Succeeded, - body: JSON.stringify(expectedVersion) - }; - - // Make the handle service version call return the test value - spyOn(requestHandler['serviceInfo'], 'getServiceVersion').and.returnValue(expectedVersion); - - const fetchedResponse = await requestHandler.handleGetVersionRequest(); - - expect(fetchedResponse).toEqual(expectedResponse); - }); -}); diff --git a/tests/ipfs/Timeout.spec.ts b/tests/ipfs/Timeout.spec.ts new file mode 100644 index 000000000..9d49afafd --- /dev/null +++ 
b/tests/ipfs/Timeout.spec.ts @@ -0,0 +1,33 @@ +import IpfsErrorCode from '../../lib/ipfs/IpfsErrorCode'; +import Timeout from '../../lib/ipfs/Util/Timeout'; +import JasmineSidetreeErrorValidator from '../JasmineSidetreeErrorValidator'; + +describe('Timeout', async () => { + describe('timeout()', async () => { + it('should timeout if given task took too long.', async (done) => { + // A promise that resolves only after 10ms — longer than the timeout given below. + const longRunningPromise = new Promise((resolve, _reject) => { + setTimeout( + () => { resolve(1); }, + 10); + }); + + await JasmineSidetreeErrorValidator.expectSidetreeErrorToBeThrownAsync( + () => Timeout.timeout(longRunningPromise, 1), + IpfsErrorCode.TimeoutPromiseTimedOut + ); + + done(); + }); + + it('should return error thrown by the task.', async (done) => { + const error = new Error('some bad error'); + const aPromiseThatThrowsError = new Promise((_resolve, _reject) => { + throw error; + }); + + await expectAsync(Timeout.timeout(aPromiseThatThrowsError, 1000)).toBeRejectedWith(error); + done(); + }); + }); +}); diff --git a/tests/json/config-test.json b/tests/json/config-test.json index 7e5950df9..808f79093 100644 --- a/tests/json/config-test.json +++ b/tests/json/config-test.json @@ -1,7 +1,6 @@ { "port": 3000, "didMethodName": "sidetree", - "contentAddressableStoreServiceUri": "http://127.0.0.1:3001/v1.0", "blockchainServiceUri": "http://127.0.0.1:3002", "batchingIntervalInSeconds": 10, "observingIntervalInSeconds": 10,