Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve RPC provider request handling #3774

Merged
merged 8 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions background/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
"@types/sinon": "^10.0.12",
"@types/uuid": "^8.3.4",
"@types/webextension-polyfill": "^0.12.0",
"@types/ws": "^8.5.14",
"@walletconnect/legacy-types": "^2.0.0",
"@walletconnect/types": "^2.7.7",
"crypto-browserify": "^3.12.0",
Expand Down
179 changes: 52 additions & 127 deletions background/services/chain/serial-fallback-provider.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
EventType,
JsonRpcBatchProvider,
JsonRpcProvider,
Listener,
WebSocketProvider,
Expand Down Expand Up @@ -28,13 +27,18 @@ import { FeatureFlags, isEnabled } from "../../features"
import { RpcConfig } from "./db"
import TahoAlchemyProvider from "./taho-provider"
import { getErrorType } from "./errors"
import TahoRPCProvider from "./taho-rpc-provider"

export type ProviderCreator = {
type: "alchemy" | "custom" | "generic"
supportedMethods?: string[]
creator: () => WebSocketProvider | JsonRpcProvider
creator: () => JsonRpcProvider
}

const isWebSocketProvider = (
provider: JsonRpcProvider,
): provider is WebSocketProvider => provider instanceof WebSocketProvider

