diff --git a/lib/circuit.js b/lib/circuit.js index be12441d..62b51432 100644 --- a/lib/circuit.js +++ b/lib/circuit.js @@ -10,6 +10,7 @@ const OPEN = Symbol('open'); const CLOSED = Symbol('closed'); const HALF_OPEN = Symbol('half-open'); const PENDING_CLOSE = Symbol('pending-close'); +const SHUTDOWN = Symbol('shutdown'); const FALLBACK_FUNCTION = Symbol('fallback'); const STATUS = Symbol('status'); const NAME = Symbol('name'); @@ -187,6 +188,25 @@ class CircuitBreaker extends EventEmitter { } } + /** + * Shuts down this circuit breaker. All subsequent calls to the + * circuit will fail, returning a rejected promise. + */ + shutdown () { + this.disable(); + this.removeAllListeners(); + this.status.shutdown(); + this.hystrixStats.shutdown(); + this[STATE] = SHUTDOWN; + } + + /** + * Determines if the circuit has been shutdown. + */ + get isShutdown () { + return this[STATE] === SHUTDOWN; + } + /** * Gets the name of this circuit */ @@ -239,7 +259,9 @@ class CircuitBreaker extends EventEmitter { } /** - A convenience function that returns the hystrixStats + A convenience function that returns the hystrixStats. If the circuit has + been shutdown, returns undefined and any existing external references to + the HystrixStats instance will stop emitting data. */ get hystrixStats () { return this[HYSTRIX_STATS]; @@ -295,6 +317,11 @@ class CircuitBreaker extends EventEmitter { * @fires CircuitBreaker#semaphore-locked */ fire () { + if (this.isShutdown) { + const err = new Error('The circuit has been shutdown.'); + err.code = 'ESHUTDOWN'; + return Promise.reject(err); + } const args = Array.prototype.slice.call(arguments); /** diff --git a/lib/hystrix-stats.js b/lib/hystrix-stats.js index 977d6616..da92db66 100644 --- a/lib/hystrix-stats.js +++ b/lib/hystrix-stats.js @@ -11,12 +11,6 @@ const hystrixStream = new Transform({ } }); -// TODO: This number is somewhat arbitrary. However, we need to allow -// a potentially large number of listeners on this transform stream -// because all circuits are connected to it. In an application with -// a large number of circuits (or tests using circuits), if left to -// the default MAX_LISTENERS, the user will see errors. -hystrixStream.setMaxListeners(100); hystrixStream.resume(); /** @@ -41,7 +35,7 @@ hystrixStream.resume(); */ class HystrixStats { constructor (circuit) { - const _readableStream = new Readable({ + this._readableStream = new Readable({ objectMode: true, read () {} }); @@ -49,7 +43,7 @@ class HystrixStats { // Listen for the stats's snapshot event circuit.status.on('snapshot', function snapshotListener (stats) { // when we get a snapshot push it onto the stream - _readableStream.push( + this._readableStream.push( Object.assign({}, { name: circuit.name, @@ -57,18 +51,27 @@ class HystrixStats { group: circuit.group, options: circuit.options }, stats)); - }); + }.bind(this)); - _readableStream.resume(); - _readableStream.pipe(hystrixStream); + this._readableStream.resume(); + this._readableStream.pipe(hystrixStream); } /** - A convenience function that returns the hystrxStream + A convenience function that returns the hystrixStream */ getHystrixStream () { return hystrixStream; } + + /** + * Shuts down this instance, freeing memory. + * When a circuit is shutdown, it should call shutdown() on + * its HystrixStats instance to avoid memory leaks. + */ + shutdown () { + this._readableStream.unpipe(hystrixStream); + } } HystrixStats.stream = hystrixStream; diff --git a/lib/status.js b/lib/status.js index 7c6c991d..6d6b790d 100644 --- a/lib/status.js +++ b/lib/status.js @@ -153,6 +153,10 @@ class Status extends EventEmitter { close () { this[WINDOW][0].isCircuitBreakerOpen = false; } + + shutdown () { + this.removeAllListeners(); + } } const nextBucket = window => _ => { diff --git a/package.json b/package.json index be7ea6cb..82d2657b 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,7 @@ "test:headless": "node test/browser/webpack-test.js", "test:browser": "opener http://localhost:9007/test/browser/index.html && http-server . -p 9007", "test:coverage": "nyc tape test/*.js | tap-spec", - "prepare": "nsp check && npm run build:browser", + "prepare": "npm audit && npm run build:browser", "postpublish": "./publish-docs.sh", "prerelease": "npm test", "release": "standard-version -s", @@ -42,7 +42,6 @@ "jsdoc": "3.5.5", "marked": "~0.5.0", "moment": "~2.22.0", - "nsp": "~3.2.1", "nyc": "~12.0.2", "opener": "1.5.1", "semistandard": "~12.0.1", diff --git a/test/circuit-shutdown-test.js b/test/circuit-shutdown-test.js new file mode 100644 index 00000000..613dc0da --- /dev/null +++ b/test/circuit-shutdown-test.js @@ -0,0 +1,27 @@ +'use strict'; + +const test = require('tape'); +const circuit = require('..'); +const passFail = require('./common').passFail; + +// tests that we are not leaving listeners open to +// chew up memory +test('EventEmitter max listeners', t => { + let i = 100; + while (--i >= 0) { + const breaker = circuit(passFail); + breaker.fire(1); + breaker.shutdown(); // required for cleanup + } + t.end(); +}); + +test('Circuit shuts down properly', t => { + t.plan(3); + const breaker = circuit(passFail); + t.ok(breaker.fire(1), 'breaker is active'); + breaker.shutdown(); + t.ok(breaker.isShutdown, 'breaker is shutdown'); + t.notOk(breaker.enabled, 'breaker has been disabled'); + t.end(); +}); diff --git a/test/enable-disable-test.js b/test/enable-disable-test.js index 780a15e3..c0751608 100644 --- a/test/enable-disable-test.js +++ b/test/enable-disable-test.js @@ -8,6 +8,7 @@ test('Defaults to enabled', t => { t.plan(1); const breaker = opossum(passFail); t.equals(breaker.enabled, true); + breaker.shutdown(); t.end(); }); @@ -15,6 +16,7 @@ test('Accepts options.enabled', t => { t.plan(1); const breaker = opossum(passFail, { enabled: false }); t.equals(breaker.enabled, false); + breaker.shutdown(); t.end(); }); @@ -50,6 +52,8 @@ test('When disabled the circuit should always be closed', t => { .catch(e => t.equals(e, 'Error: -1 is < 0')) .then(() => { t.ok(breaker.opened, 'should be closed'); - }); + }) + .then(_ => breaker.shutdown()) + .then(t.end); }); }); diff --git a/test/half-open-test.js b/test/half-open-test.js index 209d52b8..668502f7 100644 --- a/test/half-open-test.js +++ b/test/half-open-test.js @@ -34,10 +34,14 @@ test('When half-open, the circuit only allows one request through', t => { 'should not be halfOpen after long failing function'); t.notOk(breaker.pendingClose, 'should not be pending close after long failing func'); - }); - // fire the breaker again, and be sure it fails as expected - breaker - .fire(1) - .catch(e => t.equals(e.message, 'Breaker is open')); + }) + .then(_ => { + // fire the breaker again, and be sure it fails as expected + breaker + .fire(1) + .catch(e => t.equals(e.message, 'Breaker is open')); + }) + .then(_ => breaker.shutdown()) + .then(t.end); }, options.resetTimeout * 1.5); }); diff --git a/test/health-check-test.js b/test/health-check-test.js index 912100ba..bc3d5a48 100644 --- a/test/health-check-test.js +++ b/test/health-check-test.js @@ -9,6 +9,7 @@ test('Circuits accept a health check function', t => { const circuit = opossum(common.passFail); circuit.healthCheck(healthChecker(_ => { t.ok(true, 'function called'); + circuit.shutdown(); t.end(); return Promise.resolve(); }), 10000); @@ -19,6 +20,7 @@ test('health-check-failed is emitted on failure', t => { const circuit = opossum(common.passFail); circuit.on('health-check-failed', e => { t.equals(e.message, 'Too many tacos', 'health-check-failed emitted'); + circuit.shutdown(); t.end(); }); circuit.healthCheck( @@ -30,6 +32,7 @@ test('circuit opens on health check failure', t => { const circuit = opossum(common.passFail); circuit.on('open', e => { t.ok(circuit.opened, 'circuit opened'); + circuit.shutdown(); t.end(); }); circuit.healthCheck( @@ -43,6 +46,7 @@ test('Health check function executes in the circuit breaker context', t => { circuit.healthCheck(function healthCheck () { if (!called) { t.equal(this, circuit, 'health check executes in circuit context'); + circuit.shutdown(); t.end(); } called = true; @@ -60,6 +64,7 @@ test('healthCheck() throws TypeError if interval duration is NaN', t => { t.equals(e.constructor, TypeError, 'throws TypeError'); t.equals(e.message, 'Health check interval must be a number', 'include correct message'); + circuit.shutdown(); t.end(); } }); @@ -74,6 +79,7 @@ test('healthCheck() throws TypeError if parameter is not a function', t => { t.equals(e.constructor, TypeError, 'throws TypeError'); t.equals(e.message, 'Health check function must be a function', 'include correct message'); + circuit.shutdown(); t.end(); } }); diff --git a/test/hystrix-test.js b/test/hystrix-test.js index 444354e6..c2a6b381 100644 --- a/test/hystrix-test.js +++ b/test/hystrix-test.js @@ -18,7 +18,7 @@ test('A circuit should provide stats to a hystrix compatible stream', t => { }); const stream = circuitOne.hystrixStats.getHystrixStream(); let circuitOneStatsSeen = false; - let circuitTwoStatsSeen = true; + let circuitTwoStatsSeen = false; stream.on('data', blob => { const obj = JSON.parse(blob.substring(6)); if (obj.name === 'circuit one') circuitOneStatsSeen = true; diff --git a/test/test.js b/test/test.js index 7947081c..9f4a0540 100644 --- a/test/test.js +++ b/test/test.js @@ -18,18 +18,21 @@ test('api', t => { t.ok(breaker.options, 'CircuitBreaker.options'); t.ok(breaker.hystrixStats, 'CircuitBreaker.hystrixStats'); t.equals(breaker.action, passFail, 'CircuitBreaker.action'); + breaker.shutdown(); t.end(); }); test('has a name based on the function name', t => { const breaker = circuit(passFail); t.equals(breaker.name, passFail.name); + breaker.shutdown(); t.end(); }); test('accepts a name as an option', t => { const breaker = circuit(passFail, {name: 'tacoMachine'}); t.equals(breaker.name, 'tacoMachine'); + breaker.shutdown(); t.end(); }); @@ -37,18 +40,21 @@ test('uses UUID as a name when none is provided and the function is anonymoys', t => { const breaker = circuit(identity); t.ok(breaker.name); + breaker.shutdown(); t.end(); }); test('accepts a group as an option', t => { const breaker = circuit(passFail, {group: 'tacoMachine'}); t.equals(breaker.group, 'tacoMachine'); + breaker.shutdown(); t.end(); }); test('uses name as a group when no group is provided', t => { const breaker = circuit(passFail, {name: 'tacoMachine'}); t.equals(breaker.group, 'tacoMachine'); + breaker.shutdown(); t.end(); }); @@ -59,6 +65,7 @@ test('Passes parameters to the circuit function', t => { breaker.fire(expected) .then(arg => t.equals(arg, expected, 'function parameters provided')) + .then(_ => breaker.shutdown()) .then(t.end) .catch(t.fail); }); @@ -100,6 +107,7 @@ test('Using cache', t => { t.equals(arg, expected, `cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`); }) + // .then(_ => breaker.shutdown()) .then(t.end) .catch(t.fail); }); @@ -115,6 +123,7 @@ test('Fails when the circuit function fails', t => { .catch(e => { t.equals(e, 'Error: -1 is < 0', 'expected error caught'); }) + .then(_ => breaker.shutdown()) .then(t.end); }); @@ -130,6 +139,7 @@ test('Fails when the circuit function times out', t => { t.equals(e.message, expected, 'timeout message received'); t.equals(e.code, expectedCode, 'ETIMEDOUT'); }) + .then(_ => breaker.shutdown()) .then(t.end); }); @@ -139,6 +149,7 @@ test('Works with functions that do not return a promise', t => { breaker.fire() .then(arg => t.equals(arg, 'foo', 'non-Promise returns expected value')) + .then(_ => breaker.shutdown()) .then(t.end) .catch(t.fail); }); @@ -149,6 +160,7 @@ test('Works with non-functions', t => { breaker.fire() .then(arg => t.equals(arg, 'foobar', 'expected raw value returns')) + .then(_ => breaker.shutdown()) .then(t.end) .catch(t.fail); }); @@ -160,6 +172,7 @@ test('Works with callback functions', t => { breaker.fire(3, 4) .then(arg => t.equals(arg, 7, 'CircuitBreaker.promisify works')) + .then(_ => breaker.shutdown()) .then(t.end) .catch(t.fail); }); @@ -172,6 +185,7 @@ test('Works with callback functions that fail', t => { breaker.fire(3, 4) .then(t.fail) .catch(e => t.equals(e, 'Whoops!', 'caught expected error')) + .then(_ => breaker.shutdown()) .then(t.end); }); @@ -188,6 +202,7 @@ test('Breaker opens after a configurable number of failures', t => { breaker.fire(100) .then(t.fail) .catch(e => t.equals(e.message, 'Breaker is open', 'breaker opens')) + .then(_ => breaker.shutdown()) .then(t.end); }) .catch(t.fail); @@ -206,6 +221,7 @@ test('Breaker resets after a configurable amount of time', t => { setTimeout(() => { breaker.fire(100) .then(arg => t.equals(arg, 100, 'breaker has reset')) + .then(_ => breaker.shutdown()) .then(t.end); }, resetTimeout * 1.25); }); @@ -218,6 +234,7 @@ test('Breaker status reflects open state', t => { breaker.fire(-1) .then(t.fail) .catch(() => t.ok(breaker.status.window[0].isCircuitBreakerOpen)) + .then(_ => breaker.shutdown()) .then(t.end); }); @@ -237,6 +254,7 @@ test('Breaker resets for circuits with a fallback function', t => { .then(arg => { t.equals(arg, 100, 'breaker has reset'); }) + .then(_ => breaker.shutdown()) .then(t.end) .catch(t.fail); }, resetTimeout * 1.25); @@ -256,6 +274,7 @@ test('Executes fallback action, if one exists, when breaker is open', t => { // Now the breaker should be open. See if fallback fires. breaker.fire() .then(x => t.equals(x, expected, 'fallback action executes')) + .then(_ => breaker.shutdown()) .then(t.end); }); }); @@ -266,6 +285,7 @@ test('Passes error as last argument to the fallback function', t => { const breaker = circuit(passFail, { errorThresholdPercentage: 1 }); breaker.on('fallback', result => { t.equals(result, `Error: ${fails} is < 0`, 'fallback received error as last parameter'); + breaker.shutdown(); t.end(); }); breaker.fallback((x, e) => e); @@ -284,7 +304,8 @@ test('Fallback is not called twice for the same execution when action fails afte }); breaker.fire(actionDuration) - .catch((err) => noop(err)); + .catch((err) => noop(err)) + .then(_ => breaker.shutdown()); // keep this test alive until action finishes setTimeout(() => noop, actionDuration); @@ -296,7 +317,7 @@ test('Passes arguments to the fallback function', t => { const breaker = circuit(passFail, { errorThresholdPercentage: 1 }); breaker.on('fallback', result => { t.equals(result, fails, 'fallback received expected parameters'); - t.end(); + breaker.shutdown(); }); breaker.fallback(x => x); breaker.fire(fails).catch(t.fail); @@ -304,11 +325,12 @@ test('Passes arguments to the fallback function', t => { test('Returns self from fallback()', t => { t.plan(1); - circuit(passFail, { errorThresholdPercentage: 1 }) - .fallback(noop) - .fire(1) + const breaker = circuit(passFail, { errorThresholdPercentage: 1 }); + breaker.fallback(noop); + breaker.fire(1) .then(result => { t.equals(result, 1, 'instance returned from fallback'); + breaker.shutdown(); }) .then(t.end) .catch(t.fail); @@ -322,6 +344,7 @@ test('CircuitBreaker emits failure when action throws', t => { .catch(e => { t.equals(breaker.status.stats.failures, 1, 'expected failure status'); t.equals(e.message, 'E_TOOMANYCHICKENTACOS', 'expected error message'); + breaker.shutdown(); t.end(); }); }); @@ -337,6 +360,8 @@ test('CircuitBreaker executes fallback when an action throws', t => { t.equals(stats.fallbacks, 1, 'expected fallback status'); t.equals(result, 'Fallback executed'); }) + .then(_ => breaker.shutdown()) + .then(t.end) .catch(t.fail); }); @@ -350,7 +375,10 @@ test('CircuitBreaker emits failure when falling back', t => { breaker.fire(-1).then(result => { t.equals('fallback value', result, 'fallback value is correct'); - }).catch(t.fail); + }) + .then(_ => breaker.shutdown()) + .then(t.end) + .catch(t.fail); }); test('CircuitBreaker status', t => { @@ -390,6 +418,7 @@ test('CircuitBreaker status', t => { }) .catch(t.fail); }) + .then(_ => breaker.shutdown()) .catch(t.fail) .then(t.end); }); @@ -414,6 +443,7 @@ test('CircuitBreaker rolling counts', t => { const window = breaker.status.window; t.ok(window.length > 1); t.deepEqual(window[0].successes, 0, 'breaker reset stats'); + breaker.shutdown(); t.end(); }, 300); }); @@ -433,7 +463,9 @@ test('CircuitBreaker status listeners', t => { breaker.status.removeAllListeners('snapshot'); }); - breaker.fire(10).then(_ => t.end()); + breaker.fire(10) + .then(_ => breaker.shutdown()) + .then(t.end); }); test('CircuitBreaker fallback event', t => { @@ -442,6 +474,7 @@ test('CircuitBreaker fallback event', t => { breaker.fallback(x => x); breaker.on('fallback', value => { t.equal(value, -1, 'fallback value received'); + breaker.shutdown(); t.end(); }); breaker.fire(-1); @@ -545,6 +578,7 @@ test('CircuitBreaker events', t => { timeoutBreaker.fire().then(t.fail).catch(noop); }) .then(e => t.equals(timeout, 0, 'timeout event fired')) + .then(_ => breaker.shutdown()) .then(t.end); })); }); @@ -593,6 +627,7 @@ test('circuit halfOpen', t => { t.ok(breaker.closed, 'breaker should be closed'); t.notOk(breaker.pendingClose, 'breaker should not be pending close'); + breaker.shutdown(); t.end(); }) .catch(t.fail); @@ -614,7 +649,7 @@ test('CircuitBreaker fallback as a rejected promise', t => { breaker.fire(input).then(t.fail).catch(e => { t.equals('nope', e.message); - }).then(t.end); + }).then(_ => breaker.shutdown()).then(t.end); }); test('CircuitBreaker fallback event as a rejected promise', t => { @@ -631,6 +666,7 @@ test('CircuitBreaker fallback event as a rejected promise', t => { result .then(t.fail) .catch(e => t.equals('nope', e.message)) + .then(_ => breaker.shutdown()) .then(t.end); }); @@ -650,6 +686,7 @@ test('CircuitBreaker fallback as a CircuitBreaker', t => { breaker.fire(input) .then(v => t.equals(v, input, 'Fallback value equals input')) + .then(_ => breaker.shutdown()) .then(t.end); }); @@ -666,6 +703,7 @@ test('CircuitBreaker fallback as a CircuitBreaker that fails', t => { breaker.fire(input) .catch(e => t.equals(e, 'Error: -1 is < 0', 'Breaker should fail')) + .then(_ => breaker.shutdown()) .then(t.end); }); @@ -682,6 +720,7 @@ test('CircuitBreaker fallback as a CircuitBreaker', t => { breaker.fire(input) .then(v => t.equals(v, input, 'Fallback value equals input')) + .then(_ => breaker.shutdown()) .then(t.end); }); @@ -709,6 +748,7 @@ test('rolling percentile enabled option defaults to true', t => { t.equals(breaker.status.stats.percentiles[p], 0, `${p} percentile should be 0 at the start`); }); + breaker.shutdown(); t.end(); }); @@ -723,6 +763,7 @@ test('rolling percentile enabled option set to false', t => { t.equals(breaker.status.stats.percentiles[p], -1, `${p} percentile should be -1 when rollingPercentilesEnabled is false`); }); + breaker.shutdown(); t.end(); }); @@ -731,6 +772,7 @@ test('Circuit Breaker success event emits latency', t => { const breaker = circuit(passFail); breaker.on('success', (result, latencyTime) => { t.ok(latencyTime, 'second argument is the latency'); + breaker.shutdown(); t.end(); }); @@ -742,6 +784,7 @@ test('Circuit Breaker failure event emits latency', t => { const breaker = circuit(passFail); breaker.on('failure', (result, latencyTime) => { t.ok(latencyTime, 'second argument is the latency'); + breaker.shutdown(); t.end(); }); @@ -754,6 +797,7 @@ test('Circuit Breaker timeout event emits latency', t => { breaker.on('timeout', (result, latencyTime) => { t.ok(latencyTime, 'second argument is the latency'); + breaker.shutdown(); t.end(); }); @@ -766,6 +810,7 @@ test('Circuit Breaker timeout with semaphore released', t => { breaker.on('timeout', result => { t.equal(breaker.semaphore.count, breaker.options.capacity); + breaker.shutdown(); t.end(); }); @@ -783,6 +828,7 @@ test('CircuitBreaker semaphore rate limiting', t => { t.equals(breaker.stats.semaphoreRejections, 1, 'Semaphore rejection status incremented'); t.equals(err.code, 'ESEMLOCKED', 'Semaphore was locked'); + breaker.shutdown(); t.end(); }); }); @@ -790,6 +836,7 @@ test('CircuitBreaker semaphore rate limiting', t => { test('CircuitBreaker default capacity', t => { const breaker = circuit(passFail); t.equals(breaker.options.capacity, Number.MAX_SAFE_INTEGER); + breaker.shutdown(); t.end(); }); diff --git a/test/volume-threshold-test.js b/test/volume-threshold-test.js index 119c8ff7..2087379b 100644 --- a/test/volume-threshold-test.js +++ b/test/volume-threshold-test.js @@ -18,7 +18,9 @@ test('By default does not have a volume threshold', t => { t.ok(breaker.opened, 'should be open after initial fire'); t.notOk(breaker.pendingClose, 'should not be pending close after initial fire'); - }); + }) + .then(_ => breaker.shutdown()) + .then(t.end); }); test('Has a volume threshold before tripping when option is provided', t => { @@ -54,6 +56,7 @@ test('Has a volume threshold before tripping when option is provided', t => { t.ok(breaker.opened, 'should be open after volume threshold has been reached'); }) + .then(breaker.shutdown()) .then(t.end); }); }); diff --git a/test/warmup-test.js b/test/warmup-test.js index 55dcbc84..66906967 100644 --- a/test/warmup-test.js +++ b/test/warmup-test.js @@ -18,7 +18,9 @@ test('By default does not allow for warmup', t => { t.ok(breaker.opened, 'should be open after initial fire'); t.notOk(breaker.pendingClose, 'should not be pending close after initial fire'); - }); + }) + .then(_ => breaker.shutdown()) + .then(t.end); }); test('Allows for warmup when option is provided', t => { @@ -36,7 +38,9 @@ test('Allows for warmup when option is provided', t => { t.notOk(breaker.opened, 'should not be open after initial fire'); t.notOk(breaker.pendingClose, 'should not be pending close after initial fire'); - }); + }) + .then(_ => breaker.shutdown()) + .then(t.end); }); test('Only warms up for rollingCountTimeout', t => { @@ -59,6 +63,8 @@ test('Only warms up for rollingCountTimeout', t => { .then(() => { setTimeout(_ => { t.notOk(breaker.warmUp, 'Warmup should end after rollingCountTimeout'); + breaker.shutdown(); + t.end(); }, 500); }); });