Skip to content

Commit

Permalink
feat(server/v2/cometbft,stf): Listener integration in server/v2 (back…
Browse files Browse the repository at this point in the history
…port #21917) (#22051)

Co-authored-by: cool-developer <[email protected]>
Co-authored-by: Julien Robert <[email protected]>
  • Loading branch information
3 people authored Oct 3, 2024
1 parent ce10b1f commit 1ae81c5
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 1 deletion.
7 changes: 7 additions & 0 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"cosmossdk.io/core/transaction"
errorsmod "cosmossdk.io/errors/v2"
"cosmossdk.io/log"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/server/v2/cometbft/client/grpc/cmtservice"
"cosmossdk.io/server/v2/cometbft/handlers"
Expand All @@ -40,6 +41,7 @@ type Consensus[T transaction.Tx] struct {
txCodec transaction.Codec[T]
store types.Store
streaming streaming.Manager
listener *appdata.Listener
snapshotManager *snapshots.Manager
mempool mempool.Mempool[T]

Expand Down Expand Up @@ -104,6 +106,11 @@ func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}

// SetListener sets the listener for the consensus module.
func (c *Consensus[T]) SetListener(l *appdata.Listener) {
c.listener = l
}

// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 // indirect
cosmossdk.io/depinject v1.0.0 // indirect
cosmossdk.io/math v1.3.0 // indirect
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 // indirect
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9
cosmossdk.io/store v1.1.1-0.20240909133312-50288938d1b6 // indirect
cosmossdk.io/x/bank v0.0.0-20240226161501-23359a0b6d91 // indirect
cosmossdk.io/x/staking v0.0.0-00010101000000-000000000000 // indirect
Expand Down
50 changes: 50 additions & 0 deletions server/v2/cometbft/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
errorsmod "cosmossdk.io/errors/v2"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/server/v2/streaming"
)

Expand Down Expand Up @@ -57,6 +58,55 @@ func (c *Consensus[T]) streamDeliverBlockChanges(
c.logger.Error("ListenStateChanges listening hook failed", "height", height, "err", err)
}
}

if c.listener == nil {
return nil
}
// stream the StartBlockData to the listener.
if c.listener.StartBlock != nil {
if err := c.listener.StartBlock(appdata.StartBlockData{
Height: uint64(height),
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
}); err != nil {
return err
}
}
// stream the TxData to the listener.
if c.listener.OnTx != nil {
for i, tx := range txs {
if err := c.listener.OnTx(appdata.TxData{
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
}); err != nil {
return err
}
}
}
// stream the EventData to the listener.
if c.listener.OnEvent != nil {
if err := c.listener.OnEvent(appdata.EventData{Events: events}); err != nil {
return err
}
}
// stream the KVPairData to the listener.
if c.listener.OnKVPair != nil {
if err := c.listener.OnKVPair(appdata.KVPairData{Updates: stateChanges}); err != nil {
return err
}
}
// stream the CommitData to the listener.
if c.listener.Commit != nil {
if completionCallback, err := c.listener.Commit(appdata.CommitData{}); err != nil {
return err
} else if completionCallback != nil {
if err := completionCallback(); err != nil {
return err
}
}
}

return nil
}

Expand Down

0 comments on commit 1ae81c5

Please sign in to comment.