/**
* Method list, to describe which rpc method calls on which networks should
* prefer alchemy provider over the generic ones.
Expand Down Expand Up @@ -111,11 +115,8 @@ function backedOffMs(): number {
function isClosedOrClosingWebSocketProvider(
provider: JsonRpcProvider,
): boolean {
if (provider instanceof WebSocketProvider) {
// Digging into the innards of Ethers here because there's no
// other way to get access to the WebSocket connection situation.
// eslint-disable-next-line no-underscore-dangle
const webSocket = provider._websocket as WebSocket
if (isWebSocketProvider(provider)) {
const webSocket = provider.websocket
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉


return (
webSocket.readyState === WebSocket.CLOSING ||
Expand All @@ -131,11 +132,8 @@ function isClosedOrClosingWebSocketProvider(
* connecting. Ethers does not provide direct access to this information.
*/
function isConnectingWebSocketProvider(provider: JsonRpcProvider): boolean {
if (provider instanceof WebSocketProvider) {
// Digging into the innards of Ethers here because there's no
// other way to get access to the WebSocket connection situation.
// eslint-disable-next-line no-underscore-dangle
const webSocket = provider._websocket as WebSocket
if (isWebSocketProvider(provider)) {
const webSocket = provider.websocket
return webSocket.readyState === WebSocket.CONNECTING
}

Expand Down Expand Up @@ -195,10 +193,7 @@ function customOrDefaultProvider(
export default class SerialFallbackProvider extends JsonRpcProvider {
// Functions that will create and initialize a new provider, in priority
// order.
private providerCreators: [
() => WebSocketProvider | JsonRpcProvider,
...(() => JsonRpcProvider)[],
]
private providerCreators: (() => JsonRpcProvider)[]

// The currently-used provider, produced by the provider-creator at
// currentProviderIndex.
Expand Down Expand Up @@ -227,9 +222,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider {
}
} = {}

private alchemyProviderCreator:
| (() => WebSocketProvider | JsonRpcProvider)
| undefined
private alchemyProviderCreator: (() => JsonRpcProvider) | undefined

supportsAlchemy = false

Expand All @@ -247,21 +240,6 @@ export default class SerialFallbackProvider extends JsonRpcProvider {
// for reconnects when relevant.
private currentProviderIndex = 0

// If nonzero and the underlying provider is a batch provider, forces the
// batch size to be no more than this number, holding other requests until
// the existing batch has cleared.
private forcedBatchMaxSize: number = 0

// If this promise is set, new RPC calls will await on it before being
// processed. When forcedBatchMaxSize is nonzero and that number of RPC calls
// are pending, this promise will be set so subsequent requests will wait
// until the batch flushes.
private forcedBatchMaxPromise: Promise<void> | undefined = undefined

// During max size update, this value is set so that the value is not
// decreased by multiple failed requests.
private forcedBatchMaxPreviousSize: number = 0

// TEMPORARY cache for latest account balances to reduce number of rpc calls
// This is intended as a temporary fix to the burst of account enrichment that
// happens when the extension is first loaded up as a result of activity emission
Expand All @@ -281,7 +259,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider {
// reloaded and the property of having code updates quite rarely.
private latestHasCodeCache: {
[address: string]: {
hasCode: boolean
hasCode: string
}
} = {}

Expand Down Expand Up @@ -372,46 +350,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider {
delete this.messagesToSend[messageId]
return cachedResult
}

if (this.forcedBatchMaxPromise) {
await this.forcedBatchMaxPromise
}

const pendingBatch =
"_pendingBatch" in this.currentProvider
? // Accessing ethers internals for forced max batch sizing.
// eslint-disable-next-line no-underscore-dangle
(this.currentProvider._pendingBatch as { length: number } | undefined)
: undefined
const pendingBatchSize = pendingBatch?.length
const existingProviderIndex = this.currentProviderIndex

if (
pendingBatch &&
this.forcedBatchMaxSize &&
// Accessing ethers internals for forced max batch sizing.
// eslint-disable-next-line no-underscore-dangle
pendingBatch.length >= this.forcedBatchMaxSize
) {
this.forcedBatchMaxPromise = new Promise((resolve) => {
const checkInterval = setInterval(() => {
const latestPendingBatch =
"_pendingBatch" in this.currentProvider
? // Accessing ethers internals for forced max batch sizing.
// eslint-disable-next-line no-underscore-dangle
(this.currentProvider._pendingBatch as
| { length: number }
| undefined)
: undefined

if ((latestPendingBatch?.length ?? 0) < this.forcedBatchMaxSize) {
resolve()
clearInterval(checkInterval)
}
}, 5)
})
}

try {
if (isClosedOrClosingWebSocketProvider(this.currentProvider)) {
// Detect disconnected WebSocket and immediately throw.
Expand Down Expand Up @@ -451,44 +390,41 @@ export default class SerialFallbackProvider extends JsonRpcProvider {
return result
} catch (error) {
// Awful, but what can ya do.

const stringifiedError = String(error)

const errorType = getErrorType(stringifiedError, method)

if (
errorType === "batch-limit-exceeded" &&
(pendingBatchSize === undefined || pendingBatchSize === 0)
this.currentProvider instanceof TahoRPCProvider
) {
this.forcedBatchMaxPreviousSize =
pendingBatch?.length ?? pendingBatchSize ?? 1
this.forcedBatchMaxSize =
(pendingBatch?.length ?? pendingBatchSize ?? 2) / 2
const requestBatch = this.currentProvider.getBatchFromError(error)

logger.debug(
"Setting a max batch size of",
this.forcedBatchMaxSize,
"on chain",
this.chainID,
"and retrying: ",
method,
params,
)
// Note that every other request in the batch will set the length to
// the same value
if (
requestBatch.length <=
this.currentProvider.getOptions().maxBatchLength
) {
const newMaxBatchLen = Math.max(
Math.floor(requestBatch.length / 2),
1,
)

return this.routeRpcCall(messageId)
}
this.currentProvider.setOptions({
maxBatchLength: newMaxBatchLen,
})

if (errorType === "batch-limit-exceeded") {
logger.debug(
"Using max batch size of",
this.forcedBatchMaxSize,
"on chain",
this.chainID,
"and retrying: ",
method,
params,
)

return this.routeRpcCall(messageId)
logger.debug(
"Setting a max batch size of",
newMaxBatchLen,
"for rpc",
this.currentProvider.connection.url,
)
}
// Retry with a new limit on batch length
return waitAnd(500, () => this.routeRpcCall(messageId))
}

if (
Expand Down Expand Up @@ -617,7 +553,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider {
if (method === "eth_getCode" && (params as string[])[1] === "latest") {
const address = (params as string[])[0]
this.latestHasCodeCache[address] = {
hasCode: result as boolean,
hasCode: result as string,
}
}
}
Expand Down Expand Up @@ -756,7 +692,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider {
): Promise<void> {
const subscription = { tag, param, processFunc }

if (this.currentProvider instanceof WebSocketProvider) {
if (isWebSocketProvider(this.currentProvider)) {
// eslint-disable-next-line no-underscore-dangle
await this.currentProvider._subscribe(tag, param, processFunc)
this.subscriptions.push(subscription)
Expand Down Expand Up @@ -827,7 +763,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider {
}

/**
* Behaves the same as the `JsonRpcProvider` `on` method, but also trakcs the
* Behaves the same as the `JsonRpcProvider` `on` method, but also tracks the
* event subscription so that an underlying provider failure will not prevent
* it from firing.
*/
Expand All @@ -844,7 +780,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider {
}

/**
* Behaves the same as the `JsonRpcProvider` `once` method, but also trakcs
* Behaves the same as the `JsonRpcProvider` `once` method, but also tracks
* the event subscription so that an underlying provider failure will not
* prevent it from firing.
*/
Expand Down Expand Up @@ -911,12 +847,12 @@ export default class SerialFallbackProvider extends JsonRpcProvider {
private disconnectCurrentProvider() {
logger.debug(
"Disconnecting current provider; websocket: ",
this.currentProvider instanceof WebSocketProvider,
isWebSocketProvider(this.currentProvider),
"on chain",
this.chainID,
".",
)
if (this.currentProvider instanceof WebSocketProvider) {
if (isWebSocketProvider(this.currentProvider)) {
this.currentProvider.destroy()
} else {
// For non-WebSocket providers, kill all subscriptions so the listeners
Expand Down Expand Up @@ -1009,9 +945,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider {
return false
}

if (provider instanceof WebSocketProvider) {
const websocketProvider = provider as WebSocketProvider

if (isWebSocketProvider(provider)) {
// Chain promises to serially resubscribe.
//
// TODO If anything fails along the way, it should yield the same kind of
Expand All @@ -1023,7 +957,7 @@ export default class SerialFallbackProvider extends JsonRpcProvider {
// Direct subscriptions are internal, but we want to be able to
// restore them.
// eslint-disable-next-line no-underscore-dangle
websocketProvider._subscribe(tag, param, processFunc),
provider._subscribe(tag, param, processFunc),
),
),
Promise.resolve(),
Expand Down Expand Up @@ -1162,23 +1096,13 @@ export default class SerialFallbackProvider extends JsonRpcProvider {
}
}

function getProviderCreator(
rpcUrl: string,
): JsonRpcProvider | WebSocketProvider {
function getProviderCreator(rpcUrl: string): JsonRpcProvider {
const url = new URL(rpcUrl)
if (/^wss?/.test(url.protocol)) {
return new WebSocketProvider(rpcUrl)
}

if (/rpc\.ankr\.com|1rpc\.io|polygon-rpc\.com/.test(url.href)) {
return new JsonRpcBatchProvider({
url: rpcUrl,
throttleLimit: 1,
timeout: PROVIDER_REQUEST_TIMEOUT,
})
}

return new JsonRpcProvider({
return new TahoRPCProvider({
url: rpcUrl,
throttleLimit: 1,
timeout: PROVIDER_REQUEST_TIMEOUT,
Expand All @@ -1189,7 +1113,8 @@ export function makeFlashbotsProviderCreator(): ProviderCreator {
return {
type: "custom",
supportedMethods: ["eth_sendRawTransaction"],
creator: () => getProviderCreator(FLASHBOTS_RPC_URL),
creator: () =>
new TahoRPCProvider(FLASHBOTS_RPC_URL, undefined, { maxBatchLength: 1 }),
}
}

Expand All @@ -1202,7 +1127,7 @@ export function makeSerialFallbackProvider(
return new SerialFallbackProvider(FORK.chainID, [
{
type: "generic" as const,
creator: () => new JsonRpcProvider(process.env.MAINNET_FORK_URL),
creator: () => new TahoRPCProvider(process.env.MAINNET_FORK_URL),
},
])
}
Expand All @@ -1221,7 +1146,7 @@ export function makeSerialFallbackProvider(
return new SerialFallbackProvider(ARBITRUM_SEPOLIA.chainID, [
{
type: "generic" as const,
creator: () => new JsonRpcBatchProvider(process.env.ARBITRUM_FORK_RPC),
creator: () => new TahoRPCProvider(process.env.ARBITRUM_FORK_RPC),
},
])
}
Expand Down
Loading