From a03ba3261cfe0ed0e7246bb697d803f0081346b0 Mon Sep 17 00:00:00 2001 From: hyphenized <28708889+hyphenized@users.noreply.github.com> Date: Sat, 25 Jan 2025 18:18:40 -0500 Subject: [PATCH 1/5] Fix types Use correct types for providers supported by serial-fallback-provider. Should also help avoid some type casts when accessing provider internals. --- .../chain/serial-fallback-provider.ts | 44 ++++++++----------- 1 file changed, 18 insertions(+), 26 deletions(-) diff --git a/background/services/chain/serial-fallback-provider.ts b/background/services/chain/serial-fallback-provider.ts index a0e93321f..6a05f9ed0 100644 --- a/background/services/chain/serial-fallback-provider.ts +++ b/background/services/chain/serial-fallback-provider.ts @@ -29,10 +29,12 @@ import { RpcConfig } from "./db" import TahoAlchemyProvider from "./taho-provider" import { getErrorType } from "./errors" +type RPCProvider = WebSocketProvider | JsonRpcProvider | JsonRpcBatchProvider + export type ProviderCreator = { type: "alchemy" | "custom" | "generic" supportedMethods?: string[] - creator: () => WebSocketProvider | JsonRpcProvider + creator: () => RPCProvider } /** @@ -108,9 +110,7 @@ function backedOffMs(): number { * either closing or already closed. Ethers does not provide direct access to * this information, nor does it attempt to reconnect in these cases. */ -function isClosedOrClosingWebSocketProvider( - provider: JsonRpcProvider, -): boolean { +function isClosedOrClosingWebSocketProvider(provider: RPCProvider): boolean { if (provider instanceof WebSocketProvider) { // Digging into the innards of Ethers here because there's no // other way to get access to the WebSocket connection situation. @@ -130,7 +130,7 @@ function isClosedOrClosingWebSocketProvider( * Returns true if the given provider is using a WebSocket AND the WebSocket is * connecting. Ethers does not provide direct access to this information. */ -function isConnectingWebSocketProvider(provider: JsonRpcProvider): boolean { +function isConnectingWebSocketProvider(provider: RPCProvider): boolean { if (provider instanceof WebSocketProvider) { // Digging into the innards of Ethers here because there's no // other way to get access to the WebSocket connection situation. @@ -195,23 +195,19 @@ function customOrDefaultProvider( export default class SerialFallbackProvider extends JsonRpcProvider { // Functions that will create and initialize a new provider, in priority // order. - private providerCreators: [ - () => WebSocketProvider | JsonRpcProvider, - ...(() => JsonRpcProvider)[], - ] + private providerCreators: (() => RPCProvider)[] // The currently-used provider, produced by the provider-creator at // currentProviderIndex. 
- private currentProvider: JsonRpcProvider + private currentProvider: RPCProvider - private alchemyProvider: JsonRpcProvider | undefined + private alchemyProvider: RPCProvider | undefined - private customProvider: JsonRpcProvider | undefined + private customProvider: RPCProvider | undefined private customProviderSupportedMethods: string[] = [] - private cachedProvidersByIndex: Record = - {} + private cachedProvidersByIndex: Record = {} /** * This object holds all messages that are either being sent to a provider @@ -227,13 +223,11 @@ export default class SerialFallbackProvider extends JsonRpcProvider { } } = {} - private alchemyProviderCreator: - | (() => WebSocketProvider | JsonRpcProvider) - | undefined + private alchemyProviderCreator: (() => RPCProvider) | undefined supportsAlchemy = false - private customProviderCreator: (() => JsonRpcProvider) | undefined + private customProviderCreator: (() => RPCProvider) | undefined /** * Since our architecture follows a pattern of using distinct provider instances @@ -281,7 +275,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider { // reloaded and the property of having code updates quite rarely. private latestHasCodeCache: { [address: string]: { - hasCode: boolean + hasCode: string } } = {} @@ -617,7 +611,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider { if (method === "eth_getCode" && (params as string[])[1] === "latest") { const address = (params as string[])[0] this.latestHasCodeCache[address] = { - hasCode: result as boolean, + hasCode: result as string, } } } @@ -827,7 +821,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider { } /** - * Behaves the same as the `JsonRpcProvider` `on` method, but also trakcs the + * Behaves the same as the `JsonRpcProvider` `on` method, but also tracks the * event subscription so that an underlying provider failure will not prevent * it from firing. */ @@ -844,7 +838,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider { } /** - * Behaves the same as the `JsonRpcProvider` `once` method, but also trakcs + * Behaves the same as the `JsonRpcProvider` `once` method, but also tracks * the event subscription so that an underlying provider failure will not * prevent it from firing. */ @@ -995,7 +989,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider { * @param provider The provider to use to resubscribe * @returns A boolean indicating if websocket subscription was successful or not */ - private async resubscribe(provider: JsonRpcProvider): Promise { + private async resubscribe(provider: RPCProvider): Promise { logger.debug("Resubscribing subscriptions", "on chain", this.chainID, "...") if ( @@ -1162,9 +1156,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider { } } -function getProviderCreator( - rpcUrl: string, -): JsonRpcProvider | WebSocketProvider { +function getProviderCreator(rpcUrl: string): RPCProvider { const url = new URL(rpcUrl) if (/^wss?/.test(url.protocol)) { return new WebSocketProvider(rpcUrl) From 5d29623102b0707ce69548605c8803891a0c8556 Mon Sep 17 00:00:00 2001 From: hyphenized <28708889+hyphenized@users.noreply.github.com> Date: Wed, 29 Jan 2025 12:35:55 -0500 Subject: [PATCH 2/5] Add custom batch provider implementation Added `TahoRPCProvider` to optimize RPC batching and request handling. Batch limits can now be set per provider. This also sets TahoRPCProvider as the default provider used since most json 2.0 rpcs support batching. 
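For illustration, a minimal sketch of setting a per-provider batch limit with the options introduced in this diff (the endpoint URL and the numeric values here are placeholders, not recommendations):

```ts
import TahoRPCProvider from "./taho-rpc-provider"

// Hypothetical endpoint and limits, for illustration only.
const provider = new TahoRPCProvider(
  { url: "https://rpc.example.com", throttleLimit: 1, timeout: 10_000 },
  undefined,
  { maxBatchLength: 5 }, // cap batches at 5 requests for this provider
)

// Limits can also be tightened after construction, e.g. when an RPC
// rejects a batch as too large:
provider.setOptions({ maxBatchLength: 1 })
```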
It does not handle cases where the provider refuses to handle batches even if they consist of a single request, as is the case with the flashbots rpc. --- .../chain/serial-fallback-provider.ts | 156 +++------- .../services/chain/taho-rpc-provider.ts | 282 ++++++++++++++++++ 2 files changed, 326 insertions(+), 112 deletions(-) create mode 100644 background/services/chain/taho-rpc-provider.ts diff --git a/background/services/chain/serial-fallback-provider.ts b/background/services/chain/serial-fallback-provider.ts index 6a05f9ed0..145be3d0b 100644 --- a/background/services/chain/serial-fallback-provider.ts +++ b/background/services/chain/serial-fallback-provider.ts @@ -1,6 +1,5 @@ import { EventType, - JsonRpcBatchProvider, JsonRpcProvider, Listener, WebSocketProvider, @@ -28,8 +27,9 @@ import { FeatureFlags, isEnabled } from "../../features" import { RpcConfig } from "./db" import TahoAlchemyProvider from "./taho-provider" import { getErrorType } from "./errors" +import TahoRPCProvider from "./taho-rpc-provider" -type RPCProvider = WebSocketProvider | JsonRpcProvider | JsonRpcBatchProvider +type RPCProvider = WebSocketProvider | JsonRpcProvider | TahoRPCProvider export type ProviderCreator = { type: "alchemy" | "custom" | "generic" @@ -37,6 +37,14 @@ export type ProviderCreator = { creator: () => RPCProvider } +const isWebSocketProvider = ( + provider: RPCProvider, +): provider is WebSocketProvider => provider instanceof WebSocketProvider + +const isJsonRpcProvider = ( + provider: RPCProvider, +): provider is TahoRPCProvider => provider instanceof TahoRPCProvider + /** * Method list, to describe which rpc method calls on which networks should * prefer alchemy provider over the generic ones. @@ -111,11 +119,8 @@ function backedOffMs(): number { * this information, nor does it attempt to reconnect in these cases. */ function isClosedOrClosingWebSocketProvider(provider: RPCProvider): boolean { - if (provider instanceof WebSocketProvider) { - // Digging into the innards of Ethers here because there's no - // other way to get access to the WebSocket connection situation. - // eslint-disable-next-line no-underscore-dangle - const webSocket = provider._websocket as WebSocket + if (isWebSocketProvider(provider)) { + const webSocket = provider.websocket return ( webSocket.readyState === WebSocket.CLOSING || @@ -131,11 +136,8 @@ function isClosedOrClosingWebSocketProvider(provider: RPCProvider): boolean { * connecting. Ethers does not provide direct access to this information. */ function isConnectingWebSocketProvider(provider: RPCProvider): boolean { - if (provider instanceof WebSocketProvider) { - // Digging into the innards of Ethers here because there's no - // other way to get access to the WebSocket connection situation. - // eslint-disable-next-line no-underscore-dangle - const webSocket = provider._websocket as WebSocket + if (isWebSocketProvider(provider)) { + const webSocket = provider.websocket return webSocket.readyState === WebSocket.CONNECTING } @@ -241,21 +243,6 @@ export default class SerialFallbackProvider extends JsonRpcProvider { // for reconnects when relevant. private currentProviderIndex = 0 - // If nonzero and the underlying provider is a batch provider, forces the - // batch size to be no more than this number, holding other requests until - // the existing batch has cleared. - private forcedBatchMaxSize: number = 0 - - // If this promise is set, new RPC calls will await on it before being - // processed. 
When forcedBatchMaxSize is nonzero and that number of RPC calls - // are pending, this promise will be set so subsequent requests will wait - // until the batch flushes. - private forcedBatchMaxPromise: Promise | undefined = undefined - - // During max size update, this value is set so that the value is not - // decreased by multiple failed requests. - private forcedBatchMaxPreviousSize: number = 0 - // TEMPORARY cache for latest account balances to reduce number of rpc calls // This is intended as a temporary fix to the burst of account enrichment that // happens when the extension is first loaded up as a result of activity emission @@ -366,46 +353,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider { delete this.messagesToSend[messageId] return cachedResult } - - if (this.forcedBatchMaxPromise) { - await this.forcedBatchMaxPromise - } - - const pendingBatch = - "_pendingBatch" in this.currentProvider - ? // Accessing ethers internals for forced max batch sizing. - // eslint-disable-next-line no-underscore-dangle - (this.currentProvider._pendingBatch as { length: number } | undefined) - : undefined - const pendingBatchSize = pendingBatch?.length const existingProviderIndex = this.currentProviderIndex - - if ( - pendingBatch && - this.forcedBatchMaxSize && - // Accessing ethers internals for forced max batch sizing. - // eslint-disable-next-line no-underscore-dangle - pendingBatch.length >= this.forcedBatchMaxSize - ) { - this.forcedBatchMaxPromise = new Promise((resolve) => { - const checkInterval = setInterval(() => { - const latestPendingBatch = - "_pendingBatch" in this.currentProvider - ? // Accessing ethers internals for forced max batch sizing. - // eslint-disable-next-line no-underscore-dangle - (this.currentProvider._pendingBatch as - | { length: number } - | undefined) - : undefined - - if ((latestPendingBatch?.length ?? 0) < this.forcedBatchMaxSize) { - resolve() - clearInterval(checkInterval) - } - }, 5) - }) - } - try { if (isClosedOrClosingWebSocketProvider(this.currentProvider)) { // Detect disconnected WebSocket and immediately throw. @@ -445,44 +393,37 @@ export default class SerialFallbackProvider extends JsonRpcProvider { return result } catch (error) { // Awful, but what can ya do. + const stringifiedError = String(error) const errorType = getErrorType(stringifiedError, method) if ( errorType === "batch-limit-exceeded" && - (pendingBatchSize === undefined || pendingBatchSize === 0) + isJsonRpcProvider(this.currentProvider) ) { - this.forcedBatchMaxPreviousSize = - pendingBatch?.length ?? pendingBatchSize ?? 1 - this.forcedBatchMaxSize = - (pendingBatch?.length ?? pendingBatchSize ?? 
2) / 2 + const requestBatch = this.currentProvider.getBatchFromError(error) - logger.debug( - "Setting a max batch size of", - this.forcedBatchMaxSize, - "on chain", - this.chainID, - "and retrying: ", - method, - params, - ) + const batchLen = requestBatch.length - return this.routeRpcCall(messageId) - } + // Note that every other request in the batch will set the length to + // the same value + if (batchLen <= this.currentProvider.getOptions().maxBatchLength) { + const newMaxBatchLen = Math.max(Math.floor(batchLen / 2), 1) - if (errorType === "batch-limit-exceeded") { - logger.debug( - "Using max batch size of", - this.forcedBatchMaxSize, - "on chain", - this.chainID, - "and retrying: ", - method, - params, - ) + this.currentProvider.setOptions({ + maxBatchLength: newMaxBatchLen, + }) - return this.routeRpcCall(messageId) + logger.debug( + "Setting a max batch size of", + newMaxBatchLen, + "for rpc", + this.currentProvider.connection.url, + ) + } + // Retry with a new limit on batch length + return waitAnd(500, () => this.routeRpcCall(messageId)) } if ( @@ -750,7 +691,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider { ): Promise { const subscription = { tag, param, processFunc } - if (this.currentProvider instanceof WebSocketProvider) { + if (isWebSocketProvider(this.currentProvider)) { // eslint-disable-next-line no-underscore-dangle await this.currentProvider._subscribe(tag, param, processFunc) this.subscriptions.push(subscription) @@ -905,12 +846,12 @@ export default class SerialFallbackProvider extends JsonRpcProvider { private disconnectCurrentProvider() { logger.debug( "Disconnecting current provider; websocket: ", - this.currentProvider instanceof WebSocketProvider, + isWebSocketProvider(this.currentProvider), "on chain", this.chainID, ".", ) - if (this.currentProvider instanceof WebSocketProvider) { + if (isWebSocketProvider(this.currentProvider)) { this.currentProvider.destroy() } else { // For non-WebSocket providers, kill all subscriptions so the listeners @@ -1003,9 +944,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider { return false } - if (provider instanceof WebSocketProvider) { - const websocketProvider = provider as WebSocketProvider - + if (isWebSocketProvider(provider)) { // Chain promises to serially resubscribe. // // TODO If anything fails along the way, it should yield the same kind of @@ -1017,7 +956,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider { // Direct subscriptions are internal, but we want to be able to // restore them. 
// eslint-disable-next-line no-underscore-dangle - websocketProvider._subscribe(tag, param, processFunc), + provider._subscribe(tag, param, processFunc), ), ), Promise.resolve(), @@ -1162,15 +1101,7 @@ function getProviderCreator(rpcUrl: string): RPCProvider { return new WebSocketProvider(rpcUrl) } - if (/rpc\.ankr\.com|1rpc\.io|polygon-rpc\.com/.test(url.href)) { - return new JsonRpcBatchProvider({ - url: rpcUrl, - throttleLimit: 1, - timeout: PROVIDER_REQUEST_TIMEOUT, - }) - } - - return new JsonRpcProvider({ + return new TahoRPCProvider({ url: rpcUrl, throttleLimit: 1, timeout: PROVIDER_REQUEST_TIMEOUT, @@ -1181,7 +1112,8 @@ export function makeFlashbotsProviderCreator(): ProviderCreator { return { type: "custom", supportedMethods: ["eth_sendRawTransaction"], - creator: () => getProviderCreator(FLASHBOTS_RPC_URL), + creator: () => + new TahoRPCProvider(FLASHBOTS_RPC_URL, undefined, { maxBatchSize: 1 }), } } @@ -1194,7 +1126,7 @@ export function makeSerialFallbackProvider( return new SerialFallbackProvider(FORK.chainID, [ { type: "generic" as const, - creator: () => new JsonRpcProvider(process.env.MAINNET_FORK_URL), + creator: () => new TahoRPCProvider(process.env.MAINNET_FORK_URL), }, ]) } @@ -1213,7 +1145,7 @@ export function makeSerialFallbackProvider( return new SerialFallbackProvider(ARBITRUM_SEPOLIA.chainID, [ { type: "generic" as const, - creator: () => new JsonRpcBatchProvider(process.env.ARBITRUM_FORK_RPC), + creator: () => new TahoRPCProvider(process.env.ARBITRUM_FORK_RPC), }, ]) } diff --git a/background/services/chain/taho-rpc-provider.ts b/background/services/chain/taho-rpc-provider.ts new file mode 100644 index 000000000..2129bb2d7 --- /dev/null +++ b/background/services/chain/taho-rpc-provider.ts @@ -0,0 +1,282 @@ +import { JsonRpcProvider, Networkish } from "@ethersproject/providers" +import { deepCopy } from "@ethersproject/properties" +import { ConnectionInfo, fetchJson } from "@ethersproject/web" +import logger from "../../lib/logger" + +type RPCPayload = { + method: string + params: unknown[] + id: number + jsonrpc: "2.0" +} + +type RPCResponse = RPCResponseResult | RPCResponseError + +type RPCResponseError = { + id: number + error: { + code: number + message: string + data?: unknown + } + jsonrpc: "2.0" +} + +type RPCResponseResult = { + id: number + result: unknown + jsonrpc: "2.0" +} + +type RPCPendingRequest = { + resolve: (value: unknown) => void + reject: (reason: unknown) => void + payload: RPCPayload +} + +type RPCOptions = { + /** + * Maximum number of requests to be batched + * @default 20 requests + */ + maxBatchLength?: number + /** + * Maximum length in bytes of the batch + * @default 1Mb + */ + maxBatchSize?: number + /** + * How long to wait to aggregate requests + * @default 100ms + */ + batchStallTime?: number +} + +const defaultOptions = { + maxBatchLength: 20, + // eslint-disable-next-line no-bitwise + maxBatchSize: 1 << 20, // 1Mb + batchStallTime: 100, +} + +const makeSerializableError = ( + message: string, + code?: number, + data?: unknown, +) => { + const error = new Error() + Object.assign(error, { message, code, data }) + return error +} + +/** + * TODO: This should be able to fallback into a standard rpc provider + * if batches are unsupported + */ +export default class TahoRPCProvider extends JsonRpcProvider { + // Requests queued in this provider + private pending: RPCPendingRequest[] = [] + + #options: Required + + // Tracks whether this provider is currently accepting requests + #destroyed = false + + #sendTimer: NodeJS.Timer | null = 
null + + #errorToBatch = new WeakMap() + + constructor( + url?: ConnectionInfo | string, + network?: Networkish, + options?: RPCOptions, + ) { + super(url, network) + + this.#options = { ...defaultOptions, ...options } + } + + private async sendNextBatch() { + // This prevents queueing multiple batches simultaneously + if (this.#sendTimer) { + return + } + + this.#sendTimer = setTimeout(() => { + this.#sendTimer = null + + const batch: RPCPendingRequest[] = [] + + const trackBatchError = (err: Error) => { + this.#errorToBatch.set(err, batch) + return err + } + + while (this.pending.length > 0) { + batch.push(this.pending.shift() as RPCPendingRequest) + + if (batch.length === this.#options.maxBatchLength) { + break + } + } + + // Enforce max batch size + while ( + JSON.stringify(batch.map(({ payload }) => payload)).length > + this.#options.maxBatchSize + ) { + this.pending.unshift(batch.pop() as RPCPendingRequest) + + if (batch.length === 0) { + throw new Error("INVALID_MAX_BATCH_SIZE") + } + } + + if (this.pending.length) { + // if there are still pending requests, schedule another batch for sending + this.sendNextBatch() + } + + const request = batch.map(({ payload }) => payload) + + this.emit("debug", { + action: "requestBatch", + request: deepCopy(request), + provider: this, + }) + + fetchJson( + this.connection, + // Some RPCs will reject batch payloads even if they send a single + // request (e.g. flashbots) + JSON.stringify(request.length === 1 ? request[0] : request), + ) + .then((response) => { + const wrappedResponse: RPCResponse[] = Array.isArray(response) + ? response + : [response] + + this.emit("debug", { + action: "response", + request, + response, + provider: this, + }) + + if (batch.length > 1 && !Array.isArray(response)) { + batch.forEach(({ reject }) => { + reject( + trackBatchError( + makeSerializableError( + response?.error?.message ?? 
"INVALID_RESPONSE", + response?.error?.code, + response, + ), + ), + ) + }) + } + + batch.forEach(({ payload: { id }, reject, resolve }) => { + const match = wrappedResponse.find((resp) => id === resp.id) + + if (!match) { + reject( + trackBatchError(makeSerializableError("bad response", -32000)), + ) + return + } + + if ("error" in match) { + reject( + trackBatchError( + makeSerializableError( + match.error.message, + match.error.code, + match.error.data, + ), + ), + ) + } else { + resolve(match.result) + } + }) + }) + .catch((error) => { + this.emit("debug", { + action: "response", + error, + request, + provider: this, + }) + + // Any other error during fetch should propagate + batch.forEach(({ reject }) => reject(trackBatchError(error))) + }) + }, this.#options.batchStallTime) + } + + override send(method: string, params: unknown[]): Promise { + if (this.#destroyed) { + return Promise.reject(new Error("PROVIDER_DESTROYED")) + } + + const promise = new Promise((resolve, reject) => { + this.pending.push({ + resolve, + reject, + // eslint-disable-next-line no-plusplus, no-underscore-dangle + payload: { method, params, id: this._nextId++, jsonrpc: "2.0" }, + }) + }) + + this.sendNextBatch() + return promise + } + + /** + * Drops any pending requests + */ + async destroy() { + this.#destroyed = true + + if (this.#sendTimer) clearTimeout(this.#sendTimer) + + this.pending.forEach((request) => + request.reject(new Error("PROVIDER_DESTROYED")), + ) + + this.pending = [] + } + + async reconnect() { + this.#destroyed = false + } + + setOptions(settings: RPCOptions): void { + Object.assign(this.#options, settings) + } + + getOptions(): Readonly> { + return { ...this.#options } + } + + /** + * Useful for adjusting batch limits + * @param err The error returned as the response + * @returns The associated batch sent + */ + getBatchFromError(err: unknown): RPCPendingRequest[] { + if ( + typeof err !== "object" || + err === null || + !this.#errorToBatch.has(err) + ) { + throw logger.buildError( + `Could not retrieve batch using error: ${err} as reference`, + ) + } + + return this.#errorToBatch.get(err)! + } +} From b428120ba82c7281d3ddc5c18d4f99dcd43b6776 Mon Sep 17 00:00:00 2001 From: hyphenized <28708889+hyphenized@users.noreply.github.com> Date: Sat, 1 Feb 2025 01:47:22 -0500 Subject: [PATCH 3/5] Improve request handling in TahoRPCProvider Reduced the max batch length from 20 to 10 for better compatibility with public RPCs and added a configurable delay between requests to prevent rate limit errors. 
--- .../services/chain/taho-rpc-provider.ts | 190 +++++++++++------- 1 file changed, 112 insertions(+), 78 deletions(-) diff --git a/background/services/chain/taho-rpc-provider.ts b/background/services/chain/taho-rpc-provider.ts index 2129bb2d7..7d3448d47 100644 --- a/background/services/chain/taho-rpc-provider.ts +++ b/background/services/chain/taho-rpc-provider.ts @@ -2,6 +2,7 @@ import { JsonRpcProvider, Networkish } from "@ethersproject/providers" import { deepCopy } from "@ethersproject/properties" import { ConnectionInfo, fetchJson } from "@ethersproject/web" import logger from "../../lib/logger" +import { wait } from "../../lib/utils" type RPCPayload = { method: string @@ -37,7 +38,7 @@ type RPCPendingRequest = { type RPCOptions = { /** * Maximum number of requests to be batched - * @default 20 requests + * @default 10 requests */ maxBatchLength?: number /** @@ -50,13 +51,18 @@ type RPCOptions = { * @default 100ms */ batchStallTime?: number + /** + * How long to wait between each payload sent + */ + delayBetweenRequests?: number } const defaultOptions = { - maxBatchLength: 20, + maxBatchLength: 10, // Seems to work fine for public rpcs // eslint-disable-next-line no-bitwise maxBatchSize: 1 << 20, // 1Mb batchStallTime: 100, + delayBetweenRequests: 500, } const makeSerializableError = ( @@ -70,8 +76,16 @@ const makeSerializableError = ( } /** - * TODO: This should be able to fallback into a standard rpc provider - * if batches are unsupported + * Custom JSON-RPC provider that supports batching and optimized request handling + * + * This provider works similarly to ethers `JsonRpcBatchProvider` albeit with a + * few differences: It allows configuring maximum batch size and the time window + * to aggregate requests into a batch. Additionally, it also supports throttling + * between requests and can fallback to individual requests if necessary. + * + * It also features `disconnect`/`reconnect` methods to manage polling and clear + * pending requests in specific scenarios (e.g. network errors) + * */ export default class TahoRPCProvider extends JsonRpcProvider { // Requests queued in this provider @@ -86,6 +100,8 @@ export default class TahoRPCProvider extends JsonRpcProvider { #errorToBatch = new WeakMap() + #sendBatchThrottle = Promise.resolve() + constructor( url?: ConnectionInfo | string, network?: Networkish, @@ -102,7 +118,7 @@ export default class TahoRPCProvider extends JsonRpcProvider { return } - this.#sendTimer = setTimeout(() => { + this.#sendTimer = setTimeout(async () => { this.#sendTimer = null const batch: RPCPendingRequest[] = [] @@ -133,10 +149,12 @@ export default class TahoRPCProvider extends JsonRpcProvider { } if (this.pending.length) { - // if there are still pending requests, schedule another batch for sending + // If there are still pending requests, start building another batch this.sendNextBatch() } + await this.#sendBatchThrottle + const request = batch.map(({ payload }) => payload) this.emit("debug", { @@ -145,80 +163,94 @@ export default class TahoRPCProvider extends JsonRpcProvider { provider: this, }) - fetchJson( - this.connection, - // Some RPCs will reject batch payloads even if they send a single - // request (e.g. flashbots) - JSON.stringify(request.length === 1 ? request[0] : request), - ) - .then((response) => { - const wrappedResponse: RPCResponse[] = Array.isArray(response) - ? 
response - : [response] - - this.emit("debug", { - action: "response", - request, - response, - provider: this, - }) - - if (batch.length > 1 && !Array.isArray(response)) { - batch.forEach(({ reject }) => { - reject( - trackBatchError( - makeSerializableError( - response?.error?.message ?? "INVALID_RESPONSE", - response?.error?.code, - response, - ), - ), - ) + this.#sendBatchThrottle = this.#sendBatchThrottle + .then(() => + fetchJson( + this.connection, + // Some RPCs will reject batch payloads even if they send a single + // request (e.g. flashbots) + JSON.stringify(request.length === 1 ? request[0] : request), + ) + .then((response) => { + const wrappedResponse: RPCResponse[] = Array.isArray(response) + ? response + : [response] + + this.emit("debug", { + action: "response", + request, + response, + provider: this, + }) + + // For cases where a batch is sent and a single error object is returned + // e.g. batch size exceeded + if (batch.length > 1 && !Array.isArray(response)) { + batch.forEach(({ reject }) => { + reject( + trackBatchError( + makeSerializableError( + response?.error?.message ?? "INVALID_RESPONSE", + response?.error?.code, + { response, batch }, + ), + ), + ) + }) + } + + batch.forEach(({ payload: { id }, reject, resolve }) => { + const match = wrappedResponse.find((resp) => id === resp.id) + + if (!match) { + reject( + trackBatchError( + makeSerializableError("bad response", -32000, { + response, + batch, + }), + ), + ) + return + } + + if ("error" in match) { + reject( + trackBatchError( + makeSerializableError( + match.error.message, + match.error.code, + { + response: match, + batch, + }, + ), + ), + ) + } else { + resolve(match.result) + } + }) }) - } - - batch.forEach(({ payload: { id }, reject, resolve }) => { - const match = wrappedResponse.find((resp) => id === resp.id) - - if (!match) { - reject( - trackBatchError(makeSerializableError("bad response", -32000)), - ) - return - } - - if ("error" in match) { - reject( - trackBatchError( - makeSerializableError( - match.error.message, - match.error.code, - match.error.data, - ), - ), - ) - } else { - resolve(match.result) - } - }) - }) - .catch((error) => { - this.emit("debug", { - action: "response", - error, - request, - provider: this, - }) - - // Any other error during fetch should propagate - batch.forEach(({ reject }) => reject(trackBatchError(error))) - }) + .catch((error) => { + this.emit("debug", { + action: "response", + error, + request, + provider: this, + }) + + // Any other error during fetch should propagate + batch.forEach(({ reject }) => reject(trackBatchError(error))) + }), + ) + .then(() => wait(this.#options.delayBetweenRequests)) }, this.#options.batchStallTime) } override send(method: string, params: unknown[]): Promise { if (this.#destroyed) { - return Promise.reject(new Error("PROVIDER_DESTROYED")) + return Promise.reject(new Error("NETWORK_ERROR")) } const promise = new Promise((resolve, reject) => { @@ -235,15 +267,16 @@ export default class TahoRPCProvider extends JsonRpcProvider { } /** - * Drops any pending requests + * Drops any pending requests and disables polling */ - async destroy() { + disconnect() { this.#destroyed = true - + this.polling = false if (this.#sendTimer) clearTimeout(this.#sendTimer) this.pending.forEach((request) => - request.reject(new Error("PROVIDER_DESTROYED")), + // This error will increase retry count, even though request hasn't been sent + request.reject(new Error("NETWORK_ERROR")), ) this.pending = [] @@ -251,6 +284,7 @@ export default class TahoRPCProvider 
extends JsonRpcProvider { async reconnect() { this.#destroyed = false + this.polling = true } setOptions(settings: RPCOptions): void { From 9517383c6acb495581f0cb832635b8d602811917 Mon Sep 17 00:00:00 2001 From: hyphenized <28708889+hyphenized@users.noreply.github.com> Date: Mon, 17 Feb 2025 02:21:12 -0500 Subject: [PATCH 4/5] Remove delay, exposed constants on custom provider Removed delay between requests as it introduces hidden latency and moved unused exposed options to constants. Batch stall time has been adjusted accordingly. Type alias removed Co-authored-by: Antonio Salazar Cardozo --- .../chain/serial-fallback-provider.ts | 51 ++--- .../services/chain/taho-rpc-provider.ts | 200 ++++++++---------- 2 files changed, 117 insertions(+), 134 deletions(-) diff --git a/background/services/chain/serial-fallback-provider.ts b/background/services/chain/serial-fallback-provider.ts index 145be3d0b..58f47b63c 100644 --- a/background/services/chain/serial-fallback-provider.ts +++ b/background/services/chain/serial-fallback-provider.ts @@ -29,22 +29,16 @@ import TahoAlchemyProvider from "./taho-provider" import { getErrorType } from "./errors" import TahoRPCProvider from "./taho-rpc-provider" -type RPCProvider = WebSocketProvider | JsonRpcProvider | TahoRPCProvider - export type ProviderCreator = { type: "alchemy" | "custom" | "generic" supportedMethods?: string[] - creator: () => RPCProvider + creator: () => JsonRpcProvider } const isWebSocketProvider = ( - provider: RPCProvider, + provider: JsonRpcProvider, ): provider is WebSocketProvider => provider instanceof WebSocketProvider -const isJsonRpcProvider = ( - provider: RPCProvider, -): provider is TahoRPCProvider => provider instanceof TahoRPCProvider - /** * Method list, to describe which rpc method calls on which networks should * prefer alchemy provider over the generic ones. @@ -118,7 +112,9 @@ function backedOffMs(): number { * either closing or already closed. Ethers does not provide direct access to * this information, nor does it attempt to reconnect in these cases. */ -function isClosedOrClosingWebSocketProvider(provider: RPCProvider): boolean { +function isClosedOrClosingWebSocketProvider( + provider: JsonRpcProvider, +): boolean { if (isWebSocketProvider(provider)) { const webSocket = provider.websocket @@ -135,7 +131,7 @@ function isClosedOrClosingWebSocketProvider(provider: RPCProvider): boolean { * Returns true if the given provider is using a WebSocket AND the WebSocket is * connecting. Ethers does not provide direct access to this information. */ -function isConnectingWebSocketProvider(provider: RPCProvider): boolean { +function isConnectingWebSocketProvider(provider: JsonRpcProvider): boolean { if (isWebSocketProvider(provider)) { const webSocket = provider.websocket return webSocket.readyState === WebSocket.CONNECTING @@ -197,19 +193,20 @@ function customOrDefaultProvider( export default class SerialFallbackProvider extends JsonRpcProvider { // Functions that will create and initialize a new provider, in priority // order. - private providerCreators: (() => RPCProvider)[] + private providerCreators: (() => JsonRpcProvider)[] // The currently-used provider, produced by the provider-creator at // currentProviderIndex. 
- private currentProvider: RPCProvider + private currentProvider: JsonRpcProvider - private alchemyProvider: RPCProvider | undefined + private alchemyProvider: JsonRpcProvider | undefined - private customProvider: RPCProvider | undefined + private customProvider: JsonRpcProvider | undefined private customProviderSupportedMethods: string[] = [] - private cachedProvidersByIndex: Record = {} + private cachedProvidersByIndex: Record = + {} /** * This object holds all messages that are either being sent to a provider @@ -225,11 +222,11 @@ export default class SerialFallbackProvider extends JsonRpcProvider { } } = {} - private alchemyProviderCreator: (() => RPCProvider) | undefined + private alchemyProviderCreator: (() => JsonRpcProvider) | undefined supportsAlchemy = false - private customProviderCreator: (() => RPCProvider) | undefined + private customProviderCreator: (() => JsonRpcProvider) | undefined /** * Since our architecture follows a pattern of using distinct provider instances @@ -400,16 +397,20 @@ export default class SerialFallbackProvider extends JsonRpcProvider { if ( errorType === "batch-limit-exceeded" && - isJsonRpcProvider(this.currentProvider) + this.currentProvider instanceof TahoRPCProvider ) { const requestBatch = this.currentProvider.getBatchFromError(error) - const batchLen = requestBatch.length - // Note that every other request in the batch will set the length to // the same value - if (batchLen <= this.currentProvider.getOptions().maxBatchLength) { - const newMaxBatchLen = Math.max(Math.floor(batchLen / 2), 1) + if ( + requestBatch.length <= + this.currentProvider.getOptions().maxBatchLength + ) { + const newMaxBatchLen = Math.max( + Math.floor(requestBatch.length / 2), + 1, + ) this.currentProvider.setOptions({ maxBatchLength: newMaxBatchLen, @@ -930,7 +931,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider { * @param provider The provider to use to resubscribe * @returns A boolean indicating if websocket subscription was successful or not */ - private async resubscribe(provider: RPCProvider): Promise { + private async resubscribe(provider: JsonRpcProvider): Promise { logger.debug("Resubscribing subscriptions", "on chain", this.chainID, "...") if ( @@ -1095,7 +1096,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider { } } -function getProviderCreator(rpcUrl: string): RPCProvider { +function getProviderCreator(rpcUrl: string): JsonRpcProvider { const url = new URL(rpcUrl) if (/^wss?/.test(url.protocol)) { return new WebSocketProvider(rpcUrl) @@ -1113,7 +1114,7 @@ export function makeFlashbotsProviderCreator(): ProviderCreator { type: "custom", supportedMethods: ["eth_sendRawTransaction"], creator: () => - new TahoRPCProvider(FLASHBOTS_RPC_URL, undefined, { maxBatchSize: 1 }), + new TahoRPCProvider(FLASHBOTS_RPC_URL, undefined, { maxBatchLength: 1 }), } } diff --git a/background/services/chain/taho-rpc-provider.ts b/background/services/chain/taho-rpc-provider.ts index 7d3448d47..5212fa52c 100644 --- a/background/services/chain/taho-rpc-provider.ts +++ b/background/services/chain/taho-rpc-provider.ts @@ -2,7 +2,6 @@ import { JsonRpcProvider, Networkish } from "@ethersproject/providers" import { deepCopy } from "@ethersproject/properties" import { ConnectionInfo, fetchJson } from "@ethersproject/web" import logger from "../../lib/logger" -import { wait } from "../../lib/utils" type RPCPayload = { method: string @@ -41,28 +40,23 @@ type RPCOptions = { * @default 10 requests */ maxBatchLength?: number - /** - * Maximum length in 
bytes of the batch - * @default 1Mb - */ - maxBatchSize?: number - /** - * How long to wait to aggregate requests - * @default 100ms - */ - batchStallTime?: number - /** - * How long to wait between each payload sent - */ - delayBetweenRequests?: number } +/** + * Maximum size in bytes of the batch + * @default 1Mb + */ +const MAX_BATCH_BYTE_SIZE = 1_048_576 // 1mb + +/** + * How long to wait to aggregate requests + * Should be kept at minimum, to keep request time at a normal level + * @default 10ms + */ +const BATCH_STALL_TIME = 10 // 10ms + const defaultOptions = { maxBatchLength: 10, // Seems to work fine for public rpcs - // eslint-disable-next-line no-bitwise - maxBatchSize: 1 << 20, // 1Mb - batchStallTime: 100, - delayBetweenRequests: 500, } const makeSerializableError = ( @@ -80,8 +74,8 @@ const makeSerializableError = ( * * This provider works similarly to ethers `JsonRpcBatchProvider` albeit with a * few differences: It allows configuring maximum batch size and the time window - * to aggregate requests into a batch. Additionally, it also supports throttling - * between requests and can fallback to individual requests if necessary. + * to aggregate requests into a batch. Additionally, it can fallback to individual + * requests if necessary. * * It also features `disconnect`/`reconnect` methods to manage polling and clear * pending requests in specific scenarios (e.g. network errors) @@ -96,12 +90,10 @@ export default class TahoRPCProvider extends JsonRpcProvider { // Tracks whether this provider is currently accepting requests #destroyed = false - #sendTimer: NodeJS.Timer | null = null + #sendTimer: ReturnType | null = null #errorToBatch = new WeakMap() - #sendBatchThrottle = Promise.resolve() - constructor( url?: ConnectionInfo | string, network?: Networkish, @@ -139,7 +131,7 @@ export default class TahoRPCProvider extends JsonRpcProvider { // Enforce max batch size while ( JSON.stringify(batch.map(({ payload }) => payload)).length > - this.#options.maxBatchSize + MAX_BATCH_BYTE_SIZE ) { this.pending.unshift(batch.pop() as RPCPendingRequest) @@ -153,8 +145,6 @@ export default class TahoRPCProvider extends JsonRpcProvider { this.sendNextBatch() } - await this.#sendBatchThrottle - const request = batch.map(({ payload }) => payload) this.emit("debug", { @@ -163,89 +153,81 @@ export default class TahoRPCProvider extends JsonRpcProvider { provider: this, }) - this.#sendBatchThrottle = this.#sendBatchThrottle - .then(() => - fetchJson( - this.connection, - // Some RPCs will reject batch payloads even if they send a single - // request (e.g. flashbots) - JSON.stringify(request.length === 1 ? request[0] : request), - ) - .then((response) => { - const wrappedResponse: RPCResponse[] = Array.isArray(response) - ? response - : [response] - - this.emit("debug", { - action: "response", - request, - response, - provider: this, - }) - - // For cases where a batch is sent and a single error object is returned - // e.g. batch size exceeded - if (batch.length > 1 && !Array.isArray(response)) { - batch.forEach(({ reject }) => { - reject( - trackBatchError( - makeSerializableError( - response?.error?.message ?? 
"INVALID_RESPONSE", - response?.error?.code, - { response, batch }, - ), - ), - ) - }) - } - - batch.forEach(({ payload: { id }, reject, resolve }) => { - const match = wrappedResponse.find((resp) => id === resp.id) - - if (!match) { - reject( - trackBatchError( - makeSerializableError("bad response", -32000, { - response, - batch, - }), - ), - ) - return - } - - if ("error" in match) { - reject( - trackBatchError( - makeSerializableError( - match.error.message, - match.error.code, - { - response: match, - batch, - }, - ), - ), - ) - } else { - resolve(match.result) - } - }) + fetchJson( + this.connection, + // Some RPCs will reject batch payloads even if they send a single + // request (e.g. flashbots) + JSON.stringify(request.length === 1 ? request[0] : request), + ) + .then((response) => { + const wrappedResponse: RPCResponse[] = Array.isArray(response) + ? response + : [response] + + this.emit("debug", { + action: "response", + request, + response, + provider: this, + }) + + // For cases where a batch is sent and a single error object is returned + // e.g. batch size exceeded + if (batch.length > 1 && !Array.isArray(response)) { + batch.forEach(({ reject }) => { + reject( + trackBatchError( + makeSerializableError( + response?.error?.message ?? "INVALID_RESPONSE", + response?.error?.code, + { response, batch }, + ), + ), + ) }) - .catch((error) => { - this.emit("debug", { - action: "response", - error, - request, - provider: this, - }) - - // Any other error during fetch should propagate - batch.forEach(({ reject }) => reject(trackBatchError(error))) - }), - ) - .then(() => wait(this.#options.delayBetweenRequests)) - }, this.#options.batchStallTime) + } + + batch.forEach(({ payload: { id }, reject, resolve }) => { + const match = wrappedResponse.find((resp) => id === resp.id) + + if (!match) { + reject( + trackBatchError( + makeSerializableError("bad response", -32000, { + response, + batch, + }), + ), + ) + return + } + + if ("error" in match) { + reject( + trackBatchError( + makeSerializableError(match.error.message, match.error.code, { + response: match, + batch, + }), + ), + ) + } else { + resolve(match.result) + } + }) + }) + .catch((error) => { + this.emit("debug", { + action: "response", + error, + request, + provider: this, + }) + + // Any other error during fetch should propagate + batch.forEach(({ reject }) => reject(trackBatchError(error))) + }) + }, BATCH_STALL_TIME) } override send(method: string, params: unknown[]): Promise { From ee2aa3898dfe632bf9f9244e2c4aa42971f6d9bb Mon Sep 17 00:00:00 2001 From: hyphenized <28708889+hyphenized@users.noreply.github.com> Date: Mon, 17 Feb 2025 03:12:51 -0500 Subject: [PATCH 5/5] Add missing ethers ws types --- background/package.json | 1 + yarn.lock | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/background/package.json b/background/package.json index 9742819c0..1634ac1c8 100644 --- a/background/package.json +++ b/background/package.json @@ -73,6 +73,7 @@ "@types/sinon": "^10.0.12", "@types/uuid": "^8.3.4", "@types/webextension-polyfill": "^0.12.0", + "@types/ws": "^8.5.14", "@walletconnect/legacy-types": "^2.0.0", "@walletconnect/types": "^2.7.7", "crypto-browserify": "^3.12.0", diff --git a/yarn.lock b/yarn.lock index 2531596f6..00eae4a1c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4930,6 +4930,13 @@ anymatch "^3.0.0" source-map "^0.6.0" +"@types/ws@^8.5.14": + version "8.5.14" + resolved "https://registry.yarnpkg.com/@types/ws/-/ws-8.5.14.tgz#93d44b268c9127d96026cf44353725dd9b6c3c21" + integrity 
sha512-bd/YFLW+URhBzMXurx7lWByOu+xzU9+kb3RboOteXYDfW+tr+JZa99OyNmPINEGB/ahzKrEuc8rcv4gnpJmxTw== + dependencies: + "@types/node" "*" + "@types/yargs-parser@*": version "20.2.1" resolved "https://registry.yarnpkg.com/@types/yargs-parser/-/yargs-parser-20.2.1.tgz#3b9ce2489919d9e4fea439b76916abc34b2df129"