Skip to content

Commit

Permalink
feat(server/v2/stf): delayed marshalling of typed event (#22684)
Browse files Browse the repository at this point in the history
  • Loading branch information
julienrbrt authored Dec 4, 2024
1 parent 330954e commit 7fa2356
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 69 deletions.
28 changes: 21 additions & 7 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ type consensus[T transaction.Tx] struct {
streamingManager streaming.Manager
mempool mempool.Mempool[T]

cfg Config
chainID string
indexedEvents map[string]struct{}
cfg Config
chainID string
indexedABCIEvents map[string]struct{}

initialHeight uint64
// this is only available after this node has committed a block (in FinalizeBlock),
Expand Down Expand Up @@ -105,9 +105,16 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
return nil, err
}

events, err := intoABCIEvents(resp.Events, c.indexedEvents)
if err != nil {
return nil, err
events := make([]abci.Event, 0)
if !c.cfg.AppTomlConfig.DisableABCIEvents {
events, err = intoABCIEvents(
resp.Events,
c.indexedABCIEvents,
c.cfg.AppTomlConfig.DisableIndexABCIEvents,
)
if err != nil {
return nil, err
}
}

cometResp := &abciproto.CheckTxResponse{
Expand All @@ -116,6 +123,7 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
GasUsed: uint64ToInt64(resp.GasUsed),
Events: events,
}

if resp.Error != nil {
space, code, log := errorsmod.ABCIInfo(resp.Error, c.cfg.AppTomlConfig.Trace)
cometResp.Code = code
Expand Down Expand Up @@ -557,7 +565,13 @@ func (c *consensus[T]) FinalizeBlock(
return nil, err
}

return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace)
return finalizeBlockResponse(
resp,
cp,
appHash,
c.indexedABCIEvents,
c.cfg.AppTomlConfig,
)
}

func (c *consensus[T]) internalFinalizeBlock(
Expand Down
26 changes: 15 additions & 11 deletions server/v2/cometbft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type Config struct {
func DefaultAppTomlConfig() *AppTomlConfig {
return &AppTomlConfig{
MinRetainBlocks: 0,
IndexEvents: make([]string, 0),
HaltHeight: 0,
HaltTime: 0,
Address: "tcp://127.0.0.1:26658",
Expand All @@ -28,22 +27,27 @@ func DefaultAppTomlConfig() *AppTomlConfig {
Target: make(map[string]indexer.Config),
ChannelBufferSize: 1024,
},
IndexABCIEvents: make([]string, 0),
DisableIndexABCIEvents: false,
DisableABCIEvents: false,
}
}

type AppTomlConfig struct {
MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."`
IndexEvents []string `mapstructure:"index-events" toml:"index-events" comment:"index-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."`
HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."`
Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"`
Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."`
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`
MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."`
HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."`
Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"`
Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."`
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`

// Sub configs
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
IndexABCIEvents []string `mapstructure:"index-abci-events" toml:"index-abci-events" comment:"index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."`
DisableIndexABCIEvents bool `mapstructure:"disable-index-abci-events" toml:"disable-index-abci-events" comment:"disable-index-abci-events disables the ABCI event indexing done by CometBFT. Useful when relying on the SDK indexer for event indexing, but still want events to be included in FinalizeBlockResponse."`
DisableABCIEvents bool `mapstructure:"disable-abci-events" toml:"disable-abci-events" comment:"disable-abci-events disables all ABCI events. Useful when relying on the SDK indexer for event indexing."`
}

// CfgOption is a function that allows to overwrite the default server configuration.
Expand Down
6 changes: 5 additions & 1 deletion server/v2/cometbft/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *a
return nil, errorsmod.Wrap(err, "failed to simulate tx")
}

bz, err := intoABCISimulationResponse(txResult, c.indexedEvents)
bz, err := intoABCISimulationResponse(
txResult,
c.indexedABCIEvents,
c.cfg.AppTomlConfig.DisableIndexABCIEvents,
)
if err != nil {
return nil, errorsmod.Wrap(err, "failed to marshal txResult")
}
Expand Down
8 changes: 4 additions & 4 deletions server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ func New[T transaction.Tx](
}
}

indexEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexEvents))
for _, e := range srv.config.AppTomlConfig.IndexEvents {
indexEvents[e] = struct{}{}
indexedABCIEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexABCIEvents))
for _, e := range srv.config.AppTomlConfig.IndexABCIEvents {
indexedABCIEvents[e] = struct{}{}
}

sc := store.GetStateCommitment().(snapshots.CommitSnapshotter)
Expand Down Expand Up @@ -183,7 +183,7 @@ func New[T transaction.Tx](
checkTxHandler: srv.serverOptions.CheckTxHandler,
extendVote: srv.serverOptions.ExtendVoteHandler,
chainID: chainID,
indexedEvents: indexEvents,
indexedABCIEvents: indexedABCIEvents,
initialHeight: 0,
queryHandlersMap: queryHandlers,
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
Expand Down
77 changes: 39 additions & 38 deletions server/v2/cometbft/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,23 @@ func finalizeBlockResponse(
cp *cmtproto.ConsensusParams,
appHash []byte,
indexSet map[string]struct{},
debug bool,
cfg *AppTomlConfig,
) (*abci.FinalizeBlockResponse, error) {
allEvents := append(in.BeginBlockEvents, in.EndBlockEvents...)

events, err := intoABCIEvents(allEvents, indexSet)
if err != nil {
return nil, err
events := make([]abci.Event, 0)

if !cfg.DisableABCIEvents {
var err error
events, err = intoABCIEvents(
append(in.BeginBlockEvents, in.EndBlockEvents...),
indexSet,
cfg.DisableIndexABCIEvents,
)
if err != nil {
return nil, err
}
}

txResults, err := intoABCITxResults(in.TxResults, indexSet, debug)
txResults, err := intoABCITxResults(in.TxResults, indexSet, cfg)
if err != nil {
return nil, err
}
Expand All @@ -91,6 +98,7 @@ func finalizeBlockResponse(
AppHash: appHash,
ConsensusParamUpdates: cp,
}

return resp, nil
}

Expand All @@ -108,72 +116,65 @@ func intoABCIValidatorUpdates(updates []appmodulev2.ValidatorUpdate) []abci.Vali
return valsetUpdates
}

func intoABCITxResults(results []server.TxResult, indexSet map[string]struct{}, debug bool) ([]*abci.ExecTxResult, error) {
func intoABCITxResults(
results []server.TxResult,
indexSet map[string]struct{},
cfg *AppTomlConfig,
) ([]*abci.ExecTxResult, error) {
res := make([]*abci.ExecTxResult, len(results))
for i := range results {
events, err := intoABCIEvents(results[i].Events, indexSet)
if err != nil {
return nil, err
var err error
events := make([]abci.Event, 0)

if !cfg.DisableABCIEvents {
events, err = intoABCIEvents(results[i].Events, indexSet, cfg.DisableIndexABCIEvents)
if err != nil {
return nil, err
}
}

res[i] = responseExecTxResultWithEvents(
results[i].Error,
results[i].GasWanted,
results[i].GasUsed,
events,
debug,
cfg.Trace,
)
}

return res, nil
}

func intoABCIEvents(events []event.Event, indexSet map[string]struct{}) ([]abci.Event, error) {
func intoABCIEvents(events []event.Event, indexSet map[string]struct{}, indexNone bool) ([]abci.Event, error) {
indexAll := len(indexSet) == 0
abciEvents := make([]abci.Event, len(events))
for i, e := range events {
attributes, err := e.Attributes()
attrs, err := e.Attributes()
if err != nil {
return nil, err
}

abciEvents[i] = abci.Event{
Type: e.Type,
Attributes: make([]abci.EventAttribute, len(attributes)),
Attributes: make([]abci.EventAttribute, len(attrs)),
}

for j, attr := range attributes {
for j, attr := range attrs {
_, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)]
abciEvents[i].Attributes[j] = abci.EventAttribute{
Key: attr.Key,
Value: attr.Value,
Index: index || indexAll,
Index: !indexNone && (index || indexAll),
}
}
}
return abciEvents, nil
}

func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}) ([]byte, error) {
indexAll := len(indexSet) == 0
abciEvents := make([]abci.Event, len(txRes.Events))
for i, e := range txRes.Events {
attributes, err := e.Attributes()
if err != nil {
return nil, err
}
abciEvents[i] = abci.Event{
Type: e.Type,
Attributes: make([]abci.EventAttribute, len(attributes)),
}

for j, attr := range attributes {
_, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)]
abciEvents[i].Attributes[j] = abci.EventAttribute{
Key: attr.Key,
Value: attr.Value,
Index: index || indexAll,
}
}
func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}, indexNone bool) ([]byte, error) {
abciEvents, err := intoABCIEvents(txRes.Events, indexSet, indexNone)
if err != nil {
return nil, err
}

msgResponses := make([]*gogoany.Any, len(txRes.Resp))
Expand Down
36 changes: 31 additions & 5 deletions server/v2/stf/core_event_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,44 @@ type eventManager struct {
// Emit emits an typed event that is defined in the protobuf file.
// In the future these events will be added to consensus.
func (em *eventManager) Emit(tev transaction.Msg) error {
res, err := TypedEventToEvent(tev)
if err != nil {
return err
ev := event.Event{
Type: gogoproto.MessageName(tev),
Attributes: func() ([]event.Attribute, error) {
outerEvent, err := TypedEventToEvent(tev)
if err != nil {
return nil, err
}

return outerEvent.Attributes()
},
Data: func() (json.RawMessage, error) {
buf := new(bytes.Buffer)
jm := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true, AnyResolver: nil}
if err := jm.Marshal(buf, tev); err != nil {
return nil, err
}

return buf.Bytes(), nil
},
}

em.executionContext.events = append(em.executionContext.events, res)
em.executionContext.events = append(em.executionContext.events, ev)
return nil
}

// EmitKV emits a key value pair event.
func (em *eventManager) EmitKV(eventType string, attrs ...event.Attribute) error {
em.executionContext.events = append(em.executionContext.events, event.NewEvent(eventType, attrs...))
ev := event.Event{
Type: eventType,
Attributes: func() ([]event.Attribute, error) {
return attrs, nil
},
Data: func() (json.RawMessage, error) {
return json.Marshal(attrs)
},
}

em.executionContext.events = append(em.executionContext.events, ev)
return nil
}

Expand Down
8 changes: 6 additions & 2 deletions tools/confix/data/v2-app.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
[comet]
# min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned.
min-retain-blocks = 0
# index-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed.
index-events = []
# halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing.
halt-height = 0
# halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing.
Expand All @@ -15,6 +13,12 @@ transport = 'socket'
trace = false
# standalone starts the application without the CometBFT node. The node should be started separately.
standalone = false
# index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed.
index-abci-events = []
# disable-index-abci-events disables the ABCI event indexing done by CometBFT. Useful when relying on the SDK indexer for event indexing, but still want events to be included in FinalizeBlockResponse.
disable-index-abci-events = false
# disable-abci-events disables all ABCI events. Useful when relying on the SDK indexer for event indexing.
disable-abci-events = false

# mempool defines the configuration for the SDK built-in app-side mempool implementations.
[comet.mempool]
Expand Down
2 changes: 1 addition & 1 deletion tools/confix/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type v2KeyChangesMap map[string][]string
var v2KeyChanges = v2KeyChangesMap{
"minimum-gas-prices": []string{"server.minimum-gas-prices"},
"min-retain-blocks": []string{"comet.min-retain-blocks"},
"index-events": []string{"comet.index-events"},
"index-events": []string{"comet.index-abci-events"},
"halt-height": []string{"comet.halt-height"},
"halt-time": []string{"comet.halt-time"},
"app-db-backend": []string{"store.app-db-backend"},
Expand Down

0 comments on commit 7fa2356

Please sign in to comment.