From 8542c0a600778e875d8bbad076a1d6f527ed7734 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 13 Jun 2023 14:50:49 +0800 Subject: [PATCH] feat(flags): add retry related flags --- cmd/flags/common.go | 14 ++++++++++++++ driver/config.go | 2 ++ driver/driver.go | 17 +++++++++-------- pkg/chain_iterator/block_batch_iterator.go | 11 ++++++++++- pkg/rpc/client.go | 15 +++++++++++---- pkg/rpc/client_test.go | 2 ++ pkg/rpc/dial.go | 13 +++++++++---- pkg/rpc/dial_test.go | 8 +++++++- proposer/config.go | 2 ++ proposer/proposer.go | 1 + prover/config.go | 4 ++++ prover/proof_producer/zkevm_rpcd_producer.go | 5 +++-- prover/proof_submitter/util.go | 3 ++- prover/proof_submitter/util_test.go | 11 +++++++---- prover/proof_submitter/valid_proof_submitter.go | 5 +++++ .../valid_proof_submitter_test.go | 1 + prover/prover.go | 13 +++++-------- testutils/suite.go | 2 ++ 18 files changed, 96 insertions(+), 33 deletions(-) diff --git a/cmd/flags/common.go b/cmd/flags/common.go index d9e3f4777..a253ec99f 100644 --- a/cmd/flags/common.go +++ b/cmd/flags/common.go @@ -83,6 +83,18 @@ var ( Category: metricsCategory, Value: 6060, } + BackOffMaxRetrys = &cli.Uint64Flag{ + Name: "backoff.maxRetrys", + Usage: "Max retry times when there is an error", + Category: commonCategory, + Value: 10, + } + BackOffRetryInterval = &cli.Uint64Flag{ + Name: "backoff.retryInterval", + Usage: "Retry interval in seconds when there is an error", + Category: commonCategory, + Value: 12, + } ) // All common flags. @@ -97,6 +109,8 @@ var CommonFlags = []cli.Flag{ MetricsEnabled, MetricsAddr, MetricsPort, + BackOffMaxRetrys, + BackOffRetryInterval, } // MergeFlags merges the given flag slices. diff --git a/driver/config.go b/driver/config.go index 79098f381..363dd8300 100644 --- a/driver/config.go +++ b/driver/config.go @@ -22,6 +22,7 @@ type Config struct { JwtSecret string P2PSyncVerifiedBlocks bool P2PSyncTimeout time.Duration + BackOffRetryInterval time.Duration } // NewConfigFromCliContext creates a new config instance from @@ -51,5 +52,6 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) { JwtSecret: string(jwtSecret), P2PSyncVerifiedBlocks: p2pSyncVerifiedBlocks, P2PSyncTimeout: time.Duration(int64(time.Second) * int64(c.Uint(flags.P2PSyncTimeout.Name))), + BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second, }, nil } diff --git a/driver/driver.go b/driver/driver.go index d4e771d73..eabe3ff48 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -16,8 +16,7 @@ import ( ) const ( - // Time to wait before the next try, when receiving subscription errors. - RetryDelay = 10 * time.Second + protocolStatusReportInterval = 30 * time.Second ) // Driver keeps the L2 execution engine's local block chain in sync with the TaikoL1 @@ -31,8 +30,9 @@ type Driver struct { l1HeadSub event.Subscription syncNotify chan struct{} - ctx context.Context - wg sync.WaitGroup + backOffRetryInterval time.Duration + ctx context.Context + wg sync.WaitGroup } // New initializes the given driver instance based on the command line flags. @@ -51,6 +51,7 @@ func InitFromConfig(ctx context.Context, d *Driver, cfg *Config) (err error) { d.wg = sync.WaitGroup{} d.syncNotify = make(chan struct{}, 1) d.ctx = ctx + d.backOffRetryInterval = cfg.BackOffRetryInterval if d.rpc, err = rpc.NewClient(d.ctx, &rpc.ClientConfig{ L1Endpoint: cfg.L1Endpoint, @@ -60,6 +61,7 @@ func InitFromConfig(ctx context.Context, d *Driver, cfg *Config) (err error) { TaikoL2Address: cfg.TaikoL2Address, L2EngineEndpoint: cfg.L2EngineEndpoint, JwtSecret: cfg.JwtSecret, + RetryInterval: cfg.BackOffRetryInterval, }); err != nil { return err } @@ -116,7 +118,6 @@ func (d *Driver) Close() { // eventLoop starts the main loop of a L2 execution engine's driver. func (d *Driver) eventLoop() { defer d.wg.Done() - constatnBackoff := backoff.NewConstantBackOff(12 * time.Second) // reqSync requests performing a synchronising operation, won't block // if we are already synchronising. @@ -129,7 +130,7 @@ func (d *Driver) eventLoop() { // doSyncWithBackoff performs a synchronising operation with a backoff strategy. doSyncWithBackoff := func() { - if err := backoff.Retry(d.doSync, constatnBackoff); err != nil { + if err := backoff.Retry(d.doSync, backoff.NewConstantBackOff(d.backOffRetryInterval)); err != nil { log.Error("Sync L2 execution engine's block chain error", "error", err) } } @@ -176,7 +177,7 @@ func (d *Driver) ChainSyncer() *chainSyncer.L2ChainSyncer { // reportProtocolStatus reports some protocol status intervally. func (d *Driver) reportProtocolStatus() { - ticker := time.NewTicker(30 * time.Second) + ticker := time.NewTicker(protocolStatusReportInterval) defer func() { ticker.Stop() d.wg.Done() @@ -193,7 +194,7 @@ func (d *Driver) reportProtocolStatus() { maxNumBlocks = configs.MaxNumProposedBlocks.Uint64() return nil }, - backoff.NewConstantBackOff(RetryDelay), + backoff.NewConstantBackOff(d.backOffRetryInterval), ); err != nil { log.Error("Failed to get protocol state variables", "error", err) return diff --git a/pkg/chain_iterator/block_batch_iterator.go b/pkg/chain_iterator/block_batch_iterator.go index 9bebdecbf..0203f2853 100644 --- a/pkg/chain_iterator/block_batch_iterator.go +++ b/pkg/chain_iterator/block_batch_iterator.go @@ -17,6 +17,7 @@ import ( const ( DefaultBlocksReadPerEpoch = 1000 + DefaultRetryInterval = 12 * time.Second ) var ( @@ -52,6 +53,7 @@ type BlockBatchIterator struct { isEnd bool reverse bool reorgRewindDepth uint64 + retryInterval time.Duration } // BlockBatchIteratorConfig represents the configs of a block batch iterator. @@ -63,6 +65,7 @@ type BlockBatchIteratorConfig struct { OnBlocks OnBlocksFunc Reverse bool ReorgRewindDepth *uint64 + RetryInterval *time.Duration } // NewBlockBatchIterator creates a new block batch iterator instance. @@ -123,6 +126,12 @@ func NewBlockBatchIterator(ctx context.Context, cfg *BlockBatchIteratorConfig) ( iterator.blocksReadPerEpoch = DefaultBlocksReadPerEpoch } + if cfg.RetryInterval == nil { + iterator.retryInterval = DefaultRetryInterval + } else { + iterator.retryInterval = *cfg.RetryInterval + } + if cfg.EndHeight != nil { endHeightUint64 := cfg.EndHeight.Uint64() iterator.endHeight = &endHeightUint64 @@ -164,7 +173,7 @@ func (i *BlockBatchIterator) Iter() error { return nil } - if err := backoff.Retry(iterOp, backoff.NewConstantBackOff(12*time.Second)); err != nil { + if err := backoff.Retry(iterOp, backoff.NewConstantBackOff(i.retryInterval)); err != nil { return err } diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index 1c3c5c30b..09930f2cc 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -3,6 +3,7 @@ package rpc import ( "context" "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" @@ -44,11 +45,12 @@ type ClientConfig struct { TaikoL2Address common.Address L2EngineEndpoint string JwtSecret string + RetryInterval time.Duration } // NewClient initializes all RPC clients used by Taiko client softwares. func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) { - l1RPC, err := DialClientWithBackoff(ctx, cfg.L1Endpoint) + l1RPC, err := DialClientWithBackoff(ctx, cfg.L1Endpoint, cfg.RetryInterval) if err != nil { return nil, err } @@ -58,7 +60,7 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) { return nil, err } - l2RPC, err := DialClientWithBackoff(ctx, cfg.L2Endpoint) + l2RPC, err := DialClientWithBackoff(ctx, cfg.L2Endpoint, cfg.RetryInterval) if err != nil { return nil, err } @@ -92,14 +94,19 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) { // won't be initialized. var l2AuthRPC *EngineClient if len(cfg.L2EngineEndpoint) != 0 && len(cfg.JwtSecret) != 0 { - if l2AuthRPC, err = DialEngineClientWithBackoff(ctx, cfg.L2EngineEndpoint, cfg.JwtSecret); err != nil { + if l2AuthRPC, err = DialEngineClientWithBackoff( + ctx, + cfg.L2EngineEndpoint, + cfg.JwtSecret, + cfg.RetryInterval, + ); err != nil { return nil, err } } var l2CheckPoint *ethclient.Client if len(cfg.L2CheckPoint) != 0 { - if l2CheckPoint, err = DialClientWithBackoff(ctx, cfg.L2CheckPoint); err != nil { + if l2CheckPoint, err = DialClientWithBackoff(ctx, cfg.L2CheckPoint, cfg.RetryInterval); err != nil { return nil, err } } diff --git a/pkg/rpc/client_test.go b/pkg/rpc/client_test.go index cd38e8061..97dc3977e 100644 --- a/pkg/rpc/client_test.go +++ b/pkg/rpc/client_test.go @@ -5,6 +5,7 @@ import ( "os" "testing" + "github.com/cenkalti/backoff/v4" "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/require" ) @@ -17,6 +18,7 @@ func newTestClient(t *testing.T) *Client { TaikoL2Address: common.HexToAddress(os.Getenv("TAIKO_L2_ADDRESS")), L2EngineEndpoint: os.Getenv("L2_EXECUTION_ENGINE_AUTH_ENDPOINT"), JwtSecret: os.Getenv("JWT_SECRET"), + RetryInterval: backoff.DefaultMaxInterval, }) require.Nil(t, err) diff --git a/pkg/rpc/dial.go b/pkg/rpc/dial.go index 4b1e52300..6d3fee5fa 100644 --- a/pkg/rpc/dial.go +++ b/pkg/rpc/dial.go @@ -16,7 +16,7 @@ import ( // DialClientWithBackoff connects a ethereum RPC client at the given URL with // a backoff strategy. -func DialClientWithBackoff(ctx context.Context, url string) (*ethclient.Client, error) { +func DialClientWithBackoff(ctx context.Context, url string, retryInterval time.Duration) (*ethclient.Client, error) { var client *ethclient.Client if err := backoff.Retry( func() (err error) { @@ -26,7 +26,7 @@ func DialClientWithBackoff(ctx context.Context, url string) (*ethclient.Client, } return err }, - backoff.NewConstantBackOff(12*time.Second), + backoff.NewConstantBackOff(retryInterval), ); err != nil { return nil, err } @@ -36,7 +36,12 @@ func DialClientWithBackoff(ctx context.Context, url string) (*ethclient.Client, // DialEngineClientWithBackoff connects an ethereum engine RPC client at the // given URL with a backoff strategy. -func DialEngineClientWithBackoff(ctx context.Context, url string, jwtSecret string) (*EngineClient, error) { +func DialEngineClientWithBackoff( + ctx context.Context, + url string, + jwtSecret string, + retryInterval time.Duration, +) (*EngineClient, error) { var engineClient *EngineClient if err := backoff.Retry( func() (err error) { @@ -49,7 +54,7 @@ func DialEngineClientWithBackoff(ctx context.Context, url string, jwtSecret stri engineClient = &EngineClient{client} return nil }, - backoff.NewConstantBackOff(12*time.Second), + backoff.NewConstantBackOff(retryInterval), ); err != nil { return nil, err } diff --git a/pkg/rpc/dial_test.go b/pkg/rpc/dial_test.go index 8d6ffde2c..7f1f627a6 100644 --- a/pkg/rpc/dial_test.go +++ b/pkg/rpc/dial_test.go @@ -4,6 +4,7 @@ import ( "context" "os" "testing" + "time" "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" @@ -21,6 +22,7 @@ func TestDialEngineClientWithBackoff(t *testing.T) { context.Background(), os.Getenv("L2_EXECUTION_ENGINE_AUTH_ENDPOINT"), string(jwtSecret), + 12*time.Second, ) require.Nil(t, err) @@ -32,7 +34,11 @@ func TestDialEngineClientWithBackoff(t *testing.T) { } func TestDialClientWithBackoff(t *testing.T) { - client, err := DialClientWithBackoff(context.Background(), os.Getenv("L2_EXECUTION_ENGINE_WS_ENDPOINT")) + client, err := DialClientWithBackoff( + context.Background(), + os.Getenv("L2_EXECUTION_ENGINE_WS_ENDPOINT"), + 12*time.Second, + ) require.Nil(t, err) genesis, err := client.HeaderByNumber(context.Background(), common.Big0) diff --git a/proposer/config.go b/proposer/config.go index f75fc0bff..91502d2a3 100644 --- a/proposer/config.go +++ b/proposer/config.go @@ -27,6 +27,7 @@ type Config struct { MinBlockGasLimit uint64 MaxProposedTxListsPerEpoch uint64 ProposeBlockTxGasLimit *uint64 + BackOffRetryInterval time.Duration } // NewConfigFromCliContext initializes a Config instance from @@ -93,5 +94,6 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) { MinBlockGasLimit: c.Uint64(flags.MinBlockGasLimit.Name), MaxProposedTxListsPerEpoch: c.Uint64(flags.MaxProposedTxListsPerEpoch.Name), ProposeBlockTxGasLimit: proposeBlockTxGasLimit, + BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second, }, nil } diff --git a/proposer/proposer.go b/proposer/proposer.go index c98af1045..26a241102 100644 --- a/proposer/proposer.go +++ b/proposer/proposer.go @@ -91,6 +91,7 @@ func InitFromConfig(ctx context.Context, p *Proposer, cfg *Config) (err error) { L2Endpoint: cfg.L2Endpoint, TaikoL1Address: cfg.TaikoL1Address, TaikoL2Address: cfg.TaikoL2Address, + RetryInterval: cfg.BackOffRetryInterval, }); err != nil { return fmt.Errorf("initialize rpc clients error: %w", err) } diff --git a/prover/config.go b/prover/config.go index 443931f48..39961150c 100644 --- a/prover/config.go +++ b/prover/config.go @@ -35,6 +35,8 @@ type Config struct { RandomDummyProofDelayLowerBound *time.Duration RandomDummyProofDelayUpperBound *time.Duration ExpectedReward uint64 + BackOffMaxRetrys uint64 + BackOffRetryInterval time.Duration } // NewConfigFromCliContext creates a new config instance from command line flags. @@ -136,5 +138,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) { RandomDummyProofDelayLowerBound: randomDummyProofDelayLowerBound, RandomDummyProofDelayUpperBound: randomDummyProofDelayUpperBound, ExpectedReward: c.Uint64(flags.ExpectedReward.Name), + BackOffMaxRetrys: c.Uint64(flags.BackOffMaxRetrys.Name), + BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second, }, nil } diff --git a/prover/proof_producer/zkevm_rpcd_producer.go b/prover/proof_producer/zkevm_rpcd_producer.go index ca000f2ee..3b9013abc 100644 --- a/prover/proof_producer/zkevm_rpcd_producer.go +++ b/prover/proof_producer/zkevm_rpcd_producer.go @@ -19,7 +19,8 @@ import ( ) var ( - errProofGenerating = errors.New("proof is generating") + errProofGenerating = errors.New("proof is generating") + proofPollingInterval = 10 * time.Second ) // ZkevmRpcdProducer is responsible for requesting zk proofs from the given proverd endpoint. @@ -175,7 +176,7 @@ func (p *ZkevmRpcdProducer) callProverDaemon(ctx context.Context, opts *ProofReq degree = output.Circuit.Degree log.Info("Proof generated", "height", opts.Height, "degree", degree, "time", time.Since(start)) return nil - }, backoff.NewConstantBackOff(10*time.Second)); err != nil { + }, backoff.NewConstantBackOff(proofPollingInterval)); err != nil { return nil, 0, err } return proof, degree, nil diff --git a/prover/proof_submitter/util.go b/prover/proof_submitter/util.go index 5c6e33009..e7d2f89a9 100644 --- a/prover/proof_submitter/util.go +++ b/prover/proof_submitter/util.go @@ -71,6 +71,7 @@ func sendTxWithBackoff( expectedReward uint64, meta *bindings.TaikoDataBlockMetadata, sendTxFunc func() (*types.Transaction, error), + retryInterval time.Duration, ) error { var ( isUnretryableError bool @@ -179,7 +180,7 @@ func sendTxWithBackoff( ) return nil - }, backoff.NewConstantBackOff(12*time.Second)); err != nil { + }, backoff.NewConstantBackOff(retryInterval)); err != nil { return fmt.Errorf("failed to send TaikoL1.proveBlock transaction: %w", err) } diff --git a/prover/proof_submitter/util_test.go b/prover/proof_submitter/util_test.go index a005e894f..4c1c20258 100644 --- a/prover/proof_submitter/util_test.go +++ b/prover/proof_submitter/util_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -38,9 +39,9 @@ func (s *ProofSubmitterTestSuite) TestSendTxWithBackoff() { 0, 0, meta, - func() (*types.Transaction, error) { - return nil, errors.New("L1_TEST") - })) + func() (*types.Transaction, error) { return nil, errors.New("L1_TEST") }, + 12*time.Second, + )) s.Nil(sendTxWithBackoff( context.Background(), @@ -64,5 +65,7 @@ func (s *ProofSubmitterTestSuite) TestSendTxWithBackoff() { } return block.Transactions()[0], nil - })) + }, + 12*time.Second, + )) } diff --git a/prover/proof_submitter/valid_proof_submitter.go b/prover/proof_submitter/valid_proof_submitter.go index 4d7376887..70f612550 100644 --- a/prover/proof_submitter/valid_proof_submitter.go +++ b/prover/proof_submitter/valid_proof_submitter.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -39,6 +40,7 @@ type ValidProofSubmitter struct { isSystemProver bool graffiti [32]byte expectedReward uint64 + retryInterval time.Duration } // NewValidProofSubmitter creates a new ValidProofSubmitter instance. @@ -53,6 +55,7 @@ func NewValidProofSubmitter( isSystemProver bool, graffiti string, expectedReward uint64, + retryInterval time.Duration, ) (*ValidProofSubmitter, error) { anchorValidator, err := anchorTxValidator.New(taikoL2Address, rpcClient.L2ChainID, rpcClient) if err != nil { @@ -89,6 +92,7 @@ func NewValidProofSubmitter( isSystemProver: isSystemProver, graffiti: rpc.StringToBytes32(graffiti), expectedReward: expectedReward, + retryInterval: retryInterval, }, nil } @@ -262,6 +266,7 @@ func (s *ValidProofSubmitter) SubmitProof( s.expectedReward, proofWithHeader.Meta, sendTx, + s.retryInterval, ); err != nil { if errors.Is(err, errUnretryable) { return nil diff --git a/prover/proof_submitter/valid_proof_submitter_test.go b/prover/proof_submitter/valid_proof_submitter_test.go index 858e2fc3c..06f5f83b9 100644 --- a/prover/proof_submitter/valid_proof_submitter_test.go +++ b/prover/proof_submitter/valid_proof_submitter_test.go @@ -49,6 +49,7 @@ func (s *ProofSubmitterTestSuite) SetupTest() { false, "test", 0, + 12*time.Second, ) s.Nil(err) diff --git a/prover/prover.go b/prover/prover.go index e142e721b..67d743ad9 100644 --- a/prover/prover.go +++ b/prover/prover.go @@ -26,11 +26,6 @@ import ( "github.com/urfave/cli/v2" ) -const ( - backOffMaxRetrys = 10 - backOffRetryInterval = 12 * time.Second -) - type cancelFunc func() // Prover keep trying to prove new proposed blocks valid/invalid. @@ -104,6 +99,7 @@ func InitFromConfig(ctx context.Context, p *Prover, cfg *Config) (err error) { L2Endpoint: cfg.L2WsEndpoint, TaikoL1Address: cfg.TaikoL1Address, TaikoL2Address: cfg.TaikoL2Address, + RetryInterval: cfg.BackOffRetryInterval, }); err != nil { return err } @@ -210,6 +206,7 @@ func InitFromConfig(ctx context.Context, p *Prover, cfg *Config) (err error) { p.cfg.SystemProver, p.cfg.Graffiti, p.cfg.ExpectedReward, + p.cfg.BackOffRetryInterval, ); err != nil { return err } @@ -266,7 +263,7 @@ func (p *Prover) eventLoop() { case <-verificationCheckTicker.C: if err := backoff.Retry( func() error { return p.checkChainVerification(lastLatestVerifiedL1Height) }, - backoff.NewExponentialBackOff(), + backoff.NewConstantBackOff(p.cfg.BackOffRetryInterval), ); err != nil { log.Error("Check chain verification error", "error", err) } @@ -468,7 +465,7 @@ func (p *Prover) onBlockProposed( go func() { if err := backoff.Retry( func() error { return handleBlockProposedEvent() }, - backoff.WithMaxRetries(backoff.NewConstantBackOff(backOffRetryInterval), backOffMaxRetrys), + backoff.WithMaxRetries(backoff.NewConstantBackOff(p.cfg.BackOffRetryInterval), p.cfg.BackOffMaxRetrys), ); err != nil { p.currentBlocksBeingProvenMutex.Lock() delete(p.currentBlocksBeingProven, event.Id.Uint64()) @@ -493,7 +490,7 @@ func (p *Prover) submitProofOp(ctx context.Context, proofWithHeader *proofProduc if err := backoff.Retry( func() error { return p.validProofSubmitter.SubmitProof(p.ctx, proofWithHeader) }, - backoff.WithMaxRetries(backoff.NewConstantBackOff(backOffRetryInterval), backOffMaxRetrys), + backoff.WithMaxRetries(backoff.NewConstantBackOff(p.cfg.BackOffRetryInterval), p.cfg.BackOffMaxRetrys), ); err != nil { log.Error("Submit proof error", "error", err) } diff --git a/testutils/suite.go b/testutils/suite.go index 46971eada..f4b86f99c 100644 --- a/testutils/suite.go +++ b/testutils/suite.go @@ -7,6 +7,7 @@ import ( "math/big" "os" + "github.com/cenkalti/backoff/v4" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -60,6 +61,7 @@ func (s *ClientTestSuite) SetupTest() { TaikoL2Address: common.HexToAddress(os.Getenv("TAIKO_L2_ADDRESS")), L2EngineEndpoint: os.Getenv("L2_EXECUTION_ENGINE_AUTH_ENDPOINT"), JwtSecret: string(jwtSecret), + RetryInterval: backoff.DefaultMaxInterval, }) s.Nil(err)