Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Receipt listener event streams #515

Merged
23 commits merged into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b721e97
Work out model for blocking listeners and DB
peterbroadhurst Jan 8, 2025
9563dab
Work through DB migrations and get ready for delivery of msgs
peterbroadhurst Jan 9, 2025
7556e3c
Add receiver management APIs
peterbroadhurst Jan 9, 2025
9aa8b9c
Merge branch 'main' of https://github.com/kaleido-io/paladin into eve…
peterbroadhurst Jan 13, 2025
af05943
CRUD actions for receipt listeners
peterbroadhurst Jan 13, 2025
50d326a
CRUD lifecycle test for transaction receipt listeners
peterbroadhurst Jan 14, 2025
8a41f98
Expose lifecycle APIs for receipt listeners over JSON/RPC
peterbroadhurst Jan 14, 2025
b191519
Work on testing e2e delivery
peterbroadhurst Jan 14, 2025
f74ae8b
First e2e delivery test for receipt listeners
peterbroadhurst Jan 14, 2025
23a024e
Work through logic up to the point of blocks
peterbroadhurst Jan 15, 2025
bf073c0
Rename "blocks" to "gaps" to avoid massive confusion
peterbroadhurst Jan 15, 2025
3f7728b
Rename block to gap, and add gap resolution code
peterbroadhurst Jan 16, 2025
d2c3244
Re-work gap to avoid race condition, using the writing of the state i…
peterbroadhurst Jan 16, 2025
4de5a35
Work through logic ahead of adding regular polling for catchup
peterbroadhurst Jan 16, 2025
b4789cc
Add post-commit notification for new states, combining with re-poll t…
peterbroadhurst Jan 16, 2025
d1079e3
Refactor async support in RPCServer to support arbitrary protocols - …
peterbroadhurst Jan 16, 2025
5662dd5
Build fixes for event streams
peterbroadhurst Jan 17, 2025
f43860c
JSON/RPC subscription interface for receipt listeners
peterbroadhurst Jan 17, 2025
e700e74
Work through JSON/RPC subscription to events
peterbroadhurst Jan 18, 2025
27de8ab
Add nack redelivery test
peterbroadhurst Jan 18, 2025
1b8b8a9
Update generateDocs for new sequence field
peterbroadhurst Jan 18, 2025
2285961
Merge branch 'main' of https://github.com/kaleido-io/paladin into eve…
peterbroadhurst Jan 18, 2025
a86e46a
Add advisory lock prior to incrementing receipt sequence, to ensure o…
peterbroadhurst Jan 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions config/pkg/pldconf/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
)

type TxManagerConfig struct {
ABI ABIConfig `json:"abi"`
Transactions TransactionsConfig `json:"transactions"`
ABI ABIConfig `json:"abi"`
Transactions TransactionsConfig `json:"transactions"`
ReceiptListeners ReceiptListeners `json:"receiptListeners"`
}

