From 7de08a324e795f712e63546300b0a6809e9bbf63 Mon Sep 17 00:00:00 2001 From: "Rob Moore (MakerX)" Date: Thu, 21 Mar 2024 22:34:53 +0800 Subject: [PATCH] feat: Added onBeforePoll and onPoll methods to `AlgorandSubscriber` --- docs/code/classes/index.AlgorandSubscriber.md | 131 +++++++++++++++--- ...s_subscription.AlgorandSubscriberConfig.md | 16 +-- .../types_subscription.BeforePollMetadata.md | 38 +++++ ...ption.CoreTransactionSubscriptionParams.md | 10 +- ...pes_subscription.NamedTransactionFilter.md | 4 +- ...pes_subscription.SubscriberConfigFilter.md | 6 +- .../types_subscription.TransactionFilter.md | 30 ++-- ...scription.TransactionSubscriptionParams.md | 12 +- docs/code/modules/types.md | 7 + docs/code/modules/types_subscription.md | 3 +- docs/subscriber.md | 67 ++++++++- src/subscriber.ts | 71 +++++++++- src/types/subscription.ts | 8 ++ tests/scenarios/subscriber.spec.ts | 86 +++++++++++- 14 files changed, 420 insertions(+), 69 deletions(-) create mode 100644 docs/code/interfaces/types_subscription.BeforePollMetadata.md diff --git a/docs/code/classes/index.AlgorandSubscriber.md b/docs/code/classes/index.AlgorandSubscriber.md index 68504a8..1bd78ab 100644 --- a/docs/code/classes/index.AlgorandSubscriber.md +++ b/docs/code/classes/index.AlgorandSubscriber.md @@ -27,6 +27,8 @@ Handles the logic for subscribing to the Algorand blockchain and emitting events - [on](index.AlgorandSubscriber.md#on) - [onBatch](index.AlgorandSubscriber.md#onbatch) +- [onBeforePoll](index.AlgorandSubscriber.md#onbeforepoll) +- [onPoll](index.AlgorandSubscriber.md#onpoll) - [pollOnce](index.AlgorandSubscriber.md#pollonce) - [start](index.AlgorandSubscriber.md#start) - [stop](index.AlgorandSubscriber.md#stop) @@ -53,7 +55,7 @@ Create a new `AlgorandSubscriber`. #### Defined in -[subscriber.ts:34](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L34) +[subscriber.ts:35](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L35) ## Properties @@ -63,7 +65,7 @@ Create a new `AlgorandSubscriber`. #### Defined in -[subscriber.ts:22](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L22) +[subscriber.ts:23](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L23) ___ @@ -73,7 +75,7 @@ ___ #### Defined in -[subscriber.ts:19](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L19) +[subscriber.ts:20](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L20) ___ @@ -83,7 +85,7 @@ ___ #### Defined in -[subscriber.ts:21](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L21) +[subscriber.ts:22](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L22) ___ @@ -93,7 +95,7 @@ ___ #### Defined in -[subscriber.ts:23](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L23) +[subscriber.ts:24](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L24) ___ @@ -103,7 +105,7 @@ ___ #### Defined in -[subscriber.ts:26](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L26) +[subscriber.ts:27](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L27) ___ @@ -113,7 +115,7 @@ ___ #### Defined in -[subscriber.ts:20](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L20) +[subscriber.ts:21](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L21) ___ @@ -123,7 +125,7 @@ ___ #### Defined in -[subscriber.ts:25](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L25) +[subscriber.ts:26](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L26) ___ @@ -133,7 +135,7 @@ ___ #### Defined in -[subscriber.ts:24](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L24) +[subscriber.ts:25](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L25) ## Methods @@ -162,11 +164,24 @@ The listener can be async and it will be awaited if so. [`AlgorandSubscriber`](index.AlgorandSubscriber.md) -The subscriber so `on`/`onBatch` calls can be chained +The subscriber so `on*` calls can be chained + +**`Example`** + +```typescript +subscriber.on('my-filter', async (transaction) => { console.log(transaction.id) }) +``` + +**`Example`** + +```typescript +new AlgorandSubscriber({filters: [{name: 'my-filter', filter: {...}, mapper: (t) => t.id}], ...}, algod) + .on('my-filter', async (transactionId) => { console.log(transactionId) }) +``` #### Defined in -[subscriber.ts:159](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L159) +[subscriber.ts:176](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L176) ___ @@ -199,11 +214,95 @@ The listener can be async and it will be awaited if so. [`AlgorandSubscriber`](index.AlgorandSubscriber.md) -The subscriber so `on`/`onBatch` calls can be chained +The subscriber so `on*` calls can be chained + +**`Example`** + +```typescript +subscriber.onBatch('my-filter', async (transactions) => { console.log(transactions.length) }) +``` + +**`Example`** + +```typescript +new AlgorandSubscriber({filters: [{name: 'my-filter', filter: {...}, mapper: (t) => t.id}], ...}, algod) + .onBatch('my-filter', async (transactionIds) => { console.log(transactionIds) }) +``` #### Defined in -[subscriber.ts:176](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L176) +[subscriber.ts:202](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L202) + +___ + +### onBeforePoll + +▸ **onBeforePoll**(`listener`): [`AlgorandSubscriber`](index.AlgorandSubscriber.md) + +Register an event handler to run before every subscription poll. + +This is useful when you want to do pre-poll logging or start a transaction etc. + +The listener can be async and it will be awaited if so. + +#### Parameters + +| Name | Type | Description | +| :------ | :------ | :------ | +| `listener` | [`TypedAsyncEventListener`](../modules/types_subscription.md#typedasynceventlistener)\<[`BeforePollMetadata`](../interfaces/types_subscription.BeforePollMetadata.md)\> | The listener function to invoke with the pre-poll metadata | + +#### Returns + +[`AlgorandSubscriber`](index.AlgorandSubscriber.md) + +The subscriber so `on*` calls can be chained + +**`Example`** + +```typescript +subscriber.onBeforePoll(async (metadata) => { console.log(metadata.watermark) }) +``` + +#### Defined in + +[subscriber.ts:220](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L220) + +___ + +### onPoll + +▸ **onPoll**(`listener`): [`AlgorandSubscriber`](index.AlgorandSubscriber.md) + +Register an event handler to run after every subscription poll. + +This is useful when you want to process all subscribed transactions +in a transactionally consistent manner rather than piecemeal for each +filter, or to have a hook that occurs at the end of each poll to commit +transactions etc. + +The listener can be async and it will be awaited if so. + +#### Parameters + +| Name | Type | Description | +| :------ | :------ | :------ | +| `listener` | [`TypedAsyncEventListener`](../modules/types_subscription.md#typedasynceventlistener)\<[`TransactionSubscriptionResult`](../interfaces/types_subscription.TransactionSubscriptionResult.md)\> | The listener function to invoke with the poll result | + +#### Returns + +[`AlgorandSubscriber`](index.AlgorandSubscriber.md) + +The subscriber so `on*` calls can be chained + +**`Example`** + +```typescript +subscriber.onPoll(async (pollResult) => { console.log(pollResult.subscribedTransactions.length, pollResult.syncedRoundRange) }) +``` + +#### Defined in + +[subscriber.ts:241](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L241) ___ @@ -224,7 +323,7 @@ The poll result #### Defined in -[subscriber.ts:60](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L60) +[subscriber.ts:61](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L61) ___ @@ -251,7 +350,7 @@ An object that contains a promise you can wait for after calling stop #### Defined in -[subscriber.ts:98](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L98) +[subscriber.ts:105](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L105) ___ @@ -275,4 +374,4 @@ A promise that can be awaited to ensure the subscriber has finished stopping #### Defined in -[subscriber.ts:141](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L141) +[subscriber.ts:149](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/subscriber.ts#L149) diff --git a/docs/code/interfaces/types_subscription.AlgorandSubscriberConfig.md b/docs/code/interfaces/types_subscription.AlgorandSubscriberConfig.md index e75d414..5062827 100644 --- a/docs/code/interfaces/types_subscription.AlgorandSubscriberConfig.md +++ b/docs/code/interfaces/types_subscription.AlgorandSubscriberConfig.md @@ -39,7 +39,7 @@ Any ARC-28 event definitions to process from app call logs #### Defined in -[types/subscription.ts:69](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L69) +[types/subscription.ts:77](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L77) ___ @@ -55,7 +55,7 @@ The set of filters to subscribe to / emit events for, along with optional data m #### Defined in -[types/subscription.ts:175](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L175) +[types/subscription.ts:183](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L183) ___ @@ -67,7 +67,7 @@ The frequency to poll for new blocks in seconds; defaults to 1s #### Defined in -[types/subscription.ts:177](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L177) +[types/subscription.ts:185](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L185) ___ @@ -90,7 +90,7 @@ boundary based on the number of rounds specified here. #### Defined in -[types/subscription.ts:89](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L89) +[types/subscription.ts:97](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L97) ___ @@ -112,7 +112,7 @@ your catchup speed when using `sync-oldest`. #### Defined in -[types/subscription.ts:78](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L78) +[types/subscription.ts:86](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L86) ___ @@ -143,7 +143,7 @@ past `watermark` then how should that be handled: #### Defined in -[types/subscription.ts:107](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L107) +[types/subscription.ts:115](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L115) ___ @@ -155,7 +155,7 @@ Whether to wait via algod `/status/wait-for-block-after` endpoint when at the ti #### Defined in -[types/subscription.ts:179](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L179) +[types/subscription.ts:187](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L187) ___ @@ -175,4 +175,4 @@ its position in the chain #### Defined in -[types/subscription.ts:182](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L182) +[types/subscription.ts:190](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L190) diff --git a/docs/code/interfaces/types_subscription.BeforePollMetadata.md b/docs/code/interfaces/types_subscription.BeforePollMetadata.md new file mode 100644 index 0000000..0b2a13a --- /dev/null +++ b/docs/code/interfaces/types_subscription.BeforePollMetadata.md @@ -0,0 +1,38 @@ +[@algorandfoundation/algokit-subscriber](../README.md) / [types/subscription](../modules/types_subscription.md) / BeforePollMetadata + +# Interface: BeforePollMetadata + +[types/subscription](../modules/types_subscription.md).BeforePollMetadata + +Metadata about an impending subscription poll. + +## Table of contents + +### Properties + +- [currentRound](types_subscription.BeforePollMetadata.md#currentround) +- [watermark](types_subscription.BeforePollMetadata.md#watermark) + +## Properties + +### currentRound + +• **currentRound**: `number` + +The current round of algod + +#### Defined in + +[types/subscription.ts:49](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L49) + +___ + +### watermark + +• **watermark**: `number` + +The current watermark of the subscriber + +#### Defined in + +[types/subscription.ts:47](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L47) diff --git a/docs/code/interfaces/types_subscription.CoreTransactionSubscriptionParams.md b/docs/code/interfaces/types_subscription.CoreTransactionSubscriptionParams.md index 6a9a46a..917983b 100644 --- a/docs/code/interfaces/types_subscription.CoreTransactionSubscriptionParams.md +++ b/docs/code/interfaces/types_subscription.CoreTransactionSubscriptionParams.md @@ -34,7 +34,7 @@ Any ARC-28 event definitions to process from app call logs #### Defined in -[types/subscription.ts:69](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L69) +[types/subscription.ts:77](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L77) ___ @@ -65,7 +65,7 @@ A list of filters with corresponding names. #### Defined in -[types/subscription.ts:67](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L67) +[types/subscription.ts:75](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L75) ___ @@ -84,7 +84,7 @@ boundary based on the number of rounds specified here. #### Defined in -[types/subscription.ts:89](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L89) +[types/subscription.ts:97](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L97) ___ @@ -102,7 +102,7 @@ your catchup speed when using `sync-oldest`. #### Defined in -[types/subscription.ts:78](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L78) +[types/subscription.ts:86](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L86) ___ @@ -129,4 +129,4 @@ past `watermark` then how should that be handled: #### Defined in -[types/subscription.ts:107](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L107) +[types/subscription.ts:115](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L115) diff --git a/docs/code/interfaces/types_subscription.NamedTransactionFilter.md b/docs/code/interfaces/types_subscription.NamedTransactionFilter.md index 07ab5de..0c4d362 100644 --- a/docs/code/interfaces/types_subscription.NamedTransactionFilter.md +++ b/docs/code/interfaces/types_subscription.NamedTransactionFilter.md @@ -29,7 +29,7 @@ The filter itself. #### Defined in -[types/subscription.ts:115](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L115) +[types/subscription.ts:123](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L123) ___ @@ -41,4 +41,4 @@ The name to give the filter. #### Defined in -[types/subscription.ts:113](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L113) +[types/subscription.ts:121](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L121) diff --git a/docs/code/interfaces/types_subscription.SubscriberConfigFilter.md b/docs/code/interfaces/types_subscription.SubscriberConfigFilter.md index b28c2d6..28efda8 100644 --- a/docs/code/interfaces/types_subscription.SubscriberConfigFilter.md +++ b/docs/code/interfaces/types_subscription.SubscriberConfigFilter.md @@ -40,7 +40,7 @@ The filter itself. #### Defined in -[types/subscription.ts:115](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L115) +[types/subscription.ts:123](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L123) ___ @@ -70,7 +70,7 @@ Note: if you provide multiple filters with the same name then only the mapper of #### Defined in -[types/subscription.ts:198](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L198) +[types/subscription.ts:206](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L206) ___ @@ -86,4 +86,4 @@ The name to give the filter. #### Defined in -[types/subscription.ts:113](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L113) +[types/subscription.ts:121](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L121) diff --git a/docs/code/interfaces/types_subscription.TransactionFilter.md b/docs/code/interfaces/types_subscription.TransactionFilter.md index 8234794..4ed7fff 100644 --- a/docs/code/interfaces/types_subscription.TransactionFilter.md +++ b/docs/code/interfaces/types_subscription.TransactionFilter.md @@ -50,7 +50,7 @@ Filter to app transactions that meet the given app arguments predicate. #### Defined in -[types/subscription.ts:150](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L150) +[types/subscription.ts:158](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L158) ___ @@ -62,7 +62,7 @@ Filter to transactions that are creating an app. #### Defined in -[types/subscription.ts:131](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L131) +[types/subscription.ts:139](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L139) ___ @@ -74,7 +74,7 @@ Filter to transactions against the app with the given ID. #### Defined in -[types/subscription.ts:129](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L129) +[types/subscription.ts:137](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L137) ___ @@ -86,7 +86,7 @@ Filter to transactions that have given on complete(s). #### Defined in -[types/subscription.ts:133](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L133) +[types/subscription.ts:141](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L141) ___ @@ -99,7 +99,7 @@ Note: the definitions for these events must be passed in to the subscription con #### Defined in -[types/subscription.ts:154](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L154) +[types/subscription.ts:162](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L162) ___ @@ -111,7 +111,7 @@ Filter to transactions that are creating an asset. #### Defined in -[types/subscription.ts:137](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L137) +[types/subscription.ts:145](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L145) ___ @@ -123,7 +123,7 @@ Filter to transactions against the asset with the given ID. #### Defined in -[types/subscription.ts:135](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L135) +[types/subscription.ts:143](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L143) ___ @@ -136,7 +136,7 @@ or equal to the given maximum (microAlgos or decimal units of an ASA if type: ax #### Defined in -[types/subscription.ts:143](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L143) +[types/subscription.ts:151](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L151) ___ @@ -149,7 +149,7 @@ the given method signature as the first app argument. #### Defined in -[types/subscription.ts:146](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L146) +[types/subscription.ts:154](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L154) ___ @@ -161,7 +161,7 @@ Filter to app transactions that match one of the given ARC-0004 method selectors #### Defined in -[types/subscription.ts:148](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L148) +[types/subscription.ts:156](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L156) ___ @@ -174,7 +174,7 @@ than or equal to the given minimum (microAlgos or decimal units of an ASA if typ #### Defined in -[types/subscription.ts:140](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L140) +[types/subscription.ts:148](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L148) ___ @@ -186,7 +186,7 @@ Filter to transactions with a note having the given prefix. #### Defined in -[types/subscription.ts:127](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L127) +[types/subscription.ts:135](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L135) ___ @@ -198,7 +198,7 @@ Filter to transactions being received by the specified address. #### Defined in -[types/subscription.ts:125](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L125) +[types/subscription.ts:133](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L133) ___ @@ -210,7 +210,7 @@ Filter to transactions sent from the specified address. #### Defined in -[types/subscription.ts:123](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L123) +[types/subscription.ts:131](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L131) ___ @@ -222,4 +222,4 @@ Filter based on the given transaction type. #### Defined in -[types/subscription.ts:121](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L121) +[types/subscription.ts:129](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L129) diff --git a/docs/code/interfaces/types_subscription.TransactionSubscriptionParams.md b/docs/code/interfaces/types_subscription.TransactionSubscriptionParams.md index c932c84..aa8722b 100644 --- a/docs/code/interfaces/types_subscription.TransactionSubscriptionParams.md +++ b/docs/code/interfaces/types_subscription.TransactionSubscriptionParams.md @@ -37,7 +37,7 @@ Any ARC-28 event definitions to process from app call logs #### Defined in -[types/subscription.ts:69](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L69) +[types/subscription.ts:77](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L77) ___ @@ -72,7 +72,7 @@ A list of filters with corresponding names. #### Defined in -[types/subscription.ts:67](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L67) +[types/subscription.ts:75](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L75) ___ @@ -95,7 +95,7 @@ boundary based on the number of rounds specified here. #### Defined in -[types/subscription.ts:89](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L89) +[types/subscription.ts:97](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L97) ___ @@ -117,7 +117,7 @@ your catchup speed when using `sync-oldest`. #### Defined in -[types/subscription.ts:78](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L78) +[types/subscription.ts:86](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L86) ___ @@ -148,7 +148,7 @@ past `watermark` then how should that be handled: #### Defined in -[types/subscription.ts:107](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L107) +[types/subscription.ts:115](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L115) ___ @@ -168,4 +168,4 @@ will be slow if `onMaxRounds` is `sync-oldest`. #### Defined in -[types/subscription.ts:169](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L169) +[types/subscription.ts:177](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L177) diff --git a/docs/code/modules/types.md b/docs/code/modules/types.md index a5f6e1b..8cf2ada 100644 --- a/docs/code/modules/types.md +++ b/docs/code/modules/types.md @@ -12,6 +12,7 @@ - [Arc28EventToProcess](types.md#arc28eventtoprocess) - [AsyncEventEmitter](types.md#asynceventemitter) - [AsyncEventListener](types.md#asynceventlistener) +- [BeforePollMetadata](types.md#beforepollmetadata) - [Block](types.md#block) - [BlockInnerTransaction](types.md#blockinnertransaction) - [BlockTransaction](types.md#blocktransaction) @@ -69,6 +70,12 @@ Re-exports [AsyncEventListener](types_async_event_emitter.md#asynceventlistener) ___ +### BeforePollMetadata + +Re-exports [BeforePollMetadata](../interfaces/types_subscription.BeforePollMetadata.md) + +___ + ### Block Re-exports [Block](../interfaces/types_block.Block.md) diff --git a/docs/code/modules/types_subscription.md b/docs/code/modules/types_subscription.md index 56e40bc..85f8c31 100644 --- a/docs/code/modules/types_subscription.md +++ b/docs/code/modules/types_subscription.md @@ -7,6 +7,7 @@ ### Interfaces - [AlgorandSubscriberConfig](../interfaces/types_subscription.AlgorandSubscriberConfig.md) +- [BeforePollMetadata](../interfaces/types_subscription.BeforePollMetadata.md) - [CoreTransactionSubscriptionParams](../interfaces/types_subscription.CoreTransactionSubscriptionParams.md) - [NamedTransactionFilter](../interfaces/types_subscription.NamedTransactionFilter.md) - [SubscriberConfigFilter](../interfaces/types_subscription.SubscriberConfigFilter.md) @@ -65,4 +66,4 @@ ___ #### Defined in -[types/subscription.ts:201](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L201) +[types/subscription.ts:209](https://github.com/algorandfoundation/algokit-subscriber-ts/blob/main/src/types/subscription.ts#L209) diff --git a/docs/subscriber.md b/docs/subscriber.md index 27a2f85..8caaa9b 100644 --- a/docs/subscriber.md +++ b/docs/subscriber.md @@ -100,18 +100,27 @@ The event name is a unique name that describes the event you are subscribing to. ## Subscribing to events -Once you have created the `AlgorandSubscriber`, you can register handlers/listeners for the filters you have defined. +Once you have created the `AlgorandSubscriber`, you can register handlers/listeners for the filters you have defined, or each poll as a whole batch. -You can do this via the `on` and `onBatch` methods: +You can do this via the `on`, `onBatch` and `onPoll` methods: -```typescript +````typescript /** * Register an event handler to run on every subscribed transaction matching the given filter name. * * The listener can be async and it will be awaited if so. + * @example **Non-mapped** + * ```typescript + * subscriber.on('my-filter', async (transaction) => { console.log(transaction.id) }) + * ``` + * @example **Mapped** + * ```typescript + * new AlgorandSubscriber({filters: [{name: 'my-filter', filter: {...}, mapper: (t) => t.id}], ...}, algod) + * .on('my-filter', async (transactionId) => { console.log(transactionId) }) + * ``` * @param filterName The name of the filter to subscribe to * @param listener The listener function to invoke with the subscribed event - * @returns The subscriber so `on`/`onBatch` calls can be chained + * @returns The subscriber so `on*` calls can be chained */ on(filterName: string, listener: TypedAsyncEventListener) {} @@ -123,12 +132,54 @@ You can do this via the `on` and `onBatch` methods: * in bulk rather than one-by-one. * * The listener can be async and it will be awaited if so. + * @example **Non-mapped** + * ```typescript + * subscriber.onBatch('my-filter', async (transactions) => { console.log(transactions.length) }) + * ``` + * @example **Mapped** + * ```typescript + * new AlgorandSubscriber({filters: [{name: 'my-filter', filter: {...}, mapper: (t) => t.id}], ...}, algod) + * .onBatch('my-filter', async (transactionIds) => { console.log(transactionIds) }) + * ``` * @param filterName The name of the filter to subscribe to * @param listener The listener function to invoke with the subscribed events - * @returns The subscriber so `on`/`onBatch` calls can be chained + * @returns The subscriber so `on*` calls can be chained */ onBatch(filterName: string, listener: TypedAsyncEventListener) {} -``` + + /** + * Register an event handler to run before every subscription poll. + * + * This is useful when you want to do pre-poll logging or start a transaction etc. + * + * The listener can be async and it will be awaited if so. + * @example + * ```typescript + * subscriber.onBeforePoll(async (metadata) => { console.log(metadata.watermark) }) + * ``` + * @param listener The listener function to invoke with the pre-poll metadata + * @returns The subscriber so `on*` calls can be chained + */ + onBeforePoll(listener: TypedAsyncEventListener) {} + + /** + * Register an event handler to run after every subscription poll. + * + * This is useful when you want to process all subscribed transactions + * in a transactionally consistent manner rather than piecemeal for each + * filter, or to have a hook that occurs at the end of each poll to commit + * transactions etc. + * + * The listener can be async and it will be awaited if so. + * @example + * ```typescript + * subscriber.onPoll(async (pollResult) => { console.log(pollResult.subscribedTransactions.length, pollResult.syncedRoundRange) }) + * ``` + * @param listener The listener function to invoke with the poll result + * @returns The subscriber so `on*` calls can be chained + */ + onPoll(listener: TypedAsyncEventListener) {} +```` The `TypedAsyncEventListener` type is defined as: @@ -152,6 +203,10 @@ See the [detail about this type](subscriptions.md#subscribedtransaction). Alternatively, if you defined a mapper against the filter then it will be applied before passing the objects through. +If you call `onPoll` it will be called last (after all `on` and `onBatch` listeners) for each poll, with the full set of transactions for that poll and [metadata about the poll result](./subscriptions.md#transactionsubscriptionresult). This allows you to process the entire poll batch in one transaction or have a hook to call after processing individual listeners (e.g. to commit a transaction). + +If you want to run code before a poll starts (e.g. to log or start a transaction) you can do so with `onBeforePoll`. + ## Poll the chain There are two methods to poll the chain for events: `pollOnce` and `start`: diff --git a/src/subscriber.ts b/src/subscriber.ts index 9f4fb87..07e5800 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -4,6 +4,7 @@ import { getSubscribedTransactions } from './subscriptions' import { AsyncEventEmitter, AsyncEventListener } from './types/async-event-emitter' import type { AlgorandSubscriberConfig, + BeforePollMetadata, SubscribedTransaction, TransactionSubscriptionResult, TypedAsyncEventListener, @@ -60,6 +61,12 @@ export class AlgorandSubscriber { async pollOnce(): Promise { const watermark = await this.config.watermarkPersistence.get() + const currentRound = (await this.algod.status().do())['last-round'] as number + await this.eventEmitter.emitAsync('before:poll', { + watermark, + currentRound, + } satisfies BeforePollMetadata) + const pollResult = await getSubscribedTransactions( { watermark, @@ -103,7 +110,6 @@ export class AlgorandSubscriber { // eslint-disable-next-line no-console const start = +new Date() const result = await this.pollOnce() - inspect?.(result) const durationInSeconds = (+new Date() - start) / 1000 algokit.Config.getLogger(suppressLog).debug('Subscription poll', { currentRound: result.currentRound, @@ -111,6 +117,8 @@ export class AlgorandSubscriber { syncedRoundRange: result.syncedRoundRange, subscribedTransactionsLength: result.subscribedTransactions.length, }) + inspect?.(result) + await this.eventEmitter.emitAsync('poll', result) // eslint-disable-next-line no-console if (result.currentRound > result.newWatermark || !this.config.waitForBlockWhenAtTip) { algokit.Config.getLogger(suppressLog).info( @@ -152,9 +160,18 @@ export class AlgorandSubscriber { * Register an event handler to run on every subscribed transaction matching the given filter name. * * The listener can be async and it will be awaited if so. + * @example **Non-mapped** + * ```typescript + * subscriber.on('my-filter', async (transaction) => { console.log(transaction.id) }) + * ``` + * @example **Mapped** + * ```typescript + * new AlgorandSubscriber({filters: [{name: 'my-filter', filter: {...}, mapper: (t) => t.id}], ...}, algod) + * .on('my-filter', async (transactionId) => { console.log(transactionId) }) + * ``` * @param filterName The name of the filter to subscribe to * @param listener The listener function to invoke with the subscribed event - * @returns The subscriber so `on`/`onBatch` calls can be chained + * @returns The subscriber so `on*` calls can be chained */ on(filterName: string, listener: TypedAsyncEventListener) { this.eventEmitter.on(filterName, listener as AsyncEventListener) @@ -169,12 +186,60 @@ export class AlgorandSubscriber { * in bulk rather than one-by-one. * * The listener can be async and it will be awaited if so. + * @example **Non-mapped** + * ```typescript + * subscriber.onBatch('my-filter', async (transactions) => { console.log(transactions.length) }) + * ``` + * @example **Mapped** + * ```typescript + * new AlgorandSubscriber({filters: [{name: 'my-filter', filter: {...}, mapper: (t) => t.id}], ...}, algod) + * .onBatch('my-filter', async (transactionIds) => { console.log(transactionIds) }) + * ``` * @param filterName The name of the filter to subscribe to * @param listener The listener function to invoke with the subscribed events - * @returns The subscriber so `on`/`onBatch` calls can be chained + * @returns The subscriber so `on*` calls can be chained */ onBatch(filterName: string, listener: TypedAsyncEventListener) { this.eventEmitter.on(`batch:${filterName}`, listener as AsyncEventListener) return this } + + /** + * Register an event handler to run before every subscription poll. + * + * This is useful when you want to do pre-poll logging or start a transaction etc. + * + * The listener can be async and it will be awaited if so. + * @example + * ```typescript + * subscriber.onBeforePoll(async (metadata) => { console.log(metadata.watermark) }) + * ``` + * @param listener The listener function to invoke with the pre-poll metadata + * @returns The subscriber so `on*` calls can be chained + */ + onBeforePoll(listener: TypedAsyncEventListener) { + this.eventEmitter.on('before:poll', listener as AsyncEventListener) + return this + } + + /** + * Register an event handler to run after every subscription poll. + * + * This is useful when you want to process all subscribed transactions + * in a transactionally consistent manner rather than piecemeal for each + * filter, or to have a hook that occurs at the end of each poll to commit + * transactions etc. + * + * The listener can be async and it will be awaited if so. + * @example + * ```typescript + * subscriber.onPoll(async (pollResult) => { console.log(pollResult.subscribedTransactions.length, pollResult.syncedRoundRange) }) + * ``` + * @param listener The listener function to invoke with the poll result + * @returns The subscriber so `on*` calls can be chained + */ + onPoll(listener: TypedAsyncEventListener) { + this.eventEmitter.on('poll', listener as AsyncEventListener) + return this + } } diff --git a/src/types/subscription.ts b/src/types/subscription.ts index 55d8433..6a3bdf0 100644 --- a/src/types/subscription.ts +++ b/src/types/subscription.ts @@ -41,6 +41,14 @@ export type SubscribedTransaction = TransactionResult & { filtersMatched?: string[] } +/** Metadata about an impending subscription poll. */ +export interface BeforePollMetadata { + /** The current watermark of the subscriber */ + watermark: number + /** The current round of algod */ + currentRound: number +} + /** Common parameters to control a single subscription pull/poll for both `AlgorandSubscriber` and `getSubscribedTransactions`. */ export interface CoreTransactionSubscriptionParams { /** The filter(s) to apply to find transactions of interest. diff --git a/tests/scenarios/subscriber.spec.ts b/tests/scenarios/subscriber.spec.ts index e30854d..fffd322 100644 --- a/tests/scenarios/subscriber.spec.ts +++ b/tests/scenarios/subscriber.spec.ts @@ -96,7 +96,7 @@ describe('AlgorandSubscriber', () => { expect(subscribedTxns.length).toBe(2) expect(subscribedTxns[1]).toBe(txIds3[0]) expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound3) - }) + }, 30_000) test('Subscribes to transactions with multiple filters correctly', async () => { const { algod, testAccount, generateAccount, waitForIndexerTransaction, indexer } = localnet.context @@ -179,7 +179,7 @@ describe('AlgorandSubscriber', () => { expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound2) // More subscribed transactions - const { lastTxnRound: firstTxnRound3, txIds: txIds3 } = await SendXTransactions(1, testAccount, algod) + const { txIds: txIds3 } = await SendXTransactions(1, testAccount, algod) const { txIds: txIds13 } = await SendXTransactions(2, senders[0], algod) const { lastTxnRound: lastSubscribedRound3, txIds: txIds23, txns: txns23 } = await SendXTransactions(2, senders[1], algod) @@ -195,7 +195,7 @@ describe('AlgorandSubscriber', () => { expect(getWatermark()).toBeGreaterThanOrEqual(lastSubscribedRound3) expect(result3.currentRound).toBeGreaterThanOrEqual(lastSubscribedRound3) expect(result3.newWatermark).toBe(result3.currentRound) - expect(result3.syncedRoundRange).toEqual([firstTxnRound3, result3.currentRound]) + expect(result3.syncedRoundRange).toEqual([result2.newWatermark + 1, result3.currentRound]) expect(result3.subscribedTransactions.length).toBe(5) expect(result3.subscribedTransactions.map((t) => t.id)).toEqual(txIds3.concat(txIds13, txIds23)) expect(sender1TxnIds).toEqual(txIds1.concat(txIds13)) @@ -204,7 +204,7 @@ describe('AlgorandSubscriber', () => { txns2.map((t) => Number(t.confirmation!.confirmedRound!)).concat(txns23.map((t) => Number(t.confirmation!.confirmedRound!))), ) expect(sender2RoundsfromBatch).toEqual(txns23.map((t) => Number(t.confirmation!.confirmedRound!))) - }) + }, 30_000) test('Subscribes to transactions at regular intervals when started and can be stopped', async () => { const { algod, testAccount, waitForIndexerTransaction, indexer } = localnet.context @@ -281,4 +281,82 @@ describe('AlgorandSubscriber', () => { // Expect at least 1 poll to have occurred expect(pollCountAfterIssuing - pollCountBeforeIssuing).toBeGreaterThanOrEqual(1) }) + + test('Correctly fires various on* methods', async () => { + const { algod, testAccount, generateAccount, indexer } = localnet.context + const randomAccount = await generateAccount({ initialFunds: (3).algos() }) + const { txns, txIds } = await SendXTransactions(2, testAccount, algod) + const { txIds: txIds2 } = await SendXTransactions(2, randomAccount, algod) + const initialWatermark = Number(txns[0].confirmation!.confirmedRound!) - 1 + const eventsEmitted: string[] = [] + let pollComplete = false + const { subscriber } = getSubscriber( + { + testAccount: algokit.randomAccount(), + initialWatermark, + configOverrides: { + maxRoundsToSync: 100, + syncBehaviour: 'sync-oldest', + frequencyInSeconds: 1000, + filters: [ + { + name: 'account1', + filter: { + sender: algokit.getSenderAddress(testAccount), + }, + }, + { + name: 'account2', + filter: { + sender: algokit.getSenderAddress(randomAccount), + }, + }, + ], + }, + }, + algod, + indexer, + ) + subscriber + .onBatch('account1', (b) => { + eventsEmitted.push(`batch:account1:${b.map((b) => b.id).join(':')}`) + }) + .on('account1', (t) => { + eventsEmitted.push(`account1:${t.id}`) + }) + .onBatch('account2', (b) => { + eventsEmitted.push(`batch:account2:${b.map((b) => b.id).join(':')}`) + }) + .on('account2', (t) => { + eventsEmitted.push(`account2:${t.id}`) + }) + .onBeforePoll((metadata) => { + eventsEmitted.push(`before:poll:${metadata.watermark}`) + }) + .onPoll((result) => { + eventsEmitted.push(`poll:${result.subscribedTransactions.map((b) => b.id).join(':')}`) + }) + + subscriber.start((result) => { + eventsEmitted.push(`inspect:${result.subscribedTransactions.map((b) => b.id).join(':')}`) + pollComplete = true + }) + + console.log('Waiting for up to 2s until subscriber has polled') + await waitFor(() => pollComplete, 2000) + + const expectedBatchResult = `${txIds[0]}:${txIds[1]}:${txIds2[0]}:${txIds2[1]}` + expect(eventsEmitted).toEqual([ + `before:poll:${initialWatermark}`, + `batch:account1:${txIds[0]}:${txIds[1]}`, + `account1:${txIds[0]}`, + `account1:${txIds[1]}`, + `batch:account2:${txIds2[0]}:${txIds2[1]}`, + `account2:${txIds2[0]}`, + `account2:${txIds2[1]}`, + `inspect:${expectedBatchResult}`, + `poll:${expectedBatchResult}`, + ]) + await subscriber.stop('TEST') + }) })