From d6e78c306c8150c58277d60e51edac55a55523c2 Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Fri, 16 Oct 2020 15:26:07 +0200 Subject: [PATCH] fix: Ensure delayed callbacks are always invoked. --- lib/autoPipelining.ts | 14 ++++++++------ lib/cluster/index.ts | 26 ++++++++++++++------------ lib/pipeline.ts | 9 +++++++-- lib/transaction.ts | 9 +++++++-- 4 files changed, 36 insertions(+), 22 deletions(-) diff --git a/lib/autoPipelining.ts b/lib/autoPipelining.ts index 617d348d..45e7b64f 100644 --- a/lib/autoPipelining.ts +++ b/lib/autoPipelining.ts @@ -88,12 +88,14 @@ export function executeWithAutoPipelining( // On cluster mode let's wait for slots to be available if (client.isCluster && !client.slots.length) { return new CustomPromise(function (resolve, reject) { - client.delayUntilReady(() => { - executeWithAutoPipelining(client, commandName, args, callback).then( - resolve, - reject - ); - }); + client.delayUntilReady(err => { + if (err) { + reject(err); + return; + } + + executeWithAutoPipelining(client, commandName, args, callback).then(resolve, reject); + }) }); } diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 7db4e831..6796b714 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -227,6 +227,7 @@ class Cluster extends EventEmitter { let closeListener: () => void = undefined; const refreshListener = () => { + this.invokeReadyDelayedCallbacks(undefined); this.removeListener("close", closeListener); this.manuallyClosing = false; this.setStatus("connect"); @@ -250,8 +251,11 @@ class Cluster extends EventEmitter { }; closeListener = function () { + const error = new Error("None of startup nodes is available"); + this.removeListener("refresh", refreshListener); - reject(new Error("None of startup nodes is available")); + this.invokeReadyDelayedCallbacks(error); + reject(error); }; this.once("refresh", refreshListener); @@ -271,6 +275,7 @@ class Cluster extends EventEmitter { .catch((err) => { this.setStatus("close"); this.handleCloseEvent(err); + this.invokeReadyDelayedCallbacks(err); reject(err); }); }); @@ -440,17 +445,6 @@ class Cluster extends EventEmitter { // This is needed in order not to install a listener for each auto pipeline public delayUntilReady(callback: CallbackFunction) { - // First call, setup the event listener - if (!this._readyDelayedCallbacks.length) { - this.once("ready", (...args) => { - for (const c of this._readyDelayedCallbacks) { - c(...args); - } - - this._readyDelayedCallbacks = []; - }); - } - this._readyDelayedCallbacks.push(callback); } @@ -850,6 +844,14 @@ class Cluster extends EventEmitter { ); } + private invokeReadyDelayedCallbacks(err) { + for (const c of this._readyDelayedCallbacks) { + process.nextTick(c, err); + } + + this._readyDelayedCallbacks = []; + } + /** * Check whether Cluster is able to process commands * diff --git a/lib/pipeline.ts b/lib/pipeline.ts index 3e3e8d6c..194ee5a4 100644 --- a/lib/pipeline.ts +++ b/lib/pipeline.ts @@ -236,9 +236,14 @@ Pipeline.prototype.execBuffer = deprecate(function () { Pipeline.prototype.exec = function (callback: CallbackFunction) { // Wait for the cluster to be connected, since we need nodes information before continuing if (this.isCluster && !this.redis.slots.length) { - this.redis.delayUntilReady(() => { + this.redis.delayUntilReady(err => { + if (err) { + callback(err); + return; + } + this.exec(callback); - }); + }) return this.promise; } diff --git a/lib/transaction.ts b/lib/transaction.ts index 6e0952b6..41583cfc 100644 --- a/lib/transaction.ts +++ b/lib/transaction.ts @@ -32,9 +32,14 @@ export function addTransactionSupport(redis) { if (this.isCluster && !this.redis.slots.length) { return asCallback( new Promise((resolve, reject) => { - this.redis.delayUntilReady(() => { + this.redis.delayUntilReady(err => { + if (err) { + reject(err); + return; + } + this.exec(pipeline).then(resolve, reject); - }); + }) }), callback );