Remove delay, exposed constants on custom provider
Removed the delay between requests, since it introduced hidden latency,
and moved unused exposed options to constants. The batch stall time has
been adjusted accordingly.

Co-authored-by: Antonio Salazar Cardozo <[email protected]>
hyphenized and Shadowfiend committed Feb 17, 2025
1 parent b428120 commit 5f90f2b
Showing 2 changed files with 102 additions and 120 deletions.
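
The "hidden latency" is quantifiable from the diff below: the old defaults chained every batch through a shared throttle with delayBetweenRequests: 500 (ms) on top of a batchStallTime of 100 (ms), so a request that just missed a batch could wait roughly 100ms + 500ms, about 600ms, before its payload even went on the wire. With the delay gone and the stall window cut to the new BATCH_STALL_TIME of 10, the deliberate wait shrinks to about 10ms.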
22 changes: 11 additions & 11 deletions background/services/chain/serial-fallback-provider.ts
@@ -29,7 +29,7 @@ import TahoAlchemyProvider from "./taho-provider"
 import { getErrorType } from "./errors"
 import TahoRPCProvider from "./taho-rpc-provider"
 
-type RPCProvider = WebSocketProvider | JsonRpcProvider | TahoRPCProvider
+type RPCProvider = WebSocketProvider | TahoAlchemyProvider | TahoRPCProvider
 
 export type ProviderCreator = {
   type: "alchemy" | "custom" | "generic"
@@ -41,10 +41,6 @@ const isWebSocketProvider = (
   provider: RPCProvider,
 ): provider is WebSocketProvider => provider instanceof WebSocketProvider
 
-const isJsonRpcProvider = (
-  provider: RPCProvider,
-): provider is TahoRPCProvider => provider instanceof TahoRPCProvider
-
 /**
  * Method list, to describe which rpc method calls on which networks should
  * prefer alchemy provider over the generic ones.
@@ -400,16 +396,20 @@ export default class SerialFallbackProvider extends JsonRpcProvider
 
       if (
         errorType === "batch-limit-exceeded" &&
-        isJsonRpcProvider(this.currentProvider)
+        this.currentProvider instanceof TahoRPCProvider
       ) {
         const requestBatch = this.currentProvider.getBatchFromError(error)
 
-        const batchLen = requestBatch.length
-
         // Note that every other request in the batch will set the length to
         // the same value
-        if (batchLen <= this.currentProvider.getOptions().maxBatchLength) {
-          const newMaxBatchLen = Math.max(Math.floor(batchLen / 2), 1)
+        if (
+          requestBatch.length <=
+          this.currentProvider.getOptions().maxBatchLength
+        ) {
+          const newMaxBatchLen = Math.max(
+            Math.floor(requestBatch.length / 2),
+            1,
+          )
 
           this.currentProvider.setOptions({
             maxBatchLength: newMaxBatchLen,
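
The check above halves the provider's maxBatchLength whenever a node rejects a batch as too large. A minimal runnable sketch of the progression, assuming the default cap of 10 (the next helper is hypothetical, written only to mirror Math.max(Math.floor(requestBatch.length / 2), 1) from the diff):

// Hypothetical helper mirroring the halving logic above
const next = (batchLen: number): number =>
  Math.max(Math.floor(batchLen / 2), 1)

// Starting from the default cap of 10, repeated batch-limit errors yield:
console.log(next(10)) // 5
console.log(next(5)) // 2
console.log(next(2)) // 1
console.log(next(1)) // 1 -- the cap never drops below a single request

Because the floor is 1, repeated failures degrade the provider to single-request mode rather than stalling entirely.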
@@ -1113,7 +1113,7 @@ export function makeFlashbotsProviderCreator(): ProviderCreator
     type: "custom",
     supportedMethods: ["eth_sendRawTransaction"],
     creator: () =>
-      new TahoRPCProvider(FLASHBOTS_RPC_URL, undefined, { maxBatchSize: 1 }),
+      new TahoRPCProvider(FLASHBOTS_RPC_URL, undefined, { maxBatchLength: 1 }),
   }
 }

200 changes: 91 additions & 109 deletions background/services/chain/taho-rpc-provider.ts
@@ -2,7 +2,6 @@ import { JsonRpcProvider, Networkish } from "@ethersproject/providers"
 import { deepCopy } from "@ethersproject/properties"
 import { ConnectionInfo, fetchJson } from "@ethersproject/web"
 import logger from "../../lib/logger"
-import { wait } from "../../lib/utils"
 
 type RPCPayload = {
   method: string
@@ -41,28 +40,23 @@ type RPCOptions = {
    * @default 10 requests
    */
   maxBatchLength?: number
-  /**
-   * Maximum length in bytes of the batch
-   * @default 1Mb
-   */
-  maxBatchSize?: number
-  /**
-   * How long to wait to aggregate requests
-   * @default 100ms
-   */
-  batchStallTime?: number
-  /**
-   * How long to wait between each payload sent
-   */
-  delayBetweenRequests?: number
 }
 
+/**
+ * Maximum size in bytes of the batch
+ * @default 1Mb
+ */
+const MAX_BATCH_BYTE_SIZE = 1_048_576 // 1mb
+
+/**
+ * How long to wait to aggregate requests
+ * Should be kept at minimum, to keep request time at a normal level
+ * @default 10ms
+ */
+const BATCH_STALL_TIME = 10 // 10ms
+
 const defaultOptions = {
   maxBatchLength: 10, // Seems to work fine for public rpcs
-  // eslint-disable-next-line no-bitwise
-  maxBatchSize: 1 << 20, // 1Mb
-  batchStallTime: 100,
-  delayBetweenRequests: 500,
 }
 
 const makeSerializableError = (
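
With maxBatchSize, batchStallTime, and delayBetweenRequests gone from RPCOptions, the only knob left for callers is maxBatchLength; the byte cap and stall window are now module constants. A hypothetical usage sketch (the endpoint URL is a placeholder; the argument order follows the url, network, options constructor shown in this diff):

import TahoRPCProvider from "./taho-rpc-provider"

// Placeholder RPC endpoint -- any JSON-RPC URL fits here
const provider = new TahoRPCProvider("https://rpc.example.com", undefined, {
  maxBatchLength: 5, // requests per batch; halved on batch-limit errors
})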
@@ -80,8 +74,8 @@
  *
  * This provider works similarly to ethers `JsonRpcBatchProvider` albeit with a
  * few differences: It allows configuring maximum batch size and the time window
- * to aggregate requests into a batch. Additionally, it also supports throttling
- * between requests and can fallback to individual requests if necessary.
+ * to aggregate requests into a batch. Additionally, it can fallback to individual
+ * requests if necessary.
  *
  * It also features `disconnect`/`reconnect` methods to manage polling and clear
  * pending requests in specific scenarios (e.g. network errors)
@@ -96,12 +90,10 @@ export default class TahoRPCProvider extends JsonRpcProvider {
   // Tracks whether this provider is currently accepting requests
   #destroyed = false
 
-  #sendTimer: NodeJS.Timer | null = null
+  #sendTimer: ReturnType<typeof setTimeout> | null = null
 
   #errorToBatch = new WeakMap<WeakKey, RPCPendingRequest[]>()
 
-  #sendBatchThrottle = Promise.resolve()
-
   constructor(
     url?: ConnectionInfo | string,
     network?: Networkish,
@@ -139,7 +131,7 @@
       // Enforce max batch size
       while (
         JSON.stringify(batch.map(({ payload }) => payload)).length >
-        this.#options.maxBatchSize
+        MAX_BATCH_BYTE_SIZE
       ) {
         this.pending.unshift(batch.pop() as RPCPendingRequest)
 
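The guard above serializes the pending payloads and pops requests back onto the queue until the batch fits under the cap. A standalone sketch of the same check (the fitsInBatch helper is hypothetical; note that the old default of 1 << 20 equals the new 1_048_576 constant):

const MAX_BATCH_BYTE_SIZE = 1_048_576 // 1 << 20, i.e. 1Mb

// Hypothetical helper: does this set of payloads fit in one batch?
const fitsInBatch = (payloads: unknown[]): boolean =>
  JSON.stringify(payloads).length <= MAX_BATCH_BYTE_SIZE

console.log(fitsInBatch([{ method: "eth_blockNumber", params: [] }])) // true
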
@@ -153,8 +145,6 @@
         this.sendNextBatch()
       }
 
-      await this.#sendBatchThrottle
-
       const request = batch.map(({ payload }) => payload)
 
       this.emit("debug", {
@@ -163,89 +153,81 @@
         provider: this,
       })
 
-      this.#sendBatchThrottle = this.#sendBatchThrottle
-        .then(() =>
-          fetchJson(
-            this.connection,
-            // Some RPCs will reject batch payloads even if they send a single
-            // request (e.g. flashbots)
-            JSON.stringify(request.length === 1 ? request[0] : request),
-          )
-            .then((response) => {
-              const wrappedResponse: RPCResponse[] = Array.isArray(response)
-                ? response
-                : [response]
-
-              this.emit("debug", {
-                action: "response",
-                request,
-                response,
-                provider: this,
-              })
-
-              // For cases where a batch is sent and a single error object is returned
-              // e.g. batch size exceeded
-              if (batch.length > 1 && !Array.isArray(response)) {
-                batch.forEach(({ reject }) => {
-                  reject(
-                    trackBatchError(
-                      makeSerializableError(
-                        response?.error?.message ?? "INVALID_RESPONSE",
-                        response?.error?.code,
-                        { response, batch },
-                      ),
-                    ),
-                  )
-                })
-              }
-
-              batch.forEach(({ payload: { id }, reject, resolve }) => {
-                const match = wrappedResponse.find((resp) => id === resp.id)
-
-                if (!match) {
-                  reject(
-                    trackBatchError(
-                      makeSerializableError("bad response", -32000, {
-                        response,
-                        batch,
-                      }),
-                    ),
-                  )
-                  return
-                }
-
-                if ("error" in match) {
-                  reject(
-                    trackBatchError(
-                      makeSerializableError(
-                        match.error.message,
-                        match.error.code,
-                        {
-                          response: match,
-                          batch,
-                        },
-                      ),
-                    ),
-                  )
-                } else {
-                  resolve(match.result)
-                }
-              })
-            })
-            .catch((error) => {
-              this.emit("debug", {
-                action: "response",
-                error,
-                request,
-                provider: this,
-              })
-
-              // Any other error during fetch should propagate
-              batch.forEach(({ reject }) => reject(trackBatchError(error)))
-            }),
-        )
-        .then(() => wait(this.#options.delayBetweenRequests))
-    }, this.#options.batchStallTime)
+      fetchJson(
+        this.connection,
+        // Some RPCs will reject batch payloads even if they send a single
+        // request (e.g. flashbots)
+        JSON.stringify(request.length === 1 ? request[0] : request),
+      )
+        .then((response) => {
+          const wrappedResponse: RPCResponse[] = Array.isArray(response)
+            ? response
+            : [response]
+
+          this.emit("debug", {
+            action: "response",
+            request,
+            response,
+            provider: this,
+          })
+
+          // For cases where a batch is sent and a single error object is returned
+          // e.g. batch size exceeded
+          if (batch.length > 1 && !Array.isArray(response)) {
+            batch.forEach(({ reject }) => {
+              reject(
+                trackBatchError(
+                  makeSerializableError(
+                    response?.error?.message ?? "INVALID_RESPONSE",
+                    response?.error?.code,
+                    { response, batch },
+                  ),
+                ),
+              )
+            })
+          }
+
+          batch.forEach(({ payload: { id }, reject, resolve }) => {
+            const match = wrappedResponse.find((resp) => id === resp.id)
+
+            if (!match) {
+              reject(
+                trackBatchError(
+                  makeSerializableError("bad response", -32000, {
+                    response,
+                    batch,
+                  }),
+                ),
+              )
+              return
+            }
+
+            if ("error" in match) {
+              reject(
+                trackBatchError(
+                  makeSerializableError(match.error.message, match.error.code, {
+                    response: match,
+                    batch,
+                  }),
+                ),
+              )
+            } else {
+              resolve(match.result)
+            }
+          })
+        })
+        .catch((error) => {
+          this.emit("debug", {
+            action: "response",
+            error,
+            request,
+            provider: this,
+          })
+
+          // Any other error during fetch should propagate
+          batch.forEach(({ reject }) => reject(trackBatchError(error)))
+        })
+    }, BATCH_STALL_TIME)
   }
 
   override send(method: string, params: unknown[]): Promise<unknown> {
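
The net effect of the rewrite is that batches no longer queue behind one another. A rough, self-contained timing sketch (hypothetical values; sendBatch stands in for the fetchJson call, and the old/new shapes only approximate the surrounding code):

const start = Date.now()
const sendBatch = (name: string) =>
  console.log(`${name} sent at ~${Date.now() - start}ms`)
const delay = (ms: number) => new Promise((r) => setTimeout(r, ms))

// Old shape: each batch chained onto a shared promise plus a 500ms delay,
// so batch B waited for batch A and the delay (~500ms of hidden latency).
let throttle = Promise.resolve()
throttle = throttle.then(() => sendBatch("old A")).then(() => delay(500))
throttle = throttle.then(() => sendBatch("old B"))

// New shape: each batch fires as soon as its own 10ms stall window closes.
setTimeout(() => sendBatch("new A"), 10)
setTimeout(() => sendBatch("new B"), 10)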
