Skip to content

Commit

Permalink
Problem: abci handshake is not shutdown gracefully (#288)
Browse files Browse the repository at this point in the history
* Problem: abci handshake is not shutdown gracefully

Solution:
- use cancellable node constructor

* Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

* fix grpc-only

* better app close

* log error

---------

Signed-off-by: yihuang <[email protected]>
  • Loading branch information
yihuang authored Jun 23, 2023
1 parent 2136ad0 commit 4774460
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (ante) [#227](https://github.com/crypto-org-chain/ethermint/pull/227) Reuse sender recovery result.
* (cli) [#242](https://github.com/crypto-org-chain/ethermint/pull/242) Integrate tendermint bootstrap cmd.
* (cli) [#246](https://github.com/crypto-org-chain/ethermint/pull/246) Call app.Close to cleanup resource on graceful shutdown.
* (cli) [#288](https://github.com/crypto-org-chain/ethermint/pull/288) make abci handshake shutdown gracefully.

### State Machine Breaking

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/tendermint/tm-db v0.6.7
github.com/tyler-smith/go-bip39 v1.1.0
golang.org/x/net v0.9.0
golang.org/x/sync v0.1.0
golang.org/x/text v0.9.0
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4
google.golang.org/grpc v1.54.0
Expand Down Expand Up @@ -185,7 +186,6 @@ require (
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand All @@ -210,7 +210,7 @@ replace (
github.com/miguelmota/go-ethereum-hdwallet => github.com/crypto-org-chain/go-ethereum-hdwallet v0.1.2
github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
// use cometbft
github.com/tendermint/tendermint => github.com/cometbft/cometbft v0.34.28
github.com/tendermint/tendermint => github.com/cometbft/cometbft v0.34.30-0.20230622094628-60e431e4eef0
// https://github.com/crypto-org-chain/tm-db/tree/release/v0.6.x
github.com/tendermint/tm-db => github.com/crypto-org-chain/tm-db v0.6.8-0.20230424032152-87c7e7f4fb61
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcju
github.com/coinbase/kryptology v1.8.0/go.mod h1:RYXOAPdzOGUe3qlSFkMGn58i3xUA8hmxYHksuq+8ciI=
github.com/coinbase/rosetta-sdk-go v0.7.9 h1:lqllBjMnazTjIqYrOGv8h8jxjg9+hJazIGZr9ZvoCcA=
github.com/coinbase/rosetta-sdk-go v0.7.9/go.mod h1:0/knutI7XGVqXmmH4OQD8OckFrbQ8yMsUZTG7FXCR2M=
github.com/cometbft/cometbft v0.34.28 h1:gwryf55P1SWMUP4nOXpRVI2D0yPoYEzN+IBqmRBOsDc=
github.com/cometbft/cometbft v0.34.28/go.mod h1:L9shMfbkZ8B+7JlwANEr+NZbBcn+hBpwdbeYvA5rLCw=
github.com/cometbft/cometbft v0.34.30-0.20230622094628-60e431e4eef0 h1:AA1aC/2OGCqbnLCj/GOMRhLAcVjBLwduxrDH3k+Ulyk=
github.com/cometbft/cometbft v0.34.30-0.20230622094628-60e431e4eef0/go.mod h1:L9shMfbkZ8B+7JlwANEr+NZbBcn+hBpwdbeYvA5rLCw=
github.com/cometbft/cometbft-db v0.8.0 h1:vUMDaH3ApkX8m0KZvOFFy9b5DZHBAjsnEuo9AKVZpjo=
github.com/cometbft/cometbft-db v0.8.0/go.mod h1:6ASCP4pfhmrCBpfk01/9E1SI29nD3HfVHrY4PG8x5c0=
github.com/confio/ics23/go v0.9.0 h1:cWs+wdbS2KRPZezoaaj+qBleXgUk5WOQFMP3CQFGTr4=
Expand Down
4 changes: 2 additions & 2 deletions gomod2nix.toml
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,8 @@ schema = 3
version = "v0.16.0"
hash = "sha256-JW4zO/0vMzf1dXLePOqaMtiLUZgNbuIseh9GV+jQlf0="
[mod."github.com/tendermint/tendermint"]
version = "v0.34.28"
hash = "sha256-JcV8tXlEpNSjQS34mjfB2K2LZ9AjrE6bUoUGogGK/DE="
version = "v0.34.30-0.20230622094628-60e431e4eef0"
hash = "sha256-iJUYE0kXAq6/b8JuUYAkhz6zuVDJNoHL+nJuVz2xz58="
replaced = "github.com/cometbft/cometbft"
[mod."github.com/tendermint/tm-db"]
version = "v0.6.8-0.20230424032152-87c7e7f4fb61"
Expand Down
85 changes: 43 additions & 42 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
"github.com/cosmos/cosmos-sdk/telemetry"
"golang.org/x/sync/errgroup"

"github.com/spf13/cobra"

Expand Down Expand Up @@ -234,6 +235,11 @@ func startStandAlone(ctx *server.Context, opts StartOptions) error {
}

app := opts.AppCreator(ctx.Logger, db, traceWriter, ctx.Viper)
defer func() {
if err := app.Close(); err != nil {
ctx.Logger.Error("close application failed", "error", err.Error())
}
}()

config, err := config.GetConfig(ctx.Viper)
if err != nil {
Expand Down Expand Up @@ -261,31 +267,27 @@ func startStandAlone(ctx *server.Context, opts StartOptions) error {
if err = svr.Stop(); err != nil {
tmos.Exit(err.Error())
}

if err := app.Close(); err != nil {
tmos.Exit(err.Error())
}
}()

// Wait for SIGINT or SIGTERM signal
return server.WaitForQuitSignals()
}

// legacyAminoCdc is used for the legacy REST API
func startInProcess(ctx *server.Context, clientCtx client.Context, opts StartOptions) (err error) {
cfg := ctx.Config
func startInProcess(svrCtx *server.Context, clientCtx client.Context, opts StartOptions) (err error) {
cfg := svrCtx.Config
home := cfg.RootDir
logger := ctx.Logger
logger := svrCtx.Logger

traceWriterFile := ctx.Viper.GetString(srvflags.TraceStore)
db, err := opts.DBOpener(home, server.GetAppDBBackend(ctx.Viper))
traceWriterFile := svrCtx.Viper.GetString(srvflags.TraceStore)
db, err := opts.DBOpener(home, server.GetAppDBBackend(svrCtx.Viper))
if err != nil {
logger.Error("failed to open DB", "error", err.Error())
return err
}
defer func() {
if err := db.Close(); err != nil {
ctx.Logger.With("error", err).Error("error closing db")
svrCtx.Logger.With("error", err).Error("error closing db")
}
}()

Expand All @@ -295,15 +297,15 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, opts StartOpt
return err
}

config, err := config.GetConfig(ctx.Viper)
config, err := config.GetConfig(svrCtx.Viper)
if err != nil {
logger.Error("failed to get server config", "error", err.Error())
return err
}

if err := config.ValidateBasic(); err != nil {
if strings.Contains(err.Error(), "set min gas price in app.toml or flag or env variable") {
ctx.Logger.Error(
svrCtx.Logger.Error(
"WARNING: The minimum-gas-prices config in app.toml is set to the empty string. " +
"This defaults to 0 in the current version, but will error in the next version " +
"(SDK v0.44). Please explicitly put the desired minimum-gas-prices in your app.toml.",
Expand All @@ -313,33 +315,45 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, opts StartOpt
}
}

app := opts.AppCreator(ctx.Logger, db, traceWriter, ctx.Viper)
app := opts.AppCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)
defer func() {
if err := app.Close(); err != nil {
logger.Error("close application failed", "error", err.Error())
}
}()

nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile())
if err != nil {
logger.Error("failed load or gen node key", "error", err.Error())
return err
}

ctx, cancelFn := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

// listen for quit signals so the calling parent process can gracefully exit
ListenForQuitSignals(g, true, cancelFn, svrCtx.Logger)

genDocProvider := node.DefaultGenesisDocProviderFunc(cfg)
var (
tmNode *node.Node
gRPCOnly = ctx.Viper.GetBool(srvflags.GRPCOnly)
gRPCOnly = svrCtx.Viper.GetBool(srvflags.GRPCOnly)
)
if gRPCOnly {
ctx.Logger.Info("starting node in query only mode; Tendermint is disabled")
svrCtx.Logger.Info("starting node in query only mode; Tendermint is disabled")
config.GRPC.Enable = true
config.JSONRPC.EnableIndexer = false
} else {
tmNode, err = node.NewNode(
tmNode, err = node.NewNodeWithContext(
ctx,
cfg,
pvm.LoadOrGenFilePV(cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile()),
nodeKey,
proxy.NewLocalClientCreator(app),
genDocProvider,
node.DefaultDBProvider,
node.DefaultMetricsProvider(cfg.Instrumentation),
ctx.Logger.With("server", "node"),
svrCtx.Logger.With("server", "node"),
)
if err != nil {
logger.Error("failed init node", "error", err.Error())
Expand All @@ -354,7 +368,6 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, opts StartOpt
defer func() {
if tmNode.IsRunning() {
_ = tmNode.Stop()
_ = app.Close()
}
logger.Info("Bye!")
}()
Expand All @@ -372,18 +385,18 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, opts StartOpt

// Enable metrics if JSONRPC is enabled and --metrics is passed
// Flag not added in config to avoid user enabling in config without passing in CLI
if config.JSONRPC.Enable && ctx.Viper.GetBool(srvflags.JSONRPCEnableMetrics) {
if config.JSONRPC.Enable && svrCtx.Viper.GetBool(srvflags.JSONRPCEnableMetrics) {
ethmetricsexp.Setup(config.JSONRPC.MetricsAddress)
}

var idxer ethermint.EVMTxIndexer
if config.JSONRPC.EnableIndexer {
idxDB, err := OpenIndexerDB(home, server.GetAppDBBackend(ctx.Viper))
idxDB, err := OpenIndexerDB(home, server.GetAppDBBackend(svrCtx.Viper))
if err != nil {
logger.Error("failed to open evm indexer DB", "error", err.Error())
return err
}
idxLogger := ctx.Logger.With("module", "evmindex")
idxLogger := svrCtx.Logger.With("module", "evmindex")
idxer = indexer.NewKVIndexer(idxDB, idxLogger, clientCtx)
indexerService := NewEVMIndexerService(idxer, clientCtx.Client)
indexerService.SetLogger(idxLogger)
Expand Down Expand Up @@ -447,7 +460,7 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, opts StartOpt
}

clientCtx = clientCtx.WithGRPCClient(grpcClient)
ctx.Logger.Debug("gRPC client assigned to client context", "address", grpcAddress)
svrCtx.Logger.Debug("gRPC client assigned to client context", "address", grpcAddress)
}
}

Expand All @@ -458,7 +471,7 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, opts StartOpt

var apiSrv *api.Server
if config.API.Enable {
apiSrv = api.New(clientCtx, ctx.Logger.With("server", "api"))
apiSrv = api.New(clientCtx, svrCtx.Logger.With("server", "api"))
app.RegisterAPIRoutes(apiSrv, config.API)
if config.Telemetry.Enabled {
apiSrv.SetTelemetry(metrics)
Expand Down Expand Up @@ -491,7 +504,7 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, opts StartOpt
if config.GRPCWeb.Enable {
grpcWebSrv, err = servergrpc.StartGRPCWeb(grpcSrv, config.Config)
if err != nil {
ctx.Logger.Error("failed to start grpc-web http server", "error", err)
svrCtx.Logger.Error("failed to start grpc-web http server", "error", err)
return err
}
defer func() {
Expand All @@ -517,7 +530,7 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, opts StartOpt

tmEndpoint := "/websocket"
tmRPCAddr := cfg.RPC.ListenAddress
httpSrv, httpSrvDone, err = StartJSONRPC(ctx, clientCtx, tmRPCAddr, tmEndpoint, &config, idxer)
httpSrv, httpSrvDone, err = StartJSONRPC(svrCtx, clientCtx, tmRPCAddr, tmEndpoint, &config, idxer)
if err != nil {
return err
}
Expand All @@ -540,7 +553,7 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, opts StartOpt
// we do not need to start Rosetta or handle any Tendermint related processes.
if gRPCOnly {
// wait for signal capture and gracefully return
return server.WaitForQuitSignals()
return g.Wait()
}

var rosettaSrv crgserver.Server
Expand All @@ -553,7 +566,7 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, opts StartOpt
conf := &rosetta.Config{
Blockchain: config.Rosetta.Blockchain,
Network: config.Rosetta.Network,
TendermintRPC: ctx.Config.RPC.ListenAddress,
TendermintRPC: svrCtx.Config.RPC.ListenAddress,
GRPCEndpoint: config.GRPC.Address,
Addr: config.Rosetta.Address,
Retries: config.Rosetta.Retries,
Expand All @@ -578,8 +591,8 @@ func startInProcess(ctx *server.Context, clientCtx client.Context, opts StartOpt
case <-time.After(types.ServerStartTime): // assume server started successfully
}
}
// Wait for SIGINT or SIGTERM signal
return server.WaitForQuitSignals()

return g.Wait()
}

func openDB(rootDir string, backendType dbm.BackendType) (dbm.DB, error) {
Expand Down Expand Up @@ -640,17 +653,5 @@ func wrapCPUProfile(ctx *server.Context, callback func() error) error {
}()
}

errCh := make(chan error)
go func() {
errCh <- callback()
}()

select {
case err := <-errCh:
return err

case <-time.After(types.ServerStartTime):
}

return server.WaitForQuitSignals()
return callback()
}
32 changes: 32 additions & 0 deletions server/util.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package server

import (
"context"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/evmos/ethermint/server/config"
"github.com/gorilla/mux"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/spf13/cobra"
"golang.org/x/net/netutil"
"golang.org/x/sync/errgroup"

sdkserver "github.com/cosmos/cosmos-sdk/server"
"github.com/cosmos/cosmos-sdk/server/types"
Expand Down Expand Up @@ -124,3 +129,30 @@ func Listen(addr string, config *config.Config) (net.Listener, error) {
}
return ln, err
}

// ListenForQuitSignals listens for SIGINT and SIGTERM. When a signal is received,
// the cleanup function is called, indicating the caller can gracefully exit or
// return.
//
// Note, the blocking behavior of this depends on the block argument.
// The caller must ensure the corresponding context derived from the cancelFn is used correctly.
func ListenForQuitSignals(g *errgroup.Group, block bool, cancelFn context.CancelFunc, logger tmlog.Logger) {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

f := func() {
sig := <-sigCh
cancelFn()

logger.Info("caught signal", "signal", sig.String())
}

if block {
g.Go(func() error {
f()
return nil
})
} else {
go f()
}
}

0 comments on commit 4774460

Please sign in to comment.