From 33fb92a90633395314d5f417bdd027fd32bfd461 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 31 Oct 2024 18:39:21 +0200 Subject: [PATCH] CSC POC ontop of Parser --- packages/client/index.ts | 3 + packages/client/lib/client/commands-queue.ts | 21 ++++++- packages/client/lib/client/index.ts | 62 +++++++++++++++---- packages/client/lib/client/linked-list.ts | 10 ++- packages/client/lib/client/parser.ts | 5 ++ packages/client/lib/client/pool.ts | 61 +++++++++++++++--- packages/client/lib/client/socket.ts | 7 +++ packages/client/lib/cluster/cluster-slots.ts | 18 +++++- packages/client/lib/cluster/index.ts | 49 +++++++++++---- packages/client/lib/commands/GEOSEARCH.ts | 5 -- .../client/lib/commands/GEOSEARCHSTORE.ts | 7 ++- packages/client/lib/sentinel/index.spec.ts | 33 +++++++++- packages/client/lib/sentinel/index.ts | 25 +++++++- packages/client/lib/sentinel/test-util.ts | 14 +++-- packages/client/lib/sentinel/types.ts | 5 ++ packages/client/lib/sentinel/utils.ts | 2 +- 16 files changed, 273 insertions(+), 54 deletions(-) diff --git a/packages/client/index.ts b/packages/client/index.ts index 56cdf703ca..d4dceb357f 100644 --- a/packages/client/index.ts +++ b/packages/client/index.ts @@ -20,6 +20,9 @@ import RedisSentinel from './lib/sentinel'; export { RedisSentinelOptions, RedisSentinelType } from './lib/sentinel/types'; export const createSentinel = RedisSentinel.create; +import { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache'; +export { BasicClientSideCache, BasicPooledClientSideCache }; + // export { GeoReplyWith } from './lib/commands/generic-transformers'; // export { SetOptions } from './lib/commands/SET'; diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 15e8a747b9..0199aa92c1 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -56,6 +56,8 @@ export default class RedisCommandsQueue { return this.#pubSub.isActive; } + #invalidateCallback?: (key: RedisArgument | null) => unknown; + constructor( respVersion: RespVersions, maxLength: number | null | undefined, @@ -109,13 +111,30 @@ export default class RedisCommandsQueue { onErrorReply: err => this.#onErrorReply(err), onPush: push => { if (!this.#onPush(push)) { - + switch (push[0].toString()) { + case "invalidate": { + if (this.#invalidateCallback) { + if (push[1] !== null) { + for (const key of push[1]) { + this.#invalidateCallback(key); + } + } else { + this.#invalidateCallback(null); + } + } + break; + } + } } }, getTypeMapping: () => this.#getTypeMapping() }); } + setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) { + this.#invalidateCallback = callback; + } + addCommand( args: ReadonlyArray, options?: CommandOptions diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 55355a133d..c256f3a1a3 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -15,6 +15,7 @@ import { ScanOptions, ScanCommonOptions } from '../commands/SCAN'; import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode'; import { RedisPoolOptions, RedisClientPool } from './pool'; import { RedisVariadicArgument, parseArgs, pushVariadicArguments } from '../commands/generic-transformers'; +import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } from './cache'; import { BasicCommandParser, CommandParser } from './parser'; export interface RedisClientOptions< @@ -72,6 +73,10 @@ export interface RedisClientOptions< * TODO */ commandOptions?: CommandOptions; + /** + * TODO + */ + clientSideCache?: ClientSideCacheProvider | ClientSideCacheConfig; } type WithCommands< @@ -280,10 +285,12 @@ export default class RedisClient< #monitorCallback?: MonitorCallback; private _self = this; private _commandOptions?: CommandOptions; + #dirtyWatch?: string; - #epoch: number; #watchEpoch?: number; + #clientSideCache?: ClientSideCacheProvider; + get options(): RedisClientOptions | undefined { return this._self.#options; } @@ -300,6 +307,11 @@ export default class RedisClient< return this._self.#queue.isPubSubActive; } + get socketEpoch() { + return this._self.#socket.socketEpoch; + } + + get isWatching() { return this._self.#watchEpoch !== undefined; } @@ -310,10 +322,20 @@ export default class RedisClient< constructor(options?: RedisClientOptions) { super(); + this.#options = this.#initiateOptions(options); this.#queue = this.#initiateQueue(); this.#socket = this.#initiateSocket(); - this.#epoch = 0; + + if (options?.clientSideCache) { + if (options.clientSideCache instanceof ClientSideCacheProvider) { + this.#clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = new BasicClientSideCache(cscConfig); + } + this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache)); + } } #initiateOptions(options?: RedisClientOptions): RedisClientOptions | undefined { @@ -347,7 +369,6 @@ export default class RedisClient< #handshake(selectedDB: number) { const commands = []; - if (this.#options?.RESP) { const hello: HelloOptions = {}; @@ -392,6 +413,13 @@ export default class RedisClient< ); } + if (this.#clientSideCache) { + const tracking = this.#clientSideCache.trackingOn(); + if (tracking) { + commands.push(tracking); + } + } + return commands; } @@ -445,6 +473,7 @@ export default class RedisClient< }) .on('error', err => { this.emit('error', err); + this.#clientSideCache?.onError(); if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { this.#queue.flushWaitingForReply(err); } else { @@ -453,7 +482,6 @@ export default class RedisClient< }) .on('connect', () => this.emit('connect')) .on('ready', () => { - this.#epoch++; this.emit('ready'); this.#setPingTimer(); this.#maybeScheduleWrite(); @@ -581,13 +609,21 @@ export default class RedisClient< commandOptions: CommandOptions | undefined, transformReply: TransformReply | undefined, ) { - const reply = await this.sendCommand(parser.redisArgs, commandOptions); + const csc = this._self.#clientSideCache; + const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions; - if (transformReply) { - return transformReply(reply, parser.preserve, commandOptions?.typeMapping); - } + const fn = () => { return this.sendCommand(parser.redisArgs, commandOptions) }; + + if (csc && command.CACHEABLE && defaultTypeMapping) { + return await csc.handleCache(this._self, parser as BasicCommandParser, fn, transformReply, commandOptions?.typeMapping); + } else { + const reply = await fn(); - return reply; + if (transformReply) { + return transformReply(reply, parser.preserve, commandOptions?.typeMapping); + } + return reply; + } } /** @@ -752,7 +788,7 @@ export default class RedisClient< const reply = await this._self.sendCommand( pushVariadicArguments(['WATCH'], key) ); - this._self.#watchEpoch ??= this._self.#epoch; + this._self.#watchEpoch ??= this._self.socketEpoch; return reply as unknown as ReplyWithTypeMapping, TYPE_MAPPING>; } @@ -819,7 +855,7 @@ export default class RedisClient< } const chainId = Symbol('Pipeline Chain'), - promise = Promise.all( + promise = Promise.allSettled( commands.map(({ args }) => this._self.#queue.addCommand(args, { chainId, typeMapping: this._commandOptions?.typeMapping @@ -855,7 +891,7 @@ export default class RedisClient< throw new WatchError(dirtyWatch); } - if (watchEpoch && watchEpoch !== this._self.#epoch) { + if (watchEpoch && watchEpoch !== this._self.socketEpoch) { throw new WatchError('Client reconnected after WATCH'); } @@ -1075,6 +1111,7 @@ export default class RedisClient< return new Promise(resolve => { clearTimeout(this._self.#pingTimer); this._self.#socket.close(); + this._self.#clientSideCache?.onClose(); if (this._self.#queue.isEmpty()) { this._self.#socket.destroySocket(); @@ -1099,6 +1136,7 @@ export default class RedisClient< clearTimeout(this._self.#pingTimer); this._self.#queue.flushAll(new DisconnectsClientError()); this._self.#socket.destroy(); + this._self.#clientSideCache?.onClose(); } ref() { diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index ac1d021be9..29678f027b 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -114,6 +114,7 @@ export class DoublyLinkedList { export interface SinglyLinkedNode { value: T; next: SinglyLinkedNode | undefined; + removed: boolean; } export class SinglyLinkedList { @@ -140,7 +141,8 @@ export class SinglyLinkedList { const node = { value, - next: undefined + next: undefined, + removed: false }; if (this.#head === undefined) { @@ -151,6 +153,9 @@ export class SinglyLinkedList { } remove(node: SinglyLinkedNode, parent: SinglyLinkedNode | undefined) { + if (node.removed) { + throw new Error("node already removed"); + } --this.#length; if (this.#head === node) { @@ -165,6 +170,8 @@ export class SinglyLinkedList { } else { parent!.next = node.next; } + + node.removed = true; } shift() { @@ -177,6 +184,7 @@ export class SinglyLinkedList { this.#head = node.next; } + node.removed = true; return node.value; } diff --git a/packages/client/lib/client/parser.ts b/packages/client/lib/client/parser.ts index 12eec45773..76251ea67a 100644 --- a/packages/client/lib/client/parser.ts +++ b/packages/client/lib/client/parser.ts @@ -33,6 +33,11 @@ export class BasicCommandParser implements CommandParser { return this.#keys[0]; } + get cacheKey() { + let cacheKey = this.#redisArgs.map((arg) => arg.length).join('_'); + return cacheKey + '_' + this.#redisArgs.join('_'); + } + push(...arg: Array) { this.#redisArgs.push(...arg); }; diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts index a08377e3d3..400268f4bb 100644 --- a/packages/client/lib/client/pool.ts +++ b/packages/client/lib/client/pool.ts @@ -7,6 +7,7 @@ import { TimeoutError } from '../errors'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { CommandOptions } from './commands-queue'; import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command'; +import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from './cache'; import { BasicCommandParser } from './parser'; export interface RedisPoolOptions { @@ -26,6 +27,10 @@ export interface RedisPoolOptions { * TODO */ cleanupDelay: number; + /** + * TODO + */ + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; /** * TODO */ @@ -117,7 +122,7 @@ export class RedisClientPool< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping = {} >( - clientOptions?: RedisClientOptions, + clientOptions?: Omit, "clientSideCache">, options?: Partial ) { const Pool = attachConfig({ @@ -135,7 +140,7 @@ export class RedisClientPool< // returning a "proxy" to prevent the namespaces._self to leak between "proxies" return Object.create( new Pool( - RedisClient.factory(clientOptions).bind(undefined, clientOptions), + clientOptions, options ) ) as RedisClientPoolType; @@ -209,22 +214,39 @@ export class RedisClientPool< return this._self.#isClosing; } + #clientSideCache?: PooledClientSideCacheProvider; + /** * You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`}, * {@link RedisClientPool.fromClient `RedisClientPool.fromClient`}, * or {@link RedisClientPool.fromOptions `RedisClientPool.fromOptions`}... */ constructor( - clientFactory: () => RedisClientType, + clientOptions?: RedisClientOptions, options?: Partial ) { super(); - this.#clientFactory = clientFactory; this.#options = { ...RedisClientPool.#DEFAULTS, ...options }; + if (options?.clientSideCache) { + if (clientOptions === undefined) { + clientOptions = {}; + } + + if (options.clientSideCache instanceof PooledClientSideCacheProvider) { + this.#clientSideCache = clientOptions.clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = clientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); + this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); + this.#clientSideCache = clientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig); + } + } + + this.#clientFactory = RedisClient.factory(clientOptions).bind(undefined, clientOptions) as () => RedisClientType; } private _self = this; @@ -288,9 +310,15 @@ export class RedisClientPool< async connect() { if (this._self.#isOpen) return; // TODO: throw error? - this._self.#isOpen = true; + try { + this._self.#clientSideCache?.onPoolConnect(this._self.#clientFactory); + } catch (err) { + this.destroy(); + throw err; + } + const promises = []; while (promises.length < this._self.#options.minimum) { promises.push(this._self.#create()); @@ -298,21 +326,27 @@ export class RedisClientPool< try { await Promise.all(promises); - return this as unknown as RedisClientPoolType; } catch (err) { this.destroy(); throw err; } + + return this as unknown as RedisClientPoolType; } - async #create() { + async #create(redirect?: boolean) { const node = this._self.#clientsInUse.push( this._self.#clientFactory() .on('error', (err: Error) => this.emit('error', err)) ); try { - await node.value.connect(); + const client = node.value; + if (this._self.#clientSideCache) { + this._self.#clientSideCache.addClient(node.value); + } + + await client.connect(); } catch (err) { this._self.#clientsInUse.remove(node); throw err; @@ -401,7 +435,9 @@ export class RedisClientPool< const toDestroy = Math.min(this.#idleClients.length, this.totalClients - this.#options.minimum); for (let i = 0; i < toDestroy; i++) { // TODO: shift vs pop - this.#idleClients.shift()!.destroy(); + const client = this.#idleClients.shift()! + this.#clientSideCache?.removeClient(client); + client.destroy(); } } @@ -439,8 +475,10 @@ export class RedisClientPool< for (const client of this._self.#clientsInUse) { promises.push(client.close()); } - + await Promise.all(promises); + + this.#clientSideCache?.onPoolClose(); this._self.#idleClients.reset(); this._self.#clientsInUse.reset(); @@ -460,6 +498,9 @@ export class RedisClientPool< for (const client of this._self.#clientsInUse) { client.destroy(); } + + this._self.#clientSideCache?.onPoolClose(); + this._self.#clientsInUse.reset(); this._self.#isOpen = false; diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 36afa36c04..603416cf9e 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -72,6 +72,12 @@ export default class RedisSocket extends EventEmitter { #isSocketUnrefed = false; + #socketEpoch = 0; + + get socketEpoch() { + return this.#socketEpoch; + } + constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { super(); @@ -212,6 +218,7 @@ export default class RedisSocket extends EventEmitter { throw err; } this.#isReady = true; + this.#socketEpoch++; this.emit('ready'); } catch (err) { const retryIn = this.#shouldReconnect(retries++, err as Error); diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 824cf2ae81..8eef696aeb 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -6,6 +6,7 @@ import { ChannelListeners, PUBSUB_TYPE, PubSubTypeListeners } from '../client/pu import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types'; import calculateSlot from 'cluster-key-slot'; import { RedisSocketOptions } from '../client/socket'; +import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache'; interface NodeAddress { host: string; @@ -111,6 +112,7 @@ export default class RedisClusterSlots< replicas = new Array>(); readonly nodeByAddress = new Map | ShardNode>(); pubSubNode?: PubSubNode; + clientSideCache?: PooledClientSideCacheProvider; #isOpen = false; @@ -123,7 +125,16 @@ export default class RedisClusterSlots< emit: EventEmitter['emit'] ) { this.#options = options; - this.#clientFactory = RedisClient.factory(options); + + if (options?.clientSideCache) { + if (options.clientSideCache instanceof PooledClientSideCacheProvider) { + this.clientSideCache = options.clientSideCache; + } else { + this.clientSideCache = new BasicPooledClientSideCache(options.clientSideCache) + } + } + + this.#clientFactory = RedisClient.factory(this.#options); this.#emit = emit; } @@ -164,6 +175,8 @@ export default class RedisClusterSlots< } async #discover(rootNode: RedisClusterClientOptions) { + this.clientSideCache?.clear(); + this.clientSideCache?.disable(); this.#resetSlots(); try { const addressesInUse = new Set(), @@ -218,6 +231,7 @@ export default class RedisClusterSlots< } await Promise.all(promises); + this.clientSideCache?.enable(); return true; } catch (err) { @@ -313,6 +327,8 @@ export default class RedisClusterSlots< #createClient(node: ShardNode, readonly = node.readonly) { return this.#clientFactory( this.#clientOptionsDefaults({ + clientSideCache: this.clientSideCache, + RESP: this.#options.RESP, socket: this.#getNodeAddress(node.address) ?? { host: node.host, port: node.port diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index 12928e71f1..cfe6719398 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -9,9 +9,9 @@ import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi- import { PubSubListener } from '../client/pub-sub'; import { ErrorReply } from '../errors'; import { RedisTcpSocketOptions } from '../client/socket'; -import ASKING from '../commands/ASKING'; +import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache'; import { BasicCommandParser } from '../client/parser'; -import { parseArgs } from '../commands/generic-transformers'; +import { ASKING_CMD } from '../commands/ASKING'; interface ClusterCommander< M extends RedisModules, @@ -66,6 +66,10 @@ export interface RedisClusterOptions< * Useful when the cluster is running on another network */ nodeAddressMap?: NodeAddressMap; + /** + * TODO + */ + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; } // remove once request & response policies are ready @@ -148,6 +152,7 @@ export default class RedisCluster< > extends EventEmitter { static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); + return async function (this: ProxyCluster, ...args: Array) { const parser = new BasicCommandParser(); command.parseCommand(parser, ...args); @@ -382,6 +387,27 @@ export default class RedisCluster< // return this._commandOptionsProxy('policies', policies); // } + #handleAsk( + fn: (client: RedisClientType, opts?: ClusterCommandOptions) => Promise + ) { + return async (client: RedisClientType, options?: ClusterCommandOptions) => { + const chainId = Symbol("asking chain"); + const opts = options ? {...options} : {}; + opts.chainId = chainId; + + + + const ret = await Promise.all( + [ + client.sendCommand([ASKING_CMD], {chainId: chainId}), + fn(client, opts) + ] + ); + + return ret[1]; + }; + } + async #execute( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined, @@ -391,14 +417,15 @@ export default class RedisCluster< const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16; let client = await this.#slots.getClient(firstKey, isReadonly); let i = 0; - let myOpts = options; + + let myFn = fn; while (true) { try { - return await fn(client, myOpts); + return await myFn(client, options); } catch (err) { - // reset to passed in options, if changed by an ask request - myOpts = options; + myFn = fn; + // TODO: error class if (++i > maxCommandRedirections || !(err instanceof Error)) { throw err; @@ -417,13 +444,7 @@ export default class RedisCluster< } client = redirectTo; - - const chainId = Symbol('Asking Chain'); - myOpts = options ? {...options} : {}; - myOpts.chainId = chainId; - - client.sendCommand(parseArgs(ASKING), {chainId: chainId}).catch(err => { console.log(`Asking Failed: ${err}`) } ); - + myFn = this.#handleAsk(fn); continue; } @@ -574,10 +595,12 @@ export default class RedisCluster< } close() { + this.#slots.clientSideCache?.onPoolClose(); return this._self.#slots.close(); } destroy() { + this.#slots.clientSideCache?.onPoolClose(); return this._self.#slots.destroy(); } diff --git a/packages/client/lib/commands/GEOSEARCH.ts b/packages/client/lib/commands/GEOSEARCH.ts index 8c77fd8923..869dc60bec 100644 --- a/packages/client/lib/commands/GEOSEARCH.ts +++ b/packages/client/lib/commands/GEOSEARCH.ts @@ -29,12 +29,7 @@ export function parseGeoSearchArguments( from: GeoSearchFrom, by: GeoSearchBy, options?: GeoSearchOptions, - store?: RedisArgument ) { - if (store !== undefined) { - parser.pushKey(store); - } - parser.pushKey(key); if (typeof from === 'string' || from instanceof Buffer) { diff --git a/packages/client/lib/commands/GEOSEARCHSTORE.ts b/packages/client/lib/commands/GEOSEARCHSTORE.ts index eb8e12abe6..34c6e0988e 100644 --- a/packages/client/lib/commands/GEOSEARCHSTORE.ts +++ b/packages/client/lib/commands/GEOSEARCHSTORE.ts @@ -17,7 +17,12 @@ export default { options?: GeoSearchStoreOptions ) { parser.push('GEOSEARCHSTORE'); - parseGeoSearchArguments(parser, source, from, by, options, destination); + + if (destination !== undefined) { + parser.pushKey(destination); + } + + parseGeoSearchArguments(parser, source, from, by, options); if (options?.STOREDIST) { parser.push('STOREDIST'); diff --git a/packages/client/lib/sentinel/index.spec.ts b/packages/client/lib/sentinel/index.spec.ts index be5522bdd8..0cf8d80abe 100644 --- a/packages/client/lib/sentinel/index.spec.ts +++ b/packages/client/lib/sentinel/index.spec.ts @@ -10,11 +10,12 @@ import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, import { promisify } from 'node:util'; import { exec } from 'node:child_process'; import { RESP_TYPES } from '../RESP/decoder'; -import { defineScript } from '../lua-script'; import { MATH_FUNCTION } from '../commands/FUNCTION_LOAD.spec'; import RedisBloomModules from '@redis/bloom'; +import { BasicPooledClientSideCache } from '../client/cache'; import { RedisTcpSocketOptions } from '../client/socket'; import { SQUARE_SCRIPT } from '../client/index.spec'; +import { once } from 'node:events'; const execAsync = promisify(exec); @@ -78,7 +79,7 @@ async function steadyState(frame: SentinelFramework) { } ["redis-sentinel-test-password", undefined].forEach(function (password) { - describe.skip(`Sentinel - password = ${password}`, () => { + describe(`Sentinel - password = ${password}`, () => { const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: password }; const frame = new SentinelFramework(config); let tracer = new Array(); @@ -1136,6 +1137,34 @@ async function steadyState(frame: SentinelFramework) { tracer.push("added node and waiting on added promise"); await nodeAddedPromise; }) + + it('with client side caching', async function() { + this.timeout(30000); + const csc = new BasicPooledClientSideCache(); + + sentinel = frame.getSentinelClient({nodeClientOptions: {RESP: 3}, clientSideCache: csc, masterPoolSize: 5}); + await sentinel.connect(); + + await sentinel.set('x', 1); + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + + assert.equal(1, csc.cacheMisses()); + assert.equal(3, csc.cacheHits()); + + const invalidatePromise = once(csc, 'invalidate'); + await sentinel.set('x', 2); + await invalidatePromise; + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + + assert.equal(csc.cacheMisses(), 2); + assert.equal(csc.cacheHits(), 6); + }) }) describe('Sentinel Factory', function () { diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index d25fa03e55..a2037533c9 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -16,6 +16,7 @@ import { RedisVariadicArgument } from '../commands/generic-transformers'; import { WaitQueue } from './wait-queue'; import { TcpNetConnectOpts } from 'node:net'; import { RedisTcpSocketOptions } from '../client/socket'; +import { BasicPooledClientSideCache, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from '../client/cache'; interface ClientInfo { id: number; @@ -272,7 +273,7 @@ export default class RedisSentinel< this.#options = options; - if (options?.commandOptions) { + if (options.commandOptions) { this.#commandOptions = options.commandOptions; } @@ -590,6 +591,11 @@ class RedisSentinelInternal< #trace: (msg: string) => unknown = () => { }; + #clientSideCache?: PooledClientSideCacheProvider; + get clientSideCache() { + return this.#clientSideCache; + } + constructor(options: RedisSentinelOptions) { super(); @@ -602,11 +608,22 @@ class RedisSentinelInternal< this.#scanInterval = options.scanInterval ?? 0; this.#passthroughClientErrorEvents = options.passthroughClientErrorEvents ?? false; - this.#nodeClientOptions = options.nodeClientOptions ? Object.assign({} as RedisClientOptions, options.nodeClientOptions) : {}; + this.#nodeClientOptions = options.nodeClientOptions ? {...options.nodeClientOptions} : {}; if (this.#nodeClientOptions.url !== undefined) { throw new Error("invalid nodeClientOptions for Sentinel"); } + if (options.clientSideCache) { + if (options.clientSideCache instanceof PooledClientSideCacheProvider) { + this.#clientSideCache = this.#nodeClientOptions.clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); + this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); + this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig); + } + } + this.#sentinelClientOptions = options.sentinelClientOptions ? Object.assign({} as RedisClientOptions, options.sentinelClientOptions) : {}; this.#sentinelClientOptions.modules = RedisSentinelModule; @@ -827,6 +844,8 @@ class RedisSentinelInternal< this.#isReady = false; + this.#clientSideCache?.onPoolClose(); + if (this.#scanTimer) { clearInterval(this.#scanTimer); this.#scanTimer = undefined; @@ -875,6 +894,8 @@ class RedisSentinelInternal< this.#isReady = false; + this.#clientSideCache?.onPoolClose(); + if (this.#scanTimer) { clearInterval(this.#scanTimer); this.#scanTimer = undefined; diff --git a/packages/client/lib/sentinel/test-util.ts b/packages/client/lib/sentinel/test-util.ts index 25dd4c4371..de6c90a70b 100644 --- a/packages/client/lib/sentinel/test-util.ts +++ b/packages/client/lib/sentinel/test-util.ts @@ -188,18 +188,22 @@ export class SentinelFramework extends DockerBase { } const options: RedisSentinelOptions = { + ...opts, name: this.config.sentinelName, sentinelRootNodes: this.#sentinelList.map((sentinel) => { return { host: '127.0.0.1', port: sentinel.docker.port } }), passthroughClientErrorEvents: errors } if (this.config.password !== undefined) { - options.nodeClientOptions = {password: this.config.password}; - options.sentinelClientOptions = {password: this.config.password}; - } + if (!options.nodeClientOptions) { + options.nodeClientOptions = {}; + } + options.nodeClientOptions.password = this.config.password; - if (opts) { - Object.assign(options, opts); + if (!options.sentinelClientOptions) { + options.sentinelClientOptions = {}; + } + options.sentinelClientOptions = {password: this.config.password}; } return RedisSentinel.create(options); diff --git a/packages/client/lib/sentinel/types.ts b/packages/client/lib/sentinel/types.ts index 1f868ec517..05fe77060c 100644 --- a/packages/client/lib/sentinel/types.ts +++ b/packages/client/lib/sentinel/types.ts @@ -4,6 +4,7 @@ import { CommandSignature, CommanderConfig, RedisFunctions, RedisModules, RedisS import COMMANDS from '../commands'; import RedisSentinel, { RedisSentinelClient } from '.'; import { RedisTcpSocketOptions } from '../client/socket'; +import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache'; export interface RedisNode { host: string; @@ -59,6 +60,10 @@ export interface RedisSentinelOptions< * When `false`, the sentinel object will wait for the first available client from the pool. */ reserveClient?: boolean; + /** + * TODO + */ + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; } export interface SentinelCommander< diff --git a/packages/client/lib/sentinel/utils.ts b/packages/client/lib/sentinel/utils.ts index 90b789ddca..7e2404c2f7 100644 --- a/packages/client/lib/sentinel/utils.ts +++ b/packages/client/lib/sentinel/utils.ts @@ -1,5 +1,5 @@ -import { BasicCommandParser } from '../client/parser'; import { ArrayReply, Command, RedisFunction, RedisScript, RespVersions, UnwrapReply } from '../RESP/types'; +import { BasicCommandParser } from '../client/parser'; import { RedisSocketOptions, RedisTcpSocketOptions } from '../client/socket'; import { functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { NamespaceProxySentinel, NamespaceProxySentinelClient, ProxySentinel, ProxySentinelClient, RedisNode } from './types';