type ABIConfig struct {
Expand All @@ -32,6 +33,12 @@ type TransactionsConfig struct {
Cache CacheConfig `json:"cache"`
}

type ReceiptListeners struct {
Retry RetryConfig `json:"retry"`
ReadPageSize *int `json:"readPageSize"`
StateGapCheckInterval *string `json:"stateGapCheckInterval"`
}

var TxManagerDefaults = &TxManagerConfig{
ABI: ABIConfig{
Cache: CacheConfig{
Expand All @@ -43,4 +50,9 @@ var TxManagerDefaults = &TxManagerConfig{
Capacity: confutil.P(100),
},
},
ReceiptListeners: ReceiptListeners{
Retry: GenericRetryDefaults.RetryConfig,
ReadPageSize: confutil.P(100),
StateGapCheckInterval: confutil.P("1s"),
},
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
BEGIN;
DROP TABLE receipt_listener_gap;
DROP TABLE receipt_listener_checkpoints;
DROP TABLE receipt_listeners;
COMMIT;

Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
BEGIN;

DROP INDEX transaction_receipts_tx_hash;
DROP INDEX transaction_receipts_source;

ALTER TABLE transaction_receipts RENAME TO transaction_receipts_old;

-- We need to add the sequence column, but the easiest thing is to simply re-create the table
CREATE TABLE transaction_receipts (
"sequence" BIGINT GENERATED ALWAYS AS IDENTITY,
"transaction" UUID NOT NULL, -- note there is no foreign key to transactions here - we can have receipts for TXs that we do not know locally
"domain" VARCHAR NOT NULL, -- empty string for public
"indexed" BIGINT NOT NULL,
"success" BOOLEAN NOT NULL,
"failure_message" VARCHAR,
"revert_data" VARCHAR,
"tx_hash" VARCHAR,
"tx_index" INT,
"log_index" INT,
"source" VARCHAR,
"block_number" BIGINT,
"contract_address" VARCHAR
);
CREATE UNIQUE INDEX transaction_receipts_tx_id ON transaction_receipts ("transaction");
CREATE INDEX transaction_receipts_tx_hash ON transaction_receipts ("tx_hash");
CREATE INDEX transaction_receipts_source ON transaction_receipts ("source");

-- Copy any existing data over
INSERT INTO transaction_receipts (
"transaction",
"domain",
"indexed",
"success",
"failure_message",
"revert_data",
"tx_hash",
"tx_index",
"log_index",
"source",
"block_number",
"contract_address"
) SELECT
"transaction",
"domain",
"indexed",
"success",
"failure_message",
"revert_data",
"tx_hash",
"tx_index",
"log_index",
"source",
"block_number",
"contract_address"
FROM transaction_receipts_old;

CREATE TABLE receipt_listeners (
"name" TEXT NOT NULL,
"created" BIGINT NOT NULL,
"started" BOOLEAN NOT NULL,
"filters" TEXT NOT NULL,
"options" TEXT NOT NULL,
PRIMARY KEY("name")
);

CREATE TABLE receipt_listener_gap (
"listener" TEXT NOT NULL,
"source" TEXT NOT NULL,
"transaction" UUID NOT NULL,
"sequence" BIGINT NOT NULL,
"domain_name" TEXT NOT NULL,
"state" TEXT , -- null when a pagination checkpoint
PRIMARY KEY ("listener", "source"),
FOREIGN KEY ("listener") REFERENCES receipt_listeners ("name") ON DELETE CASCADE
);

CREATE INDEX receipt_listener_gap_txid ON receipt_listener_gap("transaction");

CREATE TABLE receipt_listener_checkpoints (
"listener" TEXT NOT NULL,
"sequence" BIGINT NOT NULL,
"time" BIGINT NOT NULL,
PRIMARY KEY ("listener"),
FOREIGN KEY ("listener") REFERENCES receipt_listeners ("name") ON DELETE CASCADE
);


COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP TABLE receipt_listener_gap;
DROP TABLE receipt_listener_checkpoints;
DROP TABLE receipt_listeners;
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
DROP INDEX transaction_receipts_tx_hash;
DROP INDEX transaction_receipts_source;

ALTER TABLE transaction_receipts RENAME TO transaction_receipts_old;

-- We need to add the sequence column, but the easiest thing is to simply re-create the table
CREATE TABLE transaction_receipts (
"sequence" INTEGER PRIMARY KEY AUTOINCREMENT,
"transaction" UUID NOT NULL, -- note there is no foreign key to transactions here - we can have receipts for TXs that we do not know locally
"domain" VARCHAR NOT NULL, -- empty string for public
"indexed" BIGINT NOT NULL,
"success" BOOLEAN NOT NULL,
"failure_message" VARCHAR,
"revert_data" VARCHAR,
"tx_hash" VARCHAR,
"tx_index" INT,
"log_index" INT,
"source" VARCHAR,
"block_number" BIGINT,
"contract_address" VARCHAR
);
CREATE UNIQUE INDEX transaction_receipts_tx_id ON transaction_receipts ("transaction");
CREATE INDEX transaction_receipts_tx_hash ON transaction_receipts ("tx_hash");
CREATE INDEX transaction_receipts_source ON transaction_receipts ("source");

-- Copy any existing data over
INSERT INTO transaction_receipts (
"transaction",
"domain",
"indexed",
"success",
"failure_message",
"revert_data",
"tx_hash",
"tx_index",
"log_index",
"source",
"block_number",
"contract_address"
) SELECT
"transaction",
"domain",
"indexed",
"success",
"failure_message",
"revert_data",
"tx_hash",
"tx_index",
"log_index",
"source",
"block_number",
"contract_address"
FROM transaction_receipts_old;

CREATE TABLE receipt_listeners (
"name" TEXT NOT NULL,
"created" BIGINT NOT NULL,
"started" BOOLEAN NOT NULL,
"filters" TEXT NOT NULL,
"options" TEXT NOT NULL,
PRIMARY KEY("name")
);

CREATE TABLE receipt_listener_gap (
"listener" TEXT NOT NULL,
"source" TEXT NOT NULL,
"transaction" UUID NOT NULL,
"sequence" BIGINT NOT NULL,
"domain_name" TEXT NOT NULL,
"state" TEXT , -- null when a pagination checkpoint
PRIMARY KEY ("listener", "source"),
FOREIGN KEY ("listener") REFERENCES receipt_listeners ("name") ON DELETE CASCADE
);

CREATE INDEX receipt_listener_gap_txid ON receipt_listener_gap("transaction");

CREATE TABLE receipt_listener_checkpoints (
"listener" TEXT NOT NULL,
"sequence" BIGINT NOT NULL,
"time" BIGINT NOT NULL,
PRIMARY KEY ("listener"),
FOREIGN KEY ("listener") REFERENCES receipt_listeners ("name") ON DELETE CASCADE
);
2 changes: 1 addition & 1 deletion core/go/internal/components/privatetxmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type PrivateTxManager interface {
// in the meantime, this is handy for some blackish box testing
Subscribe(ctx context.Context, subscriber PrivateTxEventSubscriber)

NotifyFailedPublicTx(ctx context.Context, dbTX *gorm.DB, confirms []*PublicTxMatch) error
NotifyFailedPublicTx(ctx context.Context, dbTX *gorm.DB, confirms []*PublicTxMatch) (func(), error)

PrivateTransactionConfirmed(ctx context.Context, receipt *TxCompletion)

Expand Down
4 changes: 2 additions & 2 deletions core/go/internal/components/statemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ type StateManager interface {

// MUST NOT be called for states received over a network from another node.
// Writes a batch of states that have been pre-verified BY THIS NODE so can bypass domain hash verification.
WritePreVerifiedStates(ctx context.Context, dbTX *gorm.DB, domainName string, states []*StateUpsertOutsideContext) ([]*pldapi.State, error)
WritePreVerifiedStates(ctx context.Context, dbTX *gorm.DB, domainName string, states []*StateUpsertOutsideContext) (func(), []*pldapi.State, error)

// Write a batch of states that have been received over the network. ID hash calculation will be validated by the domain as prior to storage
WriteReceivedStates(ctx context.Context, dbTX *gorm.DB, domainName string, states []*StateUpsertOutsideContext) ([]*pldapi.State, error)
WriteReceivedStates(ctx context.Context, dbTX *gorm.DB, domainName string, states []*StateUpsertOutsideContext) (func(), []*pldapi.State, error)

// Write a batch of nullifiers that correspond to states just received
WriteNullifiersForReceivedStates(ctx context.Context, dbTX *gorm.DB, domainName string, nullifiers []*NullifierUpsert) error
Expand Down
20 changes: 18 additions & 2 deletions core/go/internal/components/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,20 @@ type ResolvedFunction struct {
Signature string `json:"signature"`
}

type ReceiptReceiver interface {
DeliverReceiptBatch(ctx context.Context, batchID uint64, receipts []*pldapi.TransactionReceiptFull) error
}

type ReceiptReceiverCloser interface {
Close()
}

type TXManager interface {
ManagerLifecycle

// These are the general purpose functions exposed also as JSON/RPC APIs on the TX Manager

FinalizeTransactions(ctx context.Context, dbTX *gorm.DB, info []*ReceiptInput) error // requires all transactions to be known
FinalizeTransactions(ctx context.Context, dbTX *gorm.DB, info []*ReceiptInput) (postCommit func(), err error) // requires all transactions to be known
CalculateRevertError(ctx context.Context, dbTX *gorm.DB, revertData tktypes.HexBytes) error
DecodeRevertError(ctx context.Context, dbTX *gorm.DB, revertData tktypes.HexBytes, dataFormat tktypes.JSONFormatOptions) (*pldapi.ABIDecodedData, error)
DecodeCall(ctx context.Context, dbTX *gorm.DB, callData tktypes.HexBytes, dataFormat tktypes.JSONFormatOptions) (*pldapi.ABIDecodedData, error)
Expand All @@ -105,9 +113,17 @@ type TXManager interface {
QueryPreparedTransactionsWithRefs(ctx context.Context, dbTX *gorm.DB, jq *query.QueryJSON) ([]*PreparedTransactionWithRefs, error)
CallTransaction(ctx context.Context, result any, tx *pldapi.TransactionCall) (err error)
UpsertABI(ctx context.Context, dbTX *gorm.DB, a abi.ABI) (func(), *pldapi.StoredABI, error)
CreateReceiptListener(ctx context.Context, spec *pldapi.TransactionReceiptListener) error
GetReceiptListener(ctx context.Context, name string) *pldapi.TransactionReceiptListener
QueryReceiptListeners(ctx context.Context, dbTX *gorm.DB, jq *query.QueryJSON) ([]*pldapi.TransactionReceiptListener, error)
StartReceiptListener(ctx context.Context, name string) error
StopReceiptListener(ctx context.Context, name string) error
DeleteReceiptListener(ctx context.Context, name string) error
AddReceiptReceiver(ctx context.Context, name string, r ReceiptReceiver) (ReceiptReceiverCloser, error)

// These functions for use of the private TX manager for chaining private transactions.
// These functions for use of other components

NotifyStatesDBChanged() // called by state manager after committing DB TXs writing new states that might fill in gaps
PrepareInternalPrivateTransaction(ctx context.Context, dbTX *gorm.DB, tx *pldapi.TransactionInput, submitMode pldapi.SubmitMode) (func(), *ValidatedTransaction, error)
UpsertInternalPrivateTxsFinalizeIDs(ctx context.Context, dbTX *gorm.DB, txis []*ValidatedTransaction) (postCommit func(), err error)
WritePreparedTransactions(ctx context.Context, dbTX *gorm.DB, prepared []*PreparedTransactionWithRefs) (postCommit func(), err error)
Expand Down
Loading
Loading