Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

feat(flags): add retry related flags #281

Merged
merged 3 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cmd/flags/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ var (
Category: metricsCategory,
Value: 6060,
}
BackOffMaxRetrys = &cli.Uint64Flag{
Name: "backoff.maxRetrys",
Usage: "Max retry times when there is an error",
Category: commonCategory,
Value: 10,
}
BackOffRetryInterval = &cli.Uint64Flag{
Name: "backoff.retryInterval",
Usage: "Retry interval in seconds when there is an error",
Category: commonCategory,
Value: 12,
}
)

// All common flags.
Expand All @@ -97,6 +109,8 @@ var CommonFlags = []cli.Flag{
MetricsEnabled,
MetricsAddr,
MetricsPort,
BackOffMaxRetrys,
BackOffRetryInterval,
}

// MergeFlags merges the given flag slices.
Expand Down
2 changes: 2 additions & 0 deletions driver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Config struct {
JwtSecret string
P2PSyncVerifiedBlocks bool
P2PSyncTimeout time.Duration
BackOffRetryInterval time.Duration
}

// NewConfigFromCliContext creates a new config instance from
Expand Down Expand Up @@ -51,5 +52,6 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
JwtSecret: string(jwtSecret),
P2PSyncVerifiedBlocks: p2pSyncVerifiedBlocks,
P2PSyncTimeout: time.Duration(int64(time.Second) * int64(c.Uint(flags.P2PSyncTimeout.Name))),
BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second,
}, nil
}
17 changes: 9 additions & 8 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ import (
)

const (
// Time to wait before the next try, when receiving subscription errors.
RetryDelay = 10 * time.Second
protocolStatusReportInterval = 30 * time.Second
)

// Driver keeps the L2 execution engine's local block chain in sync with the TaikoL1
Expand All @@ -31,8 +30,9 @@ type Driver struct {
l1HeadSub event.Subscription
syncNotify chan struct{}

ctx context.Context
wg sync.WaitGroup
backOffRetryInterval time.Duration
ctx context.Context
wg sync.WaitGroup
}

// New initializes the given driver instance based on the command line flags.
Expand All @@ -51,6 +51,7 @@ func InitFromConfig(ctx context.Context, d *Driver, cfg *Config) (err error) {
d.wg = sync.WaitGroup{}
d.syncNotify = make(chan struct{}, 1)
d.ctx = ctx
d.backOffRetryInterval = cfg.BackOffRetryInterval

if d.rpc, err = rpc.NewClient(d.ctx, &rpc.ClientConfig{
L1Endpoint: cfg.L1Endpoint,
Expand All @@ -60,6 +61,7 @@ func InitFromConfig(ctx context.Context, d *Driver, cfg *Config) (err error) {
TaikoL2Address: cfg.TaikoL2Address,
L2EngineEndpoint: cfg.L2EngineEndpoint,
JwtSecret: cfg.JwtSecret,
RetryInterval: cfg.BackOffRetryInterval,
}); err != nil {
return err
}
Expand Down Expand Up @@ -116,7 +118,6 @@ func (d *Driver) Close() {
// eventLoop starts the main loop of a L2 execution engine's driver.
func (d *Driver) eventLoop() {
defer d.wg.Done()
constatnBackoff := backoff.NewConstantBackOff(12 * time.Second)

// reqSync requests performing a synchronising operation, won't block
// if we are already synchronising.
Expand All @@ -129,7 +130,7 @@ func (d *Driver) eventLoop() {

// doSyncWithBackoff performs a synchronising operation with a backoff strategy.
doSyncWithBackoff := func() {
if err := backoff.Retry(d.doSync, constatnBackoff); err != nil {
if err := backoff.Retry(d.doSync, backoff.NewConstantBackOff(d.backOffRetryInterval)); err != nil {
log.Error("Sync L2 execution engine's block chain error", "error", err)
}
}
Expand Down Expand Up @@ -176,7 +177,7 @@ func (d *Driver) ChainSyncer() *chainSyncer.L2ChainSyncer {

// reportProtocolStatus periodically reports the protocol status.
func (d *Driver) reportProtocolStatus() {
ticker := time.NewTicker(30 * time.Second)
ticker := time.NewTicker(protocolStatusReportInterval)
defer func() {
ticker.Stop()
d.wg.Done()
Expand All @@ -193,7 +194,7 @@ func (d *Driver) reportProtocolStatus() {
maxNumBlocks = configs.MaxNumProposedBlocks.Uint64()
return nil
},
backoff.NewConstantBackOff(RetryDelay),
backoff.NewConstantBackOff(d.backOffRetryInterval),
); err != nil {
log.Error("Failed to get protocol state variables", "error", err)
return
Expand Down
11 changes: 10 additions & 1 deletion pkg/chain_iterator/block_batch_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

// Default values used by NewBlockBatchIterator when building an iterator.
const (
// DefaultBlocksReadPerEpoch is the value NewBlockBatchIterator assigns to the
// iterator's blocksReadPerEpoch in its default branch.
// NOTE(review): the condition guarding that branch is outside this view —
// presumably it applies when the config leaves the setting unset; confirm.
DefaultBlocksReadPerEpoch = 1000
// DefaultRetryInterval is the retry interval NewBlockBatchIterator falls back
// to when BlockBatchIteratorConfig.RetryInterval is nil. Iter passes the
// resulting interval to backoff.NewConstantBackOff.
DefaultRetryInterval = 12 * time.Second
)

var (
Expand Down Expand Up @@ -52,6 +53,7 @@ type BlockBatchIterator struct {
isEnd bool
reverse bool
reorgRewindDepth uint64
retryInterval time.Duration
}

// BlockBatchIteratorConfig represents the configs of a block batch iterator.
Expand All @@ -63,6 +65,7 @@ type BlockBatchIteratorConfig struct {
OnBlocks OnBlocksFunc
Reverse bool
ReorgRewindDepth *uint64
RetryInterval *time.Duration
}

// NewBlockBatchIterator creates a new block batch iterator instance.
Expand Down Expand Up @@ -123,6 +126,12 @@ func NewBlockBatchIterator(ctx context.Context, cfg *BlockBatchIteratorConfig) (
iterator.blocksReadPerEpoch = DefaultBlocksReadPerEpoch
}

if cfg.RetryInterval == nil {
iterator.retryInterval = DefaultRetryInterval
} else {
iterator.retryInterval = *cfg.RetryInterval
}

if cfg.EndHeight != nil {
endHeightUint64 := cfg.EndHeight.Uint64()
iterator.endHeight = &endHeightUint64
Expand Down Expand Up @@ -164,7 +173,7 @@ func (i *BlockBatchIterator) Iter() error {
return nil
}

if err := backoff.Retry(iterOp, backoff.NewConstantBackOff(12*time.Second)); err != nil {
if err := backoff.Retry(iterOp, backoff.NewConstantBackOff(i.retryInterval)); err != nil {
return err
}

Expand Down
15 changes: 11 additions & 4 deletions pkg/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"context"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
Expand Down Expand Up @@ -44,11 +45,12 @@ type ClientConfig struct {
TaikoL2Address common.Address
L2EngineEndpoint string
JwtSecret string
RetryInterval time.Duration
}

// NewClient initializes all RPC clients used by Taiko client software.
func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) {
l1RPC, err := DialClientWithBackoff(ctx, cfg.L1Endpoint)
l1RPC, err := DialClientWithBackoff(ctx, cfg.L1Endpoint, cfg.RetryInterval)
if err != nil {
return nil, err
}
Expand All @@ -58,7 +60,7 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) {
return nil, err
}

l2RPC, err := DialClientWithBackoff(ctx, cfg.L2Endpoint)
l2RPC, err := DialClientWithBackoff(ctx, cfg.L2Endpoint, cfg.RetryInterval)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -92,14 +94,19 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) {
// won't be initialized.
var l2AuthRPC *EngineClient
if len(cfg.L2EngineEndpoint) != 0 && len(cfg.JwtSecret) != 0 {
if l2AuthRPC, err = DialEngineClientWithBackoff(ctx, cfg.L2EngineEndpoint, cfg.JwtSecret); err != nil {
if l2AuthRPC, err = DialEngineClientWithBackoff(
ctx,
cfg.L2EngineEndpoint,
cfg.JwtSecret,
cfg.RetryInterval,
); err != nil {
return nil, err
}
}

var l2CheckPoint *ethclient.Client
if len(cfg.L2CheckPoint) != 0 {
if l2CheckPoint, err = DialClientWithBackoff(ctx, cfg.L2CheckPoint); err != nil {
if l2CheckPoint, err = DialClientWithBackoff(ctx, cfg.L2CheckPoint, cfg.RetryInterval); err != nil {
return nil, err
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/rpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"testing"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
Expand All @@ -17,6 +18,7 @@ func newTestClient(t *testing.T) *Client {
TaikoL2Address: common.HexToAddress(os.Getenv("TAIKO_L2_ADDRESS")),
L2EngineEndpoint: os.Getenv("L2_EXECUTION_ENGINE_AUTH_ENDPOINT"),
JwtSecret: os.Getenv("JWT_SECRET"),
RetryInterval: backoff.DefaultMaxInterval,
})

require.Nil(t, err)
Expand Down
13 changes: 9 additions & 4 deletions pkg/rpc/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

// DialClientWithBackoff connects an Ethereum RPC client at the given URL with
// a backoff strategy.
func DialClientWithBackoff(ctx context.Context, url string) (*ethclient.Client, error) {
func DialClientWithBackoff(ctx context.Context, url string, retryInterval time.Duration) (*ethclient.Client, error) {
var client *ethclient.Client
if err := backoff.Retry(
func() (err error) {
Expand All @@ -26,7 +26,7 @@ func DialClientWithBackoff(ctx context.Context, url string) (*ethclient.Client,
}
return err
},
backoff.NewConstantBackOff(12*time.Second),
backoff.NewConstantBackOff(retryInterval),
); err != nil {
return nil, err
}
Expand All @@ -36,7 +36,12 @@ func DialClientWithBackoff(ctx context.Context, url string) (*ethclient.Client,

// DialEngineClientWithBackoff connects an ethereum engine RPC client at the
// given URL with a backoff strategy.
func DialEngineClientWithBackoff(ctx context.Context, url string, jwtSecret string) (*EngineClient, error) {
func DialEngineClientWithBackoff(
ctx context.Context,
url string,
jwtSecret string,
retryInterval time.Duration,
) (*EngineClient, error) {
var engineClient *EngineClient
if err := backoff.Retry(
func() (err error) {
Expand All @@ -49,7 +54,7 @@ func DialEngineClientWithBackoff(ctx context.Context, url string, jwtSecret stri
engineClient = &EngineClient{client}
return nil
},
backoff.NewConstantBackOff(12*time.Second),
backoff.NewConstantBackOff(retryInterval),
); err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/rpc/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"
"testing"
"time"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -21,6 +22,7 @@ func TestDialEngineClientWithBackoff(t *testing.T) {
context.Background(),
os.Getenv("L2_EXECUTION_ENGINE_AUTH_ENDPOINT"),
string(jwtSecret),
12*time.Second,
)

require.Nil(t, err)
Expand All @@ -32,7 +34,11 @@ func TestDialEngineClientWithBackoff(t *testing.T) {
}

func TestDialClientWithBackoff(t *testing.T) {
client, err := DialClientWithBackoff(context.Background(), os.Getenv("L2_EXECUTION_ENGINE_WS_ENDPOINT"))
client, err := DialClientWithBackoff(
context.Background(),
os.Getenv("L2_EXECUTION_ENGINE_WS_ENDPOINT"),
12*time.Second,
)
require.Nil(t, err)

genesis, err := client.HeaderByNumber(context.Background(), common.Big0)
Expand Down
2 changes: 2 additions & 0 deletions proposer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Config struct {
MinBlockGasLimit uint64
MaxProposedTxListsPerEpoch uint64
ProposeBlockTxGasLimit *uint64
BackOffRetryInterval time.Duration
}

// NewConfigFromCliContext initializes a Config instance from
Expand Down Expand Up @@ -93,5 +94,6 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
MinBlockGasLimit: c.Uint64(flags.MinBlockGasLimit.Name),
MaxProposedTxListsPerEpoch: c.Uint64(flags.MaxProposedTxListsPerEpoch.Name),
ProposeBlockTxGasLimit: proposeBlockTxGasLimit,
BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second,
}, nil
}
1 change: 1 addition & 0 deletions proposer/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func InitFromConfig(ctx context.Context, p *Proposer, cfg *Config) (err error) {
L2Endpoint: cfg.L2Endpoint,
TaikoL1Address: cfg.TaikoL1Address,
TaikoL2Address: cfg.TaikoL2Address,
RetryInterval: cfg.BackOffRetryInterval,
}); err != nil {
return fmt.Errorf("initialize rpc clients error: %w", err)
}
Expand Down
4 changes: 4 additions & 0 deletions prover/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Config struct {
RandomDummyProofDelayLowerBound *time.Duration
RandomDummyProofDelayUpperBound *time.Duration
ExpectedReward uint64
BackOffMaxRetrys uint64
BackOffRetryInterval time.Duration
}

// NewConfigFromCliContext creates a new config instance from command line flags.
Expand Down Expand Up @@ -136,5 +138,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
RandomDummyProofDelayLowerBound: randomDummyProofDelayLowerBound,
RandomDummyProofDelayUpperBound: randomDummyProofDelayUpperBound,
ExpectedReward: c.Uint64(flags.ExpectedReward.Name),
BackOffMaxRetrys: c.Uint64(flags.BackOffMaxRetrys.Name),
BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second,
}, nil
}
5 changes: 3 additions & 2 deletions prover/proof_producer/zkevm_rpcd_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
)

var (
errProofGenerating = errors.New("proof is generating")
errProofGenerating = errors.New("proof is generating")
proofPollingInterval = 10 * time.Second
)

// ZkevmRpcdProducer is responsible for requesting zk proofs from the given proverd endpoint.
Expand Down Expand Up @@ -175,7 +176,7 @@ func (p *ZkevmRpcdProducer) callProverDaemon(ctx context.Context, opts *ProofReq
degree = output.Circuit.Degree
log.Info("Proof generated", "height", opts.Height, "degree", degree, "time", time.Since(start))
return nil
}, backoff.NewConstantBackOff(10*time.Second)); err != nil {
}, backoff.NewConstantBackOff(proofPollingInterval)); err != nil {
return nil, 0, err
}
return proof, degree, nil
Expand Down
3 changes: 2 additions & 1 deletion prover/proof_submitter/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func sendTxWithBackoff(
expectedReward uint64,
meta *bindings.TaikoDataBlockMetadata,
sendTxFunc func() (*types.Transaction, error),
retryInterval time.Duration,
) error {
var (
isUnretryableError bool
Expand Down Expand Up @@ -179,7 +180,7 @@ func sendTxWithBackoff(
)

return nil
}, backoff.NewConstantBackOff(12*time.Second)); err != nil {
}, backoff.NewConstantBackOff(retryInterval)); err != nil {
return fmt.Errorf("failed to send TaikoL1.proveBlock transaction: %w", err)
}

Expand Down
Loading