Receipt listener event streams #515

Merged
23 commits merged into from
Jan 24, 2025

Contributor

@peterbroadhurst peterbroadhurst commented Jan 16, 2025

Adds receipt listeners for streaming completed transactions from a Paladin node.

Architecture

A sequence column is added to the transaction_receipts table, to provide a reference point for checkpoints in the receipt listeners to determine which receipts have/haven't been processed.

Note that this depends on the sequence being incremented in database transaction order, so that rows cannot appear "behind" a checkpoint. This requires a coarse-grained DB lock to be introduced on the receipt listeners table. The performance impact is considered very marginal, as the vast majority of receipts are written on a single thread (the blockchain indexing thread). The only threads affected by the new locking are those writing failure receipts.

Then three new tables are introduced to manage the listeners:

  • receipt_listeners - the definitions of the listeners, each with a unique name
  • receipt_listener_gap - a blocking record when an individual smart contract is missing states
  • receipt_listener_checkpoints - the recovery point processing should restart from for a listener, to detect new receipts

Each started listener has a goroutine that queries the database to detect new receipts to deliver.
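As a rough illustration of the checkpoint idea, the sketch below polls an in-memory slice that stands in for the transaction_receipts table. The `receipt` type and the `pollAfter` and `nextCheckpoint` helpers are hypothetical names invented for this example and are not taken from the PR; the real listener queries the database.

```go
package main

import (
	"fmt"
	"sort"
)

// receipt is a hypothetical, simplified stand-in for a row in transaction_receipts.
type receipt struct {
	Sequence uint64
	ID       string
}

// pollAfter returns up to limit receipts whose sequence is strictly above the
// checkpoint, in ascending sequence order. It mimics a query of the form
// "WHERE sequence > ? ORDER BY sequence LIMIT ?" (an assumed query shape).
func pollAfter(table []receipt, checkpoint uint64, limit int) []receipt {
	sorted := append([]receipt(nil), table...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Sequence < sorted[j].Sequence })
	var page []receipt
	for _, r := range sorted {
		if r.Sequence > checkpoint && len(page) < limit {
			page = append(page, r)
		}
	}
	return page
}

// nextCheckpoint returns the checkpoint to persist once a page has been processed.
// An empty page leaves the checkpoint unchanged.
func nextCheckpoint(current uint64, page []receipt) uint64 {
	if len(page) == 0 {
		return current
	}
	return page[len(page)-1].Sequence
}

func main() {
	table := []receipt{{1, "a"}, {2, "b"}, {3, "c"}, {4, "d"}}
	checkpoint := uint64(1)
	page := pollAfter(table, checkpoint, 2)
	checkpoint = nextCheckpoint(checkpoint, page)
	fmt.Println(len(page), checkpoint)
}
```

The sketch only works if no row can later be committed with a sequence at or below the stored checkpoint, which is the ordering guarantee the DB lock is there to provide.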

Where possible, DB filters are used to pre-filter receipts that do not match the filters of the listener. As filters become more complex, some might only be filterable through post-filtering. That is already supported in the code (used in practice today to avoid waking listeners with a shoulder-tap to poll when new receipts that do not match are written to the DB).

For each receipt the outcome might be:

  • Discard the receipt because it does not match the filters of that listener
  • Add the receipt to a batch for delivery to the next available subscriber
  • Create a gap in the DB that must be resolved before receipts can be delivered, for this particular private smart contract
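The three outcomes can be sketched as a small classification function. This is a simplified model, not the code in this PR: the `listener` and `receipt` types, the `StatesAvailable` flag and the in-memory `gaps` map are all hypothetical, and the sketch assumes that later receipts for a contract with an open gap are also withheld so that ordering is preserved.

```go
package main

import "fmt"

type outcome int

const (
	outcomeDiscard outcome = iota // receipt does not match the listener filters
	outcomeBatch                  // receipt is added to the next delivery batch
	outcomeGap                    // receipt is withheld behind a gap for its contract
)

// receipt is a hypothetical, simplified receipt record.
type receipt struct {
	Domain          string
	ContractAddress string
	StatesAvailable bool // assumption: whether all private states have arrived
}

// listener is a hypothetical in-memory model of one receipt listener.
type listener struct {
	Domain        string          // empty matches any domain
	BlockContract bool            // models "incompleteStateReceiptBehavior": "block_contract"
	gaps          map[string]bool // contract address -> open gap (a DB table in the real design)
}

func newListener(domain string, blockContract bool) *listener {
	return &listener{Domain: domain, BlockContract: blockContract, gaps: map[string]bool{}}
}

// classify decides what happens to a single receipt for this listener.
func (l *listener) classify(r receipt) outcome {
	if l.Domain != "" && r.Domain != l.Domain {
		return outcomeDiscard
	}
	if l.BlockContract && (l.gaps[r.ContractAddress] || !r.StatesAvailable) {
		l.gaps[r.ContractAddress] = true
		return outcomeGap
	}
	return outcomeBatch
}

func main() {
	l := newListener("pente", true)
	fmt.Println(l.classify(receipt{Domain: "pente", ContractAddress: "0x01", StatesAvailable: false}) == outcomeGap)
}
```

A gap only blocks the contract it was recorded against; receipts for other contracts continue to flow into batches.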

Gaps are created for "incompleteStateReceiptBehavior": "block_contract" listeners, when the states for a particular smart contract have not been received yet.

This is very important for indexing cases where full visibility of a transaction's data is required to process the receipt, and those transactions must be processed in order.

EVM Privacy Groups (Pente) are the primary example of this, where each receipt represents a transaction on the blockchain. Every transaction must be processed by every party maintaining the privacy group in the same order.

JSON/RPC details

Create receipt listener

Note the ability to filter on particular receipt types, and most importantly the ability to block delivery on a given contract address while a "gap" exists in the available states.

{
    "jsonrpc": "2.0",
    "id": 1,
    "method": "ptx_createReceiptListener",
    "params": [{
        "name":"listener1",
        "filters": {
            "sequenceAbove": null,
            "type": "private",
            "domain": "pente"
        },
        "options": {
            "incompleteStateReceiptBehavior": "block_contract",
            "domainReceipts": true
        }
    }]
}

Note that ptx_startReceiptListener and ptx_stopReceiptListener are also available.
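For callers building the create request from Go, a minimal sketch using only encoding/json might look like the following. The struct types here are hypothetical and written just for this example to mirror the JSON above; they are not the types defined in the Paladin codebase.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rpcRequest is a generic JSON-RPC 2.0 request envelope.
type rpcRequest struct {
	JSONRPC string        `json:"jsonrpc"`
	ID      int           `json:"id"`
	Method  string        `json:"method"`
	Params  []interface{} `json:"params"`
}

// The types below are hypothetical mirrors of the JSON payload shown above.
type listenerFilters struct {
	SequenceAbove *uint64 `json:"sequenceAbove"` // nil marshals as null
	Type          string  `json:"type"`
	Domain        string  `json:"domain"`
}

type listenerOptions struct {
	IncompleteStateReceiptBehavior string `json:"incompleteStateReceiptBehavior"`
	DomainReceipts                 bool   `json:"domainReceipts"`
}

type listenerSpec struct {
	Name    string          `json:"name"`
	Filters listenerFilters `json:"filters"`
	Options listenerOptions `json:"options"`
}

// buildCreateListenerRequest marshals a ptx_createReceiptListener call for a
// private Pente listener that blocks per contract while states are missing.
func buildCreateListenerRequest(id int, name string) ([]byte, error) {
	spec := listenerSpec{
		Name:    name,
		Filters: listenerFilters{SequenceAbove: nil, Type: "private", Domain: "pente"},
		Options: listenerOptions{IncompleteStateReceiptBehavior: "block_contract", DomainReceipts: true},
	}
	return json.Marshal(rpcRequest{
		JSONRPC: "2.0",
		ID:      id,
		Method:  "ptx_createReceiptListener",
		Params:  []interface{}{spec},
	})
}

func main() {
	body, err := buildCreateListenerRequest(1, "listener1")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```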

Subscribe (WebSockets only)

{
    "jsonrpc": "2.0",
    "id": 1,
    "method": "ptx_subscribe",
    "params": ["receipts", "listener1"]
}

Ack

Confirms receipt, so the next batch is delivered.

{
    "jsonrpc": "2.0",
    "id": 1,
    "method": "ptx_ack",
    "params": ["5b3e0816-32e2-4aa8-80e6-6d2e41e046cb"]
}

Nack

Drives redelivery/retry for this batch.

{
    "jsonrpc": "2.0",
    "id": 1,
    "method": "ptx_nack",
    "params": ["5b3e0816-32e2-4aa8-80e6-6d2e41e046cb"]
}
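A subscriber would typically choose between the two calls based on whether it managed to process the batch. The sketch below shows that decision only; `replyFor` is a hypothetical helper, and it assumes the single parameter is the subscription ID, as the shared UUID in the examples here suggests. Sending the request over the WebSocket is left out.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// rpcRequest is a generic JSON-RPC 2.0 request envelope with string params.
type rpcRequest struct {
	JSONRPC string   `json:"jsonrpc"`
	ID      int      `json:"id"`
	Method  string   `json:"method"`
	Params  []string `json:"params"`
}

// errProcessing is an example error standing in for a failed batch handler.
var errProcessing = errors.New("processing failed")

// replyFor builds ptx_ack when the batch was handled successfully, and
// ptx_nack (requesting redelivery) when the handler returned an error.
func replyFor(id int, subscriptionID string, handlerErr error) rpcRequest {
	method := "ptx_ack"
	if handlerErr != nil {
		method = "ptx_nack"
	}
	return rpcRequest{JSONRPC: "2.0", ID: id, Method: method, Params: []string{subscriptionID}}
}

func main() {
	subID := "5b3e0816-32e2-4aa8-80e6-6d2e41e046cb"
	for _, handlerErr := range []error{nil, errProcessing} {
		body, err := json.Marshal(replyFor(2, subID, handlerErr))
		if err != nil {
			panic(err)
		}
		fmt.Println(string(body))
	}
}
```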

Unsubscribe

{
    "jsonrpc": "2.0",
    "id": 1,
    "method": "ptx_unsubscribe",
    "params": ["5b3e0816-32e2-4aa8-80e6-6d2e41e046cb"]
}

Delete receipt listener

{
    "jsonrpc": "2.0",
    "id": 1,
    "method": "ptx_deleteReceiptListener",
    "params": ["listener1"]
}

Signed-off-by: Peter Broadhurst <[email protected]>
Signed-off-by: Peter Broadhurst <[email protected]>
Signed-off-by: Peter Broadhurst <[email protected]>
…make eth_subscribe a sample

Signed-off-by: Peter Broadhurst <[email protected]>
Signed-off-by: Peter Broadhurst <[email protected]>
Signed-off-by: Peter Broadhurst <[email protected]>
@peterbroadhurst peterbroadhurst requested a review from hosie January 18, 2025 04:40
@peterbroadhurst peterbroadhurst marked this pull request as ready for review January 18, 2025 04:40
@peterbroadhurst peterbroadhurst marked this pull request as draft January 20, 2025 12:50
…rder of numbers with commits

Signed-off-by: Peter Broadhurst <[email protected]>
Contributor

@hosie hosie left a comment

Leaving a few comments here mid-review while I time out for a couple of hours. Will hopefully get to the end of the review later today.

@@ -164,7 +164,7 @@ func NewUnitTestPersistence(ctx context.Context, suite string) (p Persistence, c
DSN: dbDSN(utDBName),
MigrationsDir: migrationsDirRelative,
AutoMigrate: &autoMigrate,
- DebugQueries: false,
+ DebugQueries: true,
Contributor

Did you mean to leave this in? I'm not sure of the tradeoff here: does it significantly increase test run time or log output on the CI actions?

}
}

// We use a post-commit handler to send back any acks to the other side that are required
- return cleanup, make([]flushwriter.Result[*noResult], len(values)), nil
+ return postCommit, make([]flushwriter.Result[*noResult], len(values)), nil
Contributor

Not strictly a point about this PR but reviewing this PR has given me an opportunity to reflect on this. I am not sure how I feel about how we return the postCommit function from each function in the stack. It is coupled to the fact that we are passing a DB transaction down the stack so another approach would have been to pass a wrapper object that contains the gorm.DB and also includes a method to register post commit handlers.

The way we have it is more explicit, which is good, but it does force PRs like this to touch a ton of files that have no behavior change, and it leaves a lot more "noop" code scattered around.
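For illustration only, the wrapper alternative could look roughly like the sketch below. `dbTX`, `AddPostCommit` and `runInTX` are hypothetical names; the real wrapper would also carry the gorm.DB handle and perform an actual commit, which is simulated here by the callback returning nil.

```go
package main

import (
	"errors"
	"fmt"
)

// errRollback is an example error that causes the simulated transaction to roll back.
var errRollback = errors.New("rollback")

// dbTX is a hypothetical wrapper passed down the stack instead of a bare DB handle.
// In a real implementation it would also hold the *gorm.DB for the transaction.
type dbTX struct {
	postCommits []func()
}

// AddPostCommit registers a handler to run only after a successful commit.
func (tx *dbTX) AddPostCommit(fn func()) {
	tx.postCommits = append(tx.postCommits, fn)
}

// runInTX runs fn inside a simulated transaction. If fn fails, the registered
// handlers are dropped; otherwise they run in registration order.
func runInTX(fn func(tx *dbTX) error) error {
	tx := &dbTX{}
	if err := fn(tx); err != nil {
		return err // rolled back: post-commit handlers never run
	}
	for _, postCommit := range tx.postCommits {
		postCommit()
	}
	return nil
}

func main() {
	err := runInTX(func(tx *dbTX) error {
		tx.AddPostCommit(func() { fmt.Println("send acks") })
		return nil
	})
	fmt.Println(err == nil)
}
```

With this shape, functions lower in the stack register handlers on the wrapper and no longer need a postCommit return value threaded through every signature.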

Contributor Author

Yep - been thinking a lot about this, and overall GORM's plusses and minuses.

The lack of tx.AddPostCommit() is very frustrating.

But fixing it is likely a day of refactoring, and I think it was right not to couple it to this PR.

FailureMessage *string `gorm:"column:failure_message"`
RevertData tktypes.HexBytes `gorm:"column:revert_data"`
ContractAddress *tktypes.EthAddress `gorm:"column:contract_address"`
Block *persistedReceiptGap `gorm:"foreignKey:Source;references:Source;"`
Contributor

Should this be a compound foreignKey? Is there a chance that there are 2 rows in receipt_listener_gap with the same Source but a different Listener? Not sure how the gorm magic works in that case.

Not sure I understand what this field is for so I may be misreading the intent here. At first I thought it was something to do with BlockNumber but I think now that it is saying whether this receipt should be blocked from being delivered because of a known gap?

Contributor

Think I understand this now. I see we are adding the listener name to the where clause when we construct the join SQL.

So the only takeaway here is that the name Block could be confused with a blockchain block, but I think here it is actually a Gap or Blocker.

Contributor Author

Darn, yes. I did the rename to Gap everywhere else, but not on this Go struct. Will submit this in a follow-up PR.

}
if mh.methodType == rpcMethodTypeAsyncStart {
rpcRes = wsc.handleNewAsync(ctx, rpcReq, mh.async)
} else {
Contributor

Should we be explicit here (else if mh.methodType == rpcMethodTypeAsyncLifecycle), just in case a new enum value is added in future?
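A sketch of what the explicit form might look like, using a switch with an error on unknown values. Only the two constant names come from the code under review; the enum type, the `rpcMethodTypeFuture` value, the `dispatch` function and its return strings are hypothetical.

```go
package main

import "fmt"

type rpcMethodType int

const (
	rpcMethodTypeAsyncStart rpcMethodType = iota
	rpcMethodTypeAsyncLifecycle
	rpcMethodTypeFuture // hypothetical value added later, to show the default branch
)

// dispatch routes a request by method type and rejects any type it does not
// explicitly know about, rather than falling through to a catch-all else.
func dispatch(mt rpcMethodType) (string, error) {
	switch mt {
	case rpcMethodTypeAsyncStart:
		return "handleNewAsync", nil
	case rpcMethodTypeAsyncLifecycle:
		return "handleLifecycle", nil
	default:
		return "", fmt.Errorf("unsupported RPC method type %d", mt)
	}
}

func main() {
	handler, err := dispatch(rpcMethodTypeAsyncStart)
	fmt.Println(handler, err)
}
```

The cost is the extra default branch, which needs its own error message and test case.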

Contributor Author

I did consider this, and just felt adding the extra errors and test cases wasn't justified. But happy to be pushed to do that as a follow-up (as this merged without that pushback)

Contributor

agree with your assessment. This is one of the things that annoys me about 100% code coverage. If you had done this, the coverage tool would whinge at you but the behavior wouldn't be any less covered than it is now.

if spec.Filters.Domain != "" {
return nil, i18n.NewError(ctx, msgs.MsgTxMgrBadReceiptListenerTypeDomain, spec.Name, spec.Filters.Type, spec.Filters.Domain)
}
q = q.Where("domain = ''") // private
Contributor

Suggested change
- q = q.Where("domain = ''") // private
+ q = q.Where("domain = ''") // public

Contributor Author

Will add this to the Gap rename PR.

@hosie hosie closed this pull request by merging all changes into main in 66d8ca6 Jan 24, 2025
@hosie hosie deleted the event-streams branch January 24, 2025 11:03
peterbroadhurst added a commit that referenced this pull request Jan 24, 2025
Signed-off-by: Peter Broadhurst <[email protected]>
hosie added a commit that referenced this pull request Jan 24, 2025
awrichar pushed a commit that referenced this pull request Jan 25, 2025
Signed-off-by: Peter Broadhurst <[email protected]>
2 participants