Skip to content

Commit

Permalink
feat: support profiling block replay during abci handshake (backport c…
Browse files Browse the repository at this point in the history
…osmos#14953) (cosmos#14964)

Co-authored-by: yihuang <[email protected]>
Co-authored-by: Julien Robert <[email protected]>
  • Loading branch information
3 people authored and JeancarloBarrios committed Sep 28, 2024
1 parent 7e35548 commit 6cb128c
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 138 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ Ref: https://keepachangelog.com/en/1.0.0/

## [Unreleased]

### Improvements

* (cli) [#14953](https://github.com/cosmos/cosmos-sdk/pull/14953) Enable profiling block replay during abci handshake with `--cpu-profile`.

## [v0.46.9](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.46.9) - 2022-02-07

### Improvements
Expand Down
186 changes: 48 additions & 138 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,18 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
return err
}

withCMT, _ := cmd.Flags().GetBool(flagWithComet)
if !withCMT {
serverCtx.Logger.Info("starting ABCI without CometBFT")
withTM, _ := cmd.Flags().GetBool(flagWithTendermint)
if !withTM {
serverCtx.Logger.Info("starting ABCI without Tendermint")
return wrapCPUProfile(serverCtx, func() error {
return startStandAlone(serverCtx, appCreator)
})
}

// amino is needed here for backwards compatibility of REST routes
err = startInProcess(serverCtx, clientCtx, appCreator)
err = wrapCPUProfile(serverCtx, func() error {
return startInProcess(serverCtx, clientCtx, appCreator)
})
errCode, ok := err.(ErrorCode)
if !ok {
return err
Expand Down Expand Up @@ -270,68 +275,6 @@ func startStandAlone[T types.Application](svrCtx *Context, svrCfg serverconfig.C
func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error {
cfg := ctx.Config
home := cfg.RootDir
var cpuProfileCleanup func()

err = startAPIServer(ctx, g, svrCfg, clientCtx, svrCtx, app, svrCtx.Config.RootDir, grpcSrv, metrics)
if err != nil {
return err
}

if opts.PostSetupStandalone != nil {
if err := opts.PostSetupStandalone(app, svrCtx, clientCtx, ctx, g); err != nil {
return err
}
}

g.Go(func() error {
if err := svr.Start(); err != nil {
svrCtx.Logger.Error("failed to start out-of-process ABCI server", "err", err)
return err
}

// Wait for the calling process to be canceled or close the provided context,
// so we can gracefully stop the ABCI server.
<-ctx.Done()
svrCtx.Logger.Info("stopping the ABCI server...")
return svr.Stop()
})

return g.Wait()
}

func startInProcess[T types.Application](svrCtx *Context, svrCfg serverconfig.Config, clientCtx client.Context, app T,
metrics *telemetry.Metrics, opts StartCmdOptions[T],
) error {
cmtCfg := svrCtx.Config
gRPCOnly := svrCtx.Viper.GetBool(flagGRPCOnly)

g, ctx := getCtx(svrCtx, true)

if gRPCOnly {
// TODO: Generalize logic so that gRPC only is really in startStandAlone
svrCtx.Logger.Info("starting node in gRPC only mode; CometBFT is disabled")
svrCfg.GRPC.Enable = true
} else {
svrCtx.Logger.Info("starting node with ABCI CometBFT in-process")
tmNode, cleanupFn, err := startCmtNode(ctx, cmtCfg, app, svrCtx)
if err != nil {
return err
}
defer cleanupFn()

// Add the tx service to the gRPC router. We only need to register this
// service if API or gRPC is enabled, and avoid doing so in the general
// case, because it spawns a new local CometBFT RPC client.
if svrCfg.API.Enable || svrCfg.GRPC.Enable {
// Re-assign for making the client available below do not use := to avoid
// shadowing the clientCtx variable.
clientCtx = clientCtx.WithClient(local.New(tmNode))

app.RegisterTxService(clientCtx)
app.RegisterTendermintService(clientCtx)
app.RegisterNodeService(clientCtx, svrCfg)
}
}

grpcSrv, clientCtx, err := startGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app)
if err != nil {
Expand Down Expand Up @@ -595,90 +538,57 @@ func startApp[T types.Application](svrCtx *Context, appCreator types.AppCreator[
_ = tmNode.Stop()
}

// testnetify modifies both state and blockStore, allowing the provided operator address and local validator key to control the network
// that the state in the data folder represents. The chainID of the local genesis file is modified to match the provided chainID.
func testnetify[T types.Application](ctx *Context, testnetAppCreator types.AppCreator[T], db corestore.KVStoreWithBatch, traceWriter io.WriteCloser) (*T, error) {
config := ctx.Config

newChainID, ok := ctx.Viper.Get(KeyNewChainID).(string)
if !ok {
return nil, fmt.Errorf("expected string for key %s", KeyNewChainID)
}
if apiSrv != nil {
_ = apiSrv.Close()
}

// Modify app genesis chain ID and save to genesis file.
genFilePath := config.GenesisFile()
appGen, err := genutiltypes.AppGenesisFromFile(genFilePath)
if err != nil {
return nil, err
}
appGen.ChainID = newChainID
if err := appGen.ValidateAndComplete(); err != nil {
return nil, err
}
if err := appGen.SaveAs(genFilePath); err != nil {
return nil, err
}
ctx.Logger.Info("exiting...")
}()

// Regenerate addrbook.json to prevent peers on old network from causing error logs.
addrBookPath := filepath.Join(config.RootDir, "config", "addrbook.json")
if err := os.Remove(addrBookPath); err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to remove existing addrbook.json: %w", err)
}
// wait for signal capture and gracefully return
return WaitForQuitSignals()
}

emptyAddrBook := []byte("{}")
if err := os.WriteFile(addrBookPath, emptyAddrBook, 0o600); err != nil {
return nil, fmt.Errorf("failed to create empty addrbook.json: %w", err)
func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
if !cfg.Telemetry.Enabled {
return nil, nil
}
return telemetry.New(cfg.Telemetry)
}

// Load the comet genesis doc provider.
genDocProvider := node.DefaultGenesisDocProviderFunc(config)
// wrapCPUProfile runs callback in a goroutine, then wait for quit signals.
func wrapCPUProfile(ctx *Context, callback func() error) error {
if cpuProfile := ctx.Viper.GetString(flagCPUProfile); cpuProfile != "" {
f, err := os.Create(cpuProfile)
if err != nil {
return err
}

// Initialize blockStore and stateDB.
blockStoreDB, err := cmtcfg.DefaultDBProvider(&cmtcfg.DBContext{ID: "blockstore", Config: config})
if err != nil {
return nil, err
}
blockStore := store.NewBlockStore(blockStoreDB)
ctx.Logger.Info("starting CPU profiler", "profile", cpuProfile)
if err := pprof.StartCPUProfile(f); err != nil {
return err
}

stateDB, err := cmtcfg.DefaultDBProvider(&cmtcfg.DBContext{ID: "state", Config: config})
if err != nil {
return nil, err
defer func() {
ctx.Logger.Info("stopping CPU profiler", "profile", cpuProfile)
pprof.StopCPUProfile()
if err := f.Close(); err != nil {
ctx.Logger.Info("failed to close cpu-profile file", "profile", cpuProfile, "err", err.Error())
}
}()
}

defer blockStore.Close()
defer stateDB.Close()

privValidator, err := pvm.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile(), func() (cmtcrypto.PrivKey, error) {
return cmted25519.GenPrivKey(), nil
}) // TODO: make this modular
if err != nil {
return nil, err
}
userPubKey, err := privValidator.GetPubKey()
if err != nil {
return nil, err
}
validatorAddress := userPubKey.Address()
errCh := make(chan error)
go func() {
errCh <- callback()
}()

stateStore := sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: config.Storage.DiscardABCIResponses,
})
select {
case err := <-errCh:
return err

state, genDoc, err := node.LoadStateFromDBOrGenesisDocProvider(stateDB, genDocProvider, "")
if err != nil {
return nil, err
case <-time.After(types.ServerStartTime):
}

ctx.Logger.Info("exiting...")
}()

// wait for signal capture and gracefully return
return WaitForQuitSignals()
}

func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
if !cfg.Telemetry.Enabled {
return nil, nil
}
return telemetry.New(cfg.Telemetry)
}

0 comments on commit 6cb128c

Please sign in to comment.