From 7e8ff4d3c7b1869f788b3969e0a889587294a08c Mon Sep 17 00:00:00 2001 From: Augustin Chan Date: Sat, 30 Nov 2024 18:23:27 +0800 Subject: [PATCH] fix: (core) Add circuit breaker pattern for database operations Implements circuit breaker pattern to handle database failures gracefully and prevent cascading failures. Fixes #712. Changes: - Adds CircuitBreaker class with CLOSED, OPEN, and HALF-OPEN states - Introduces BaseCircuitBreakerAdapter for database adapters - Configurable failure thresholds and recovery timeouts - Automatic recovery attempts in HALF-OPEN state - Detailed logging of circuit breaker state changes Circuit breaker configuration: - Opens after 5 consecutive failures (configurable) - Resets after 60 seconds in OPEN state - Requires 3 successful operations in HALF-OPEN state to close This helps prevent overwhelming failed database connections and provides graceful degradation during outages. --- packages/adapter-postgres/src/index.ts | 121 ++++++++++-------- .../src/database/BaseCircuitBreakerAdapter.ts | 31 +++++ packages/core/src/database/CircuitBreaker.ts | 68 ++++++++++ packages/core/src/index.ts | 1 + 4 files changed, 169 insertions(+), 52 deletions(-) create mode 100644 packages/core/src/database/BaseCircuitBreakerAdapter.ts create mode 100644 packages/core/src/database/CircuitBreaker.ts diff --git a/packages/adapter-postgres/src/index.ts b/packages/adapter-postgres/src/index.ts index bceeae1415f..864c3d621d8 100644 --- a/packages/adapter-postgres/src/index.ts +++ b/packages/adapter-postgres/src/index.ts @@ -20,9 +20,9 @@ import { type UUID, type IDatabaseCacheAdapter, Participant, - DatabaseAdapter, elizaLogger, getEmbeddingConfig, + BaseCircuitBreakerAdapter, } from "@ai16z/eliza"; import fs from "fs"; import { fileURLToPath } from "url"; @@ -32,7 +32,7 @@ const __filename = fileURLToPath(import.meta.url); // get the resolved path to t const __dirname = path.dirname(__filename); // get the name of the directory export class PostgresDatabaseAdapter - extends DatabaseAdapter + extends BaseCircuitBreakerAdapter implements IDatabaseCacheAdapter { private pool: InstanceType; @@ -43,7 +43,12 @@ export class PostgresDatabaseAdapter private readonly connectionTimeout: number = 5000; // 5 seconds constructor(connectionConfig: any) { - super(); + super({ + //CircuitBreaker config + failureThreshold: 5, + resetTimeout: 60000, // 1 minute + halfOpenMaxAttempts: 3, + }); const defaultConfig = { max: 20, @@ -78,53 +83,63 @@ export class PostgresDatabaseAdapter } private async withRetry(operation: () => Promise): Promise { - let lastError: Error = new Error("Unknown error"); // Initialize with default - - for (let attempt = 1; attempt <= this.maxRetries; attempt++) { - try { - return await operation(); - } catch (error) { - lastError = error as Error; - - if (attempt < this.maxRetries) { - // Calculate delay with exponential backoff - const backoffDelay = Math.min( - this.baseDelay * Math.pow(2, attempt - 1), - this.maxDelay - ); - - // Add jitter to prevent thundering herd - const jitter = Math.random() * this.jitterMax; - const delay = backoffDelay + jitter; + return this.withCircuitBreaker(async () => { + let lastError: Error = new Error("Unknown error"); - elizaLogger.warn( - `Database operation failed (attempt ${attempt}/${this.maxRetries}):`, - { + for (let attempt = 1; attempt <= this.maxRetries; attempt++) { + try { + return await operation(); + } catch (error) { + lastError = error as Error; + + if (attempt < this.maxRetries) { + // Calculate delay with exponential backoff + const backoffDelay = Math.min( + this.baseDelay * Math.pow(2, attempt - 1), + this.maxDelay + ); + + // Add jitter to prevent thundering herd + const jitter = Math.random() * this.jitterMax; + const delay = backoffDelay + jitter; + + elizaLogger.warn( + `Database operation failed (attempt ${attempt}/${this.maxRetries}):`, + { + error: + error instanceof Error + ? error.message + : String(error), + nextRetryIn: `${(delay / 1000).toFixed(1)}s`, + circuitState: this.circuitBreaker.getState(), + attempt, + maxRetries: this.maxRetries, + } + ); + + await new Promise((resolve) => + setTimeout(resolve, delay) + ); + } else { + elizaLogger.error("Max retry attempts reached:", { error: error instanceof Error ? error.message : String(error), - nextRetryIn: `${(delay / 1000).toFixed(1)}s`, - } - ); + totalAttempts: attempt, + circuitState: this.circuitBreaker.getState(), + }); - await new Promise((resolve) => setTimeout(resolve, delay)); - } else { - elizaLogger.error("Max retry attempts reached:", { - error: - error instanceof Error - ? error.message - : String(error), - totalAttempts: attempt, - }); - throw error instanceof Error - ? error - : new Error(String(error)); + // Let the circuit breaker know about the failure + throw error instanceof Error + ? error + : new Error(String(error)); + } } } - } - throw lastError; + throw lastError; + }, "PostgresDB"); } private async handlePoolError(error: Error) { @@ -159,16 +174,18 @@ export class PostgresDatabaseAdapter queryTextOrConfig: string | QueryConfig, values?: QueryConfigValues ): Promise> { - const client = await this.pool.connect(); - - try { - return client.query(queryTextOrConfig, values); - } catch (error) { - elizaLogger.error(error); - throw error; - } finally { - client.release(); - } + // Should be wrapped in withRetry + return this.withRetry(async () => { + const client = await this.pool.connect(); + try { + return await client.query(queryTextOrConfig, values); + } catch (error) { + elizaLogger.error(error); + throw error; + } finally { + client.release(); + } + }); } async init() { @@ -1167,7 +1184,7 @@ export class PostgresDatabaseAdapter ); return true; } catch (error) { - console.log("Error adding participant", error); + elizaLogger.error("Error adding participant", error); return false; } }); diff --git a/packages/core/src/database/BaseCircuitBreakerAdapter.ts b/packages/core/src/database/BaseCircuitBreakerAdapter.ts new file mode 100644 index 00000000000..3f3d87ff91d --- /dev/null +++ b/packages/core/src/database/BaseCircuitBreakerAdapter.ts @@ -0,0 +1,31 @@ +import { CircuitBreaker } from "./CircuitBreaker"; +import { elizaLogger } from "../logger"; +import { DatabaseAdapter } from "../database"; + +export abstract class BaseCircuitBreakerAdapter extends DatabaseAdapter { + protected circuitBreaker: CircuitBreaker; + + constructor(circuitBreakerConfig?: { + failureThreshold?: number; + resetTimeout?: number; + halfOpenMaxAttempts?: number; + }) { + super(); + this.circuitBreaker = new CircuitBreaker(circuitBreakerConfig); + } + + protected async withCircuitBreaker( + operation: () => Promise, + context: string + ): Promise { + try { + return await this.circuitBreaker.execute(operation); + } catch (error) { + elizaLogger.error(`Circuit breaker error in ${context}:`, { + error: error instanceof Error ? error.message : String(error), + state: this.circuitBreaker.getState(), + }); + throw error; + } + } +} diff --git a/packages/core/src/database/CircuitBreaker.ts b/packages/core/src/database/CircuitBreaker.ts new file mode 100644 index 00000000000..3efe21cf6f8 --- /dev/null +++ b/packages/core/src/database/CircuitBreaker.ts @@ -0,0 +1,68 @@ +export class CircuitBreaker { + private state: "CLOSED" | "OPEN" | "HALF_OPEN" = "CLOSED"; + private failureCount: number = 0; + private lastFailureTime?: number; + + private readonly failureThreshold: number; + private readonly resetTimeout: number; + private readonly halfOpenMaxAttempts: number; + private halfOpenSuccesses: number = 0; + + constructor( + config: { + failureThreshold?: number; + resetTimeout?: number; + halfOpenMaxAttempts?: number; + } = {} + ) { + this.failureThreshold = config.failureThreshold ?? 5; + this.resetTimeout = config.resetTimeout ?? 60000; // 1 minute + this.halfOpenMaxAttempts = config.halfOpenMaxAttempts ?? 3; + } + + async execute(operation: () => Promise): Promise { + if (this.state === "OPEN") { + if (Date.now() - (this.lastFailureTime || 0) > this.resetTimeout) { + this.state = "HALF_OPEN"; + this.halfOpenSuccesses = 0; + } else { + throw new Error("Circuit breaker is OPEN"); + } + } + + try { + const result = await operation(); + + if (this.state === "HALF_OPEN") { + this.halfOpenSuccesses++; + if (this.halfOpenSuccesses >= this.halfOpenMaxAttempts) { + this.reset(); + } + } + + return result; + } catch (error) { + this.handleFailure(); + throw error; + } + } + + private handleFailure(): void { + this.failureCount++; + this.lastFailureTime = Date.now(); + + if (this.failureCount >= this.failureThreshold) { + this.state = "OPEN"; + } + } + + private reset(): void { + this.state = "CLOSED"; + this.failureCount = 0; + this.lastFailureTime = undefined; + } + + getState(): "CLOSED" | "OPEN" | "HALF_OPEN" { + return this.state; + } +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index b9a78da8485..931c6da3f71 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -24,3 +24,4 @@ export * from "./enviroment.ts"; export * from "./cache.ts"; export { default as knowledge } from "./knowledge.ts"; export * from "./utils.ts"; +export { BaseCircuitBreakerAdapter } from "./database/BaseCircuitBreakerAdapter";