From ae8c8803e20c7bdcca288a157b38d657edfcb69d Mon Sep 17 00:00:00 2001 From: "Rob Moore (MakerX)" Date: Thu, 21 Mar 2024 17:18:26 +0800 Subject: [PATCH] feat: Added ability to restrict the number of rounds indexer catchup syncs for so large catchups are more reliable --- docs/README.md | 4 +- docs/code/classes/index.AlgorandSubscriber.md | 30 +++++----- ...s_subscription.AlgorandSubscriberConfig.md | 36 ++++++++++-- ...ption.CoreTransactionSubscriptionParams.md | 24 +++++++- ...pes_subscription.NamedTransactionFilter.md | 4 +- ...pes_subscription.SubscriberConfigFilter.md | 6 +- .../types_subscription.TransactionFilter.md | 30 +++++----- ...scription.TransactionSubscriptionParams.md | 30 +++++++++- docs/code/modules/index.md | 2 +- docs/code/modules/types_subscription.md | 2 +- docs/subscriber.md | 11 ++++ docs/subscriptions.md | 11 ++++ src/subscriber.ts | 23 ++++---- src/subscriptions.ts | 55 ++++++++++++------- src/types/subscription.ts | 13 ++++- tests/scenarios/catchup-with-indexer.spec.ts | 39 ++++++++++++- tests/scenarios/subscriber.spec.ts | 6 +- tests/transactions.ts | 5 +- 18 files changed, 240 insertions(+), 91 deletions(-) diff --git a/docs/README.md b/docs/README.md index 9c4e023..db3fef2 100644 --- a/docs/README.md +++ b/docs/README.md @@ -312,9 +312,9 @@ Any [filter](#extensive-subscription-filtering) you apply will be seamlessly tra To see this in action, you can run the Data History Museum example in this repository against MainNet and see it sync millions of rounds in seconds. -The indexer catchup isn't magic - if the filter you are trying to catch up with generates an enormous number of transactions (e.g. hundreds of thousands or millions) then it will run very slowly and has the potential for running out of compute and memory time depending on what the constraints are in the deployment environment you are running in. 
To understand how the indexer behaviour works to know if you are likely to generate a lot of transactions it's worth understanding the architecture of the indexer catchup. +The indexer catchup isn't magic - if the filter you are trying to catch up with generates an enormous number of transactions (e.g. hundreds of thousands or millions) then it will run very slowly and has the potential for running out of compute and memory time depending on what the constraints are in the deployment environment you are running in. In that instance though, there is a config parameter you can use `maxIndexerRoundsToSync` so you can break the indexer catchup into multiple "polls" e.g. 100,000 rounds at a time. This allows a smaller batch of transactions to be retrieved and persisted in multiple batches. -Indexer catchup runs in two stages: +To understand how the indexer behaviour works to know if you are likely to generate a lot of transactions it's worth understanding the architecture of the indexer catchup; indexer catchup runs in two stages: 1. **Pre-filtering**: Any filters that can be translated to the [indexer search transactions endpoint](https://developer.algorand.org/docs/rest-apis/indexer/#get-v2transactions). This query is then run between the rounds that need to be synced and paginated in the max number of results (1000) at a time until all of the transactions are retrieved. This ensures we get round-based transactional consistency. This is the filter that can easily explode out though and take a long time when using indexer catchup. 
For avoidance of doubt, the following filters are the ones that are converted to a pre-filter: - `sender` diff --git a/docs/code/classes/index.AlgorandSubscriber.md b/docs/code/classes/index.AlgorandSubscriber.md index 0310007..68504a8 100644 --- a/docs/code/classes/index.AlgorandSubscriber.md +++ b/docs/code/classes/index.AlgorandSubscriber.md @@ -16,12 +16,12 @@ Handles the logic for subscribing to the Algorand blockchain and emitting events - [abortController](index.AlgorandSubscriber.md#abortcontroller) - [algod](index.AlgorandSubscriber.md#algod) +- [config](index.AlgorandSubscriber.md#config) - [eventEmitter](index.AlgorandSubscriber.md#eventemitter) - [filterNames](index.AlgorandSubscriber.md#filternames) - [indexer](index.AlgorandSubscriber.md#indexer) - [startPromise](index.AlgorandSubscriber.md#startpromise) - [started](index.AlgorandSubscriber.md#started) -- [subscription](index.AlgorandSubscriber.md#subscription) ### Methods @@ -77,6 +77,16 @@ ___ ___ +### config + +• `Private` **config**: [`AlgorandSubscriberConfig`](../interfaces/types_subscription.AlgorandSubscriberConfig.md) + +#### Defined in + +[subscriber.ts:21](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L21) + +___ + ### eventEmitter • `Private` **eventEmitter**: [`AsyncEventEmitter`](types_async_event_emitter.AsyncEventEmitter.md) @@ -125,16 +135,6 @@ ___ [subscriber.ts:24](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L24) -___ - -### subscription - -• `Private` **subscription**: [`AlgorandSubscriberConfig`](../interfaces/types_subscription.AlgorandSubscriberConfig.md) - -#### Defined in - -[subscriber.ts:21](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L21) - ## Methods ### on @@ -166,7 +166,7 @@ The subscriber so `on`/`onBatch` calls can be chained #### Defined in 
-[subscriber.ts:162](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L162) +[subscriber.ts:159](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L159) ___ @@ -203,7 +203,7 @@ The subscriber so `on`/`onBatch` calls can be chained #### Defined in -[subscriber.ts:179](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L179) +[subscriber.ts:176](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L176) ___ @@ -251,7 +251,7 @@ An object that contains a promise you can wait for after calling stop #### Defined in -[subscriber.ts:101](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L101) +[subscriber.ts:98](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L98) ___ @@ -275,4 +275,4 @@ A promise that can be awaited to ensure the subscriber has finished stopping #### Defined in -[subscriber.ts:144](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L144) +[subscriber.ts:141](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L141) diff --git a/docs/code/interfaces/types_subscription.AlgorandSubscriberConfig.md b/docs/code/interfaces/types_subscription.AlgorandSubscriberConfig.md index 28cdfaf..e75d414 100644 --- a/docs/code/interfaces/types_subscription.AlgorandSubscriberConfig.md +++ b/docs/code/interfaces/types_subscription.AlgorandSubscriberConfig.md @@ -19,6 +19,7 @@ Configuration for an `AlgorandSubscriber`. 
- [arc28Events](types_subscription.AlgorandSubscriberConfig.md#arc28events) - [filters](types_subscription.AlgorandSubscriberConfig.md#filters) - [frequencyInSeconds](types_subscription.AlgorandSubscriberConfig.md#frequencyinseconds) +- [maxIndexerRoundsToSync](types_subscription.AlgorandSubscriberConfig.md#maxindexerroundstosync) - [maxRoundsToSync](types_subscription.AlgorandSubscriberConfig.md#maxroundstosync) - [syncBehaviour](types_subscription.AlgorandSubscriberConfig.md#syncbehaviour) - [waitForBlockWhenAtTip](types_subscription.AlgorandSubscriberConfig.md#waitforblockwhenattip) @@ -54,7 +55,7 @@ The set of filters to subscribe to / emit events for, along with optional data m #### Defined in -[types/subscription.ts:164](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L164) +[types/subscription.ts:175](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L175) ___ @@ -66,7 +67,30 @@ The frequency to poll for new blocks in seconds; defaults to 1s #### Defined in -[types/subscription.ts:166](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L166) +[types/subscription.ts:177](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L177) + +___ + +### maxIndexerRoundsToSync + +• `Optional` **maxIndexerRoundsToSync**: `number` + +The maximum number of rounds to sync from indexer when using `syncBehaviour: 'catchup-with-indexer'. + +By default there is no limit and it will paginate through all of the rounds. +Sometimes this can result in an incredibly long catchup time that may break the service +due to execution and memory constraints, particularly for filters that result in a large number of transactions. + +Instead, this allows indexer catchup to be split into multiple polls, each with a transactionally consistent +boundary based on the number of rounds specified here. 
+ +#### Inherited from + +[CoreTransactionSubscriptionParams](types_subscription.CoreTransactionSubscriptionParams.md).[maxIndexerRoundsToSync](types_subscription.CoreTransactionSubscriptionParams.md#maxindexerroundstosync) + +#### Defined in + +[types/subscription.ts:89](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L89) ___ @@ -74,7 +98,7 @@ ___ • `Optional` **maxRoundsToSync**: `number` -The maximum number of rounds to sync for each subscription pull/poll. +The maximum number of rounds to sync from algod for each subscription pull/poll. Defaults to 500. @@ -119,7 +143,7 @@ past `watermark` then how should that be handled: #### Defined in -[types/subscription.ts:96](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L96) +[types/subscription.ts:107](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L107) ___ @@ -131,7 +155,7 @@ Whether to wait via algod `/status/wait-for-block-after` endpoint when at the ti #### Defined in -[types/subscription.ts:168](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L168) +[types/subscription.ts:179](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L179) ___ @@ -151,4 +175,4 @@ its position in the chain #### Defined in -[types/subscription.ts:171](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L171) +[types/subscription.ts:182](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L182) diff --git a/docs/code/interfaces/types_subscription.CoreTransactionSubscriptionParams.md b/docs/code/interfaces/types_subscription.CoreTransactionSubscriptionParams.md index ab819ff..6a9a46a 100644 --- a/docs/code/interfaces/types_subscription.CoreTransactionSubscriptionParams.md +++ 
b/docs/code/interfaces/types_subscription.CoreTransactionSubscriptionParams.md @@ -20,6 +20,7 @@ Common parameters to control a single subscription pull/poll for both `AlgorandS - [arc28Events](types_subscription.CoreTransactionSubscriptionParams.md#arc28events) - [filters](types_subscription.CoreTransactionSubscriptionParams.md#filters) +- [maxIndexerRoundsToSync](types_subscription.CoreTransactionSubscriptionParams.md#maxindexerroundstosync) - [maxRoundsToSync](types_subscription.CoreTransactionSubscriptionParams.md#maxroundstosync) - [syncBehaviour](types_subscription.CoreTransactionSubscriptionParams.md#syncbehaviour) @@ -68,11 +69,30 @@ A list of filters with corresponding names. ___ +### maxIndexerRoundsToSync + +• `Optional` **maxIndexerRoundsToSync**: `number` + +The maximum number of rounds to sync from indexer when using `syncBehaviour: 'catchup-with-indexer'. + +By default there is no limit and it will paginate through all of the rounds. +Sometimes this can result in an incredibly long catchup time that may break the service +due to execution and memory constraints, particularly for filters that result in a large number of transactions. + +Instead, this allows indexer catchup to be split into multiple polls, each with a transactionally consistent +boundary based on the number of rounds specified here. + +#### Defined in + +[types/subscription.ts:89](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L89) + +___ + ### maxRoundsToSync • `Optional` **maxRoundsToSync**: `number` -The maximum number of rounds to sync for each subscription pull/poll. +The maximum number of rounds to sync from algod for each subscription pull/poll. Defaults to 500. 
@@ -109,4 +129,4 @@ past `watermark` then how should that be handled: #### Defined in -[types/subscription.ts:96](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L96) +[types/subscription.ts:107](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L107) diff --git a/docs/code/interfaces/types_subscription.NamedTransactionFilter.md b/docs/code/interfaces/types_subscription.NamedTransactionFilter.md index d883df8..07ab5de 100644 --- a/docs/code/interfaces/types_subscription.NamedTransactionFilter.md +++ b/docs/code/interfaces/types_subscription.NamedTransactionFilter.md @@ -29,7 +29,7 @@ The filter itself. #### Defined in -[types/subscription.ts:104](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L104) +[types/subscription.ts:115](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L115) ___ @@ -41,4 +41,4 @@ The name to give the filter. #### Defined in -[types/subscription.ts:102](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L102) +[types/subscription.ts:113](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L113) diff --git a/docs/code/interfaces/types_subscription.SubscriberConfigFilter.md b/docs/code/interfaces/types_subscription.SubscriberConfigFilter.md index 6f21026..b28c2d6 100644 --- a/docs/code/interfaces/types_subscription.SubscriberConfigFilter.md +++ b/docs/code/interfaces/types_subscription.SubscriberConfigFilter.md @@ -40,7 +40,7 @@ The filter itself. 
#### Defined in -[types/subscription.ts:104](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L104) +[types/subscription.ts:115](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L115) ___ @@ -70,7 +70,7 @@ Note: if you provide multiple filters with the same name then only the mapper of #### Defined in -[types/subscription.ts:187](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L187) +[types/subscription.ts:198](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L198) ___ @@ -86,4 +86,4 @@ The name to give the filter. #### Defined in -[types/subscription.ts:102](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L102) +[types/subscription.ts:113](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L113) diff --git a/docs/code/interfaces/types_subscription.TransactionFilter.md b/docs/code/interfaces/types_subscription.TransactionFilter.md index 25738a8..8234794 100644 --- a/docs/code/interfaces/types_subscription.TransactionFilter.md +++ b/docs/code/interfaces/types_subscription.TransactionFilter.md @@ -50,7 +50,7 @@ Filter to app transactions that meet the given app arguments predicate. #### Defined in -[types/subscription.ts:139](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L139) +[types/subscription.ts:150](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L150) ___ @@ -62,7 +62,7 @@ Filter to transactions that are creating an app. 
#### Defined in -[types/subscription.ts:120](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L120) +[types/subscription.ts:131](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L131) ___ @@ -74,7 +74,7 @@ Filter to transactions against the app with the given ID. #### Defined in -[types/subscription.ts:118](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L118) +[types/subscription.ts:129](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L129) ___ @@ -86,7 +86,7 @@ Filter to transactions that have given on complete(s). #### Defined in -[types/subscription.ts:122](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L122) +[types/subscription.ts:133](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L133) ___ @@ -99,7 +99,7 @@ Note: the definitions for these events must be passed in to the subscription con #### Defined in -[types/subscription.ts:143](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L143) +[types/subscription.ts:154](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L154) ___ @@ -111,7 +111,7 @@ Filter to transactions that are creating an asset. #### Defined in -[types/subscription.ts:126](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L126) +[types/subscription.ts:137](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L137) ___ @@ -123,7 +123,7 @@ Filter to transactions against the asset with the given ID. 
#### Defined in -[types/subscription.ts:124](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L124) +[types/subscription.ts:135](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L135) ___ @@ -136,7 +136,7 @@ or equal to the given maximum (microAlgos or decimal units of an ASA if type: ax #### Defined in -[types/subscription.ts:132](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L132) +[types/subscription.ts:143](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L143) ___ @@ -149,7 +149,7 @@ the given method signature as the first app argument. #### Defined in -[types/subscription.ts:135](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L135) +[types/subscription.ts:146](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L146) ___ @@ -161,7 +161,7 @@ Filter to app transactions that match one of the given ARC-0004 method selectors #### Defined in -[types/subscription.ts:137](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L137) +[types/subscription.ts:148](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L148) ___ @@ -174,7 +174,7 @@ than or equal to the given minimum (microAlgos or decimal units of an ASA if typ #### Defined in -[types/subscription.ts:129](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L129) +[types/subscription.ts:140](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L140) ___ @@ -186,7 +186,7 @@ Filter to transactions with a note having the given prefix. 
#### Defined in -[types/subscription.ts:116](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L116) +[types/subscription.ts:127](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L127) ___ @@ -198,7 +198,7 @@ Filter to transactions being received by the specified address. #### Defined in -[types/subscription.ts:114](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L114) +[types/subscription.ts:125](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L125) ___ @@ -210,7 +210,7 @@ Filter to transactions sent from the specified address. #### Defined in -[types/subscription.ts:112](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L112) +[types/subscription.ts:123](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L123) ___ @@ -222,4 +222,4 @@ Filter based on the given transaction type. #### Defined in -[types/subscription.ts:110](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L110) +[types/subscription.ts:121](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L121) diff --git a/docs/code/interfaces/types_subscription.TransactionSubscriptionParams.md b/docs/code/interfaces/types_subscription.TransactionSubscriptionParams.md index 3bc68b7..c932c84 100644 --- a/docs/code/interfaces/types_subscription.TransactionSubscriptionParams.md +++ b/docs/code/interfaces/types_subscription.TransactionSubscriptionParams.md @@ -18,6 +18,7 @@ Parameters to control a single subscription pull/poll. 
- [arc28Events](types_subscription.TransactionSubscriptionParams.md#arc28events) - [filters](types_subscription.TransactionSubscriptionParams.md#filters) +- [maxIndexerRoundsToSync](types_subscription.TransactionSubscriptionParams.md#maxindexerroundstosync) - [maxRoundsToSync](types_subscription.TransactionSubscriptionParams.md#maxroundstosync) - [syncBehaviour](types_subscription.TransactionSubscriptionParams.md#syncbehaviour) - [watermark](types_subscription.TransactionSubscriptionParams.md#watermark) @@ -75,11 +76,34 @@ A list of filters with corresponding names. ___ +### maxIndexerRoundsToSync + +• `Optional` **maxIndexerRoundsToSync**: `number` + +The maximum number of rounds to sync from indexer when using `syncBehaviour: 'catchup-with-indexer'. + +By default there is no limit and it will paginate through all of the rounds. +Sometimes this can result in an incredibly long catchup time that may break the service +due to execution and memory constraints, particularly for filters that result in a large number of transactions. + +Instead, this allows indexer catchup to be split into multiple polls, each with a transactionally consistent +boundary based on the number of rounds specified here. + +#### Inherited from + +[CoreTransactionSubscriptionParams](types_subscription.CoreTransactionSubscriptionParams.md).[maxIndexerRoundsToSync](types_subscription.CoreTransactionSubscriptionParams.md#maxindexerroundstosync) + +#### Defined in + +[types/subscription.ts:89](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L89) + +___ + ### maxRoundsToSync • `Optional` **maxRoundsToSync**: `number` -The maximum number of rounds to sync for each subscription pull/poll. +The maximum number of rounds to sync from algod for each subscription pull/poll. Defaults to 500. 
@@ -124,7 +148,7 @@ past `watermark` then how should that be handled: #### Defined in -[types/subscription.ts:96](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L96) +[types/subscription.ts:107](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L107) ___ @@ -144,4 +168,4 @@ will be slow if `onMaxRounds` is `sync-oldest`. #### Defined in -[types/subscription.ts:158](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L158) +[types/subscription.ts:169](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L169) diff --git a/docs/code/modules/index.md b/docs/code/modules/index.md index 6b0090b..2622630 100644 --- a/docs/code/modules/index.md +++ b/docs/code/modules/index.md @@ -38,7 +38,7 @@ The blocks #### Defined in -[subscriptions.ts:531](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriptions.ts#L531) +[subscriptions.ts:546](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriptions.ts#L546) ___ diff --git a/docs/code/modules/types_subscription.md b/docs/code/modules/types_subscription.md index 59a1ca1..56e40bc 100644 --- a/docs/code/modules/types_subscription.md +++ b/docs/code/modules/types_subscription.md @@ -65,4 +65,4 @@ ___ #### Defined in -[types/subscription.ts:190](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L190) +[types/subscription.ts:201](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L201) diff --git a/docs/subscriber.md b/docs/subscriber.md index 1efec1a..27a2f85 100644 --- a/docs/subscriber.md +++ b/docs/subscriber.md @@ -27,6 +27,17 @@ export interface AlgorandSubscriberConfig { waitForBlockWhenAtTip?: boolean /** The maximum number of rounds to sync at a time; defaults to 500 */ maxRoundsToSync?: number + /** 
+ * The maximum number of rounds to sync from indexer when using `syncBehaviour: 'catchup-with-indexer'`. + * + * By default there is no limit and it will paginate through all of the rounds. + * Sometimes this can result in an incredibly long catchup time that may break the service + * due to execution and memory constraints, particularly for filters that result in a large number of transactions. + * + * Instead, this allows indexer catchup to be split into multiple polls, each with a transactionally consistent + * boundary based on the number of rounds specified here. + */ + maxIndexerRoundsToSync?: number /** The set of filters to subscribe to / emit events for, along with optional data mappers */ filters: SubscriberConfigFilter[] /** Any ARC-28 event definitions to process from app call logs */ diff --git a/docs/subscriptions.md b/docs/subscriptions.md index b555f11..c7e9fba 100644 --- a/docs/subscriptions.md +++ b/docs/subscriptions.md @@ -73,6 +73,17 @@ export interface TransactionSubscriptionParams { * your catchup speed when using `sync-oldest`. **/ maxRoundsToSync?: number + /** + * The maximum number of rounds to sync from indexer when using `syncBehaviour: 'catchup-with-indexer'`. + * + * By default there is no limit and it will paginate through all of the rounds. + * Sometimes this can result in an incredibly long catchup time that may break the service + * due to execution and memory constraints, particularly for filters that result in a large number of transactions. + * + * Instead, this allows indexer catchup to be split into multiple polls, each with a transactionally consistent + * boundary based on the number of rounds specified here. 
+ */ + maxIndexerRoundsToSync?: number /** If the current tip of the configured Algorand blockchain is more than `maxRoundsToSync` * past `watermark` then how should that be handled: * * `skip-sync-newest`: Discard old blocks/transactions and sync the newest; useful diff --git a/src/subscriber.ts b/src/subscriber.ts index dd0e8ac..9f4fb87 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -18,7 +18,7 @@ import Indexer = algosdk.Indexer export class AlgorandSubscriber { private algod: Algodv2 private indexer: Indexer | undefined - private subscription: AlgorandSubscriberConfig + private config: AlgorandSubscriberConfig private abortController: AbortController private eventEmitter: AsyncEventEmitter private started: boolean = false @@ -34,11 +34,11 @@ export class AlgorandSubscriber { constructor(config: AlgorandSubscriberConfig, algod: Algodv2, indexer?: Indexer) { this.algod = algod this.indexer = indexer - this.subscription = config + this.config = config this.abortController = new AbortController() this.eventEmitter = new AsyncEventEmitter() - this.filterNames = this.subscription.filters + this.filterNames = this.config.filters .map((f) => f.name) .filter((value, index, self) => { // Remove duplicates @@ -58,15 +58,12 @@ export class AlgorandSubscriber { * @returns The poll result */ async pollOnce(): Promise { - const watermark = await this.subscription.watermarkPersistence.get() + const watermark = await this.config.watermarkPersistence.get() const pollResult = await getSubscribedTransactions( { - filters: this.subscription.filters, - arc28Events: this.subscription.arc28Events, watermark, - maxRoundsToSync: this.subscription.maxRoundsToSync ?? 
500, - syncBehaviour: this.subscription.syncBehaviour, + ...this.config, }, this.algod, this.indexer, @@ -74,7 +71,7 @@ export class AlgorandSubscriber { try { for (const filterName of this.filterNames) { - const mapper = this.subscription.filters.find((f) => f.name === filterName)?.mapper + const mapper = this.config.filters.find((f) => f.name === filterName)?.mapper const matchedTransactions = pollResult.subscribedTransactions.filter((t) => t.filtersMatched?.includes(filterName)) const mappedTransactions = mapper ? await mapper(matchedTransactions) : matchedTransactions @@ -87,7 +84,7 @@ export class AlgorandSubscriber { algokit.Config.logger.error(`Error processing event emittance`, e) throw e } - await this.subscription.watermarkPersistence.set(pollResult.newWatermark) + await this.config.watermarkPersistence.set(pollResult.newWatermark) return pollResult } @@ -115,11 +112,11 @@ export class AlgorandSubscriber { subscribedTransactionsLength: result.subscribedTransactions.length, }) // eslint-disable-next-line no-console - if (result.currentRound > result.newWatermark || !this.subscription.waitForBlockWhenAtTip) { + if (result.currentRound > result.newWatermark || !this.config.waitForBlockWhenAtTip) { algokit.Config.getLogger(suppressLog).info( - `Subscription poll completed in ${durationInSeconds}s; sleeping for ${this.subscription.frequencyInSeconds ?? 1}s`, + `Subscription poll completed in ${durationInSeconds}s; sleeping for ${this.config.frequencyInSeconds ?? 1}s`, ) - await sleep((this.subscription.frequencyInSeconds ?? 1) * 1000, this.abortController.signal) + await sleep((this.config.frequencyInSeconds ?? 
1) * 1000, this.abortController.signal) } else { // Wait until the next block is published algokit.Config.getLogger(suppressLog).info( diff --git a/src/subscriptions.ts b/src/subscriptions.ts index d795272..00f2eb4 100644 --- a/src/subscriptions.ts +++ b/src/subscriptions.ts @@ -85,11 +85,13 @@ export async function getSubscribedTransactions( } } + let indexerSyncToRoundNumber = 0 let algodSyncFromRoundNumber = watermark + 1 let startRound = algodSyncFromRoundNumber let endRound = currentRound let catchupTransactions: SubscribedTransaction[] = [] let start = +new Date() + let skipAlgodSync = false // If we are less than `maxRoundstoSync` from the tip of the chain then we consult the `syncBehaviour` to determine what to do if (currentRound - watermark > maxRoundsToSync) { @@ -118,10 +120,18 @@ export async function getSubscribedTransactions( throw new Error("Can't catch up using indexer since it's not provided") } - algodSyncFromRoundNumber = currentRound - maxRoundsToSync + 1 + // If we have more than `maxIndexerRoundsToSync` rounds to sync from indexer then we skip algod sync and just sync that many rounds from indexer + indexerSyncToRoundNumber = currentRound - maxRoundsToSync + if (subscription.maxIndexerRoundsToSync && indexerSyncToRoundNumber - startRound + 1 > subscription.maxIndexerRoundsToSync) { + indexerSyncToRoundNumber = startRound + subscription.maxIndexerRoundsToSync - 1 + endRound = indexerSyncToRoundNumber + skipAlgodSync = true + } else { + algodSyncFromRoundNumber = indexerSyncToRoundNumber + 1 + } algokit.Config.logger.debug( - `Catching up from round ${startRound} to round ${algodSyncFromRoundNumber - 1} via indexer; this may take a few seconds`, + `Catching up from round ${startRound} to round ${indexerSyncToRoundNumber} via indexer; this may take a few seconds`, ) // Retrieve and process transactions from indexer in groups of 30 so we don't get rate limited @@ -132,9 +142,7 @@ export async function getSubscribedTransactions( // For each filter 
chunkedFilters.map(async (f) => // Retrieve all pre-filtered transactions from the indexer - ( - await algokit.searchTransactions(indexer, indexerPreFilter(f.filter, startRound, algodSyncFromRoundNumber - 1)) - ).transactions + (await algokit.searchTransactions(indexer, indexerPreFilter(f.filter, startRound, indexerSyncToRoundNumber))).transactions // Re-run the pre-filter in-memory so we properly extract inner transactions .flatMap((t) => getFilteredIndexerTransactions(t, f)) // Run the post-filter so we get the final list of matching transactions @@ -166,22 +174,29 @@ export async function getSubscribedTransactions( } // Retrieve and process blocks from algod - start = +new Date() - const blocks = await getBlocksBulk({ startRound: algodSyncFromRoundNumber, maxRound: endRound }, algod) - const blockTransactions = blocks.flatMap((b) => getBlockTransactions(b.block)) - const algodTransactions = filters - .flatMap((f) => - blockTransactions - .filter((t) => transactionFilter(f.filter, arc28Events, subscription.arc28Events ?? [])(t!)) - .map((t) => getIndexerTransactionFromAlgodTransaction(t, f.name)), - ) - .reduce(deduplicateSubscribedTransactionsReducer, []) + let algodTransactions: SubscribedTransaction[] = [] + if (!skipAlgodSync) { + start = +new Date() + const blocks = await getBlocksBulk({ startRound: algodSyncFromRoundNumber, maxRound: endRound }, algod) + const blockTransactions = blocks.flatMap((b) => getBlockTransactions(b.block)) + algodTransactions = filters + .flatMap((f) => + blockTransactions + .filter((t) => transactionFilter(f.filter, arc28Events, subscription.arc28Events ?? 
[])(t!)) + .map((t) => getIndexerTransactionFromAlgodTransaction(t, f.name)), + ) + .reduce(deduplicateSubscribedTransactionsReducer, []) - algokit.Config.logger.debug( - `Retrieved ${blockTransactions.length} transactions from algod via round(s) ${algodSyncFromRoundNumber}-${endRound} in ${ - (+new Date() - start) / 1000 - }s`, - ) + algokit.Config.logger.debug( + `Retrieved ${blockTransactions.length} transactions from algod via round(s) ${algodSyncFromRoundNumber}-${endRound} in ${ + (+new Date() - start) / 1000 + }s`, + ) + } else { + algokit.Config.logger.debug( + `Skipping algod sync since we have more than ${subscription.maxIndexerRoundsToSync} rounds to sync from indexer.`, + ) + } return { syncedRoundRange: [startRound, endRound], diff --git a/src/types/subscription.ts b/src/types/subscription.ts index 6cb121e..55d8433 100644 --- a/src/types/subscription.ts +++ b/src/types/subscription.ts @@ -67,7 +67,7 @@ export interface CoreTransactionSubscriptionParams { filters: NamedTransactionFilter[] /** Any ARC-28 event definitions to process from app call logs */ arc28Events?: Arc28EventGroup[] - /** The maximum number of rounds to sync for each subscription pull/poll. + /** The maximum number of rounds to sync from algod for each subscription pull/poll. * * Defaults to 500. * @@ -76,6 +76,17 @@ export interface CoreTransactionSubscriptionParams { * your catchup speed when using `sync-oldest`. **/ maxRoundsToSync?: number + /** + * The maximum number of rounds to sync from indexer when using `syncBehaviour: 'catchup-with-indexer'`. + * + * By default there is no limit and it will paginate through all of the rounds. + * Sometimes this can result in an incredibly long catchup time that may break the service + * due to execution and memory constraints, particularly for filters that result in a large number of transactions.
+ * + * Instead, this allows indexer catchup to be split into multiple polls, each with a transactionally consistent + * boundary based on the number of rounds specified here. + */ + maxIndexerRoundsToSync?: number /** If the current tip of the configured Algorand blockchain is more than `maxRoundsToSync` * past `watermark` then how should that be handled: * * `skip-sync-newest`: Discard old blocks/transactions and sync the newest; useful diff --git a/tests/scenarios/catchup-with-indexer.spec.ts b/tests/scenarios/catchup-with-indexer.spec.ts index b38abda..30d0090 100644 --- a/tests/scenarios/catchup-with-indexer.spec.ts +++ b/tests/scenarios/catchup-with-indexer.spec.ts @@ -1,9 +1,14 @@ +import * as algokit from '@algorandfoundation/algokit-utils' import { algorandFixture } from '@algorandfoundation/algokit-utils/testing' import { beforeEach, describe, test } from '@jest/globals' import { GetSubscribedTransactionsFromSender, SendXTransactions } from '../transactions' describe('Subscribing using catchup-with-indexer', () => { - const localnet = algorandFixture() + const localnet = algorandFixture(undefined, { + algodConfig: algokit.getDefaultLocalNetConfig('algod'), + indexerConfig: algokit.getDefaultLocalNetConfig('indexer'), + kmdConfig: algokit.getDefaultLocalNetConfig('kmd'), + }) beforeEach(localnet.beforeEach, 10e6) afterEach(() => { @@ -31,6 +36,38 @@ describe('Subscribing using catchup-with-indexer', () => { expect(subscribed.subscribedTransactions[0].id).toBe(txns[0].transaction.txID()) }) + test('Limits the number of synced transactions to maxIndexerRoundsToSync', async () => { + const { algod, indexer, testAccount, generateAccount, waitForIndexerTransaction } = localnet.context + // Ensure that if we are at round 0 there is a different transaction that won't be synced + const randomAccount = await generateAccount({ initialFunds: (3).algos() }) + const { lastTxnRound: initialWatermark } = await SendXTransactions(1, randomAccount, algod) + const { txns } = 
await SendXTransactions(5, testAccount, algod) + const { lastTxnRound, txIds } = await SendXTransactions(1, randomAccount, algod) + await waitForIndexerTransaction(txIds[0]) + const expectedNewWatermark = Number(txns[2].confirmation!.confirmedRound!) - 1 + const indexerRoundsToSync = expectedNewWatermark - initialWatermark + + const subscribed = await GetSubscribedTransactionsFromSender( + { + roundsToSync: 1, + indexerRoundsToSync, + syncBehaviour: 'catchup-with-indexer', + watermark: initialWatermark, + currentRound: lastTxnRound, + }, + testAccount, + algod, + indexer, + ) + + expect(subscribed.currentRound).toBe(lastTxnRound) + expect(subscribed.newWatermark).toBe(expectedNewWatermark) + expect(subscribed.syncedRoundRange).toEqual([initialWatermark + 1, expectedNewWatermark]) + expect(subscribed.subscribedTransactions.length).toBe(2) + expect(subscribed.subscribedTransactions[0].id).toBe(txns[0].transaction.txID()) + expect(subscribed.subscribedTransactions[1].id).toBe(txns[1].transaction.txID()) + }) + // Same behaviour as sync-oldest test('Processes all transactions after watermark when starting from an earlier round with other transactions', async () => { const { algod, indexer, testAccount, waitForIndexerTransaction } = localnet.context diff --git a/tests/scenarios/subscriber.spec.ts b/tests/scenarios/subscriber.spec.ts index 2f52ebb..e30854d 100644 --- a/tests/scenarios/subscriber.spec.ts +++ b/tests/scenarios/subscriber.spec.ts @@ -11,11 +11,7 @@ import { waitFor } from '../wait' import { InMemoryWatermark } from '../watermarks' describe('AlgorandSubscriber', () => { - const localnet = algorandFixture(undefined, { - algodConfig: algokit.getDefaultLocalNetConfig('algod'), - indexerConfig: algokit.getDefaultLocalNetConfig('indexer'), - kmdConfig: algokit.getDefaultLocalNetConfig('kmd'), - }) + const localnet = algorandFixture() beforeEach(localnet.beforeEach, 10e6) afterEach(() => { diff --git a/tests/transactions.ts b/tests/transactions.ts index 
090a748..7879cf1 100644 --- a/tests/transactions.ts +++ b/tests/transactions.ts @@ -33,6 +33,7 @@ export const GetSubscribedTransactions = ( subscription: { syncBehaviour: TransactionSubscriptionParams['syncBehaviour'] roundsToSync: number + indexerRoundsToSync?: number watermark?: number currentRound?: number filters: TransactionFilter | NamedTransactionFilter[] @@ -41,7 +42,7 @@ export const GetSubscribedTransactions = ( algod: Algodv2, indexer?: Indexer, ) => { - const { roundsToSync, syncBehaviour, watermark, currentRound, filters, arc28Events } = subscription + const { roundsToSync, indexerRoundsToSync, syncBehaviour, watermark, currentRound, filters, arc28Events } = subscription if (currentRound !== undefined) { const existingStatus = algod.status @@ -62,6 +63,7 @@ export const GetSubscribedTransactions = ( { filters: Array.isArray(filters) ? filters : [{ name: 'default', filter: filters }], maxRoundsToSync: roundsToSync, + maxIndexerRoundsToSync: indexerRoundsToSync, syncBehaviour: syncBehaviour, watermark: watermark ?? 0, arc28Events, @@ -75,6 +77,7 @@ export const GetSubscribedTransactionsFromSender = ( subscription: { syncBehaviour: TransactionSubscriptionParams['syncBehaviour'] roundsToSync: number + indexerRoundsToSync?: number watermark?: number currentRound?: number },