Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
feat(flags): add retry related flags
Browse files Browse the repository at this point in the history
  • Loading branch information
davidtaikocha committed Jun 13, 2023
1 parent 344bac1 commit 8542c0a
Show file tree
Hide file tree
Showing 18 changed files with 96 additions and 33 deletions.
14 changes: 14 additions & 0 deletions cmd/flags/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ var (
Category: metricsCategory,
Value: 6060,
}
BackOffMaxRetrys = &cli.Uint64Flag{
Name: "backoff.maxRetrys",
Usage: "Max retry times when there is an error",
Category: commonCategory,
Value: 10,
}
BackOffRetryInterval = &cli.Uint64Flag{
Name: "backoff.retryInterval",
Usage: "Retry interval in seconds when there is an error",
Category: commonCategory,
Value: 12,
}
)

// All common flags.
Expand All @@ -97,6 +109,8 @@ var CommonFlags = []cli.Flag{
MetricsEnabled,
MetricsAddr,
MetricsPort,
BackOffMaxRetrys,
BackOffRetryInterval,
}

// MergeFlags merges the given flag slices.
Expand Down
2 changes: 2 additions & 0 deletions driver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Config struct {
JwtSecret string
P2PSyncVerifiedBlocks bool
P2PSyncTimeout time.Duration
BackOffRetryInterval time.Duration
}

// NewConfigFromCliContext creates a new config instance from
Expand Down Expand Up @@ -51,5 +52,6 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
JwtSecret: string(jwtSecret),
P2PSyncVerifiedBlocks: p2pSyncVerifiedBlocks,
P2PSyncTimeout: time.Duration(int64(time.Second) * int64(c.Uint(flags.P2PSyncTimeout.Name))),
BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second,
}, nil
}
17 changes: 9 additions & 8 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ import (
)

const (
// Time to wait before the next try, when receiving subscription errors.
RetryDelay = 10 * time.Second
protocolStatusReportInterval = 30 * time.Second
)

// Driver keeps the L2 execution engine's local block chain in sync with the TaikoL1
Expand All @@ -31,8 +30,9 @@ type Driver struct {
l1HeadSub event.Subscription
syncNotify chan struct{}

ctx context.Context
wg sync.WaitGroup
backOffRetryInterval time.Duration
ctx context.Context
wg sync.WaitGroup
}

// New initializes the given driver instance based on the command line flags.
Expand All @@ -51,6 +51,7 @@ func InitFromConfig(ctx context.Context, d *Driver, cfg *Config) (err error) {
d.wg = sync.WaitGroup{}
d.syncNotify = make(chan struct{}, 1)
d.ctx = ctx
d.backOffRetryInterval = cfg.BackOffRetryInterval

if d.rpc, err = rpc.NewClient(d.ctx, &rpc.ClientConfig{
L1Endpoint: cfg.L1Endpoint,
Expand All @@ -60,6 +61,7 @@ func InitFromConfig(ctx context.Context, d *Driver, cfg *Config) (err error) {
TaikoL2Address: cfg.TaikoL2Address,
L2EngineEndpoint: cfg.L2EngineEndpoint,
JwtSecret: cfg.JwtSecret,
RetryInterval: cfg.BackOffRetryInterval,
}); err != nil {
return err
}
Expand Down Expand Up @@ -116,7 +118,6 @@ func (d *Driver) Close() {
// eventLoop starts the main loop of a L2 execution engine's driver.
func (d *Driver) eventLoop() {
defer d.wg.Done()
constatnBackoff := backoff.NewConstantBackOff(12 * time.Second)

// reqSync requests performing a synchronising operation, won't block
// if we are already synchronising.
Expand All @@ -129,7 +130,7 @@ func (d *Driver) eventLoop() {

// doSyncWithBackoff performs a synchronising operation with a backoff strategy.
doSyncWithBackoff := func() {
if err := backoff.Retry(d.doSync, constatnBackoff); err != nil {
if err := backoff.Retry(d.doSync, backoff.NewConstantBackOff(d.backOffRetryInterval)); err != nil {
log.Error("Sync L2 execution engine's block chain error", "error", err)
}
}
Expand Down Expand Up @@ -176,7 +177,7 @@ func (d *Driver) ChainSyncer() *chainSyncer.L2ChainSyncer {

// reportProtocolStatus reports some protocol status intervally.
func (d *Driver) reportProtocolStatus() {
ticker := time.NewTicker(30 * time.Second)
ticker := time.NewTicker(protocolStatusReportInterval)
defer func() {
ticker.Stop()
d.wg.Done()
Expand All @@ -193,7 +194,7 @@ func (d *Driver) reportProtocolStatus() {
maxNumBlocks = configs.MaxNumProposedBlocks.Uint64()
return nil
},
backoff.NewConstantBackOff(RetryDelay),
backoff.NewConstantBackOff(d.backOffRetryInterval),
); err != nil {
log.Error("Failed to get protocol state variables", "error", err)
return
Expand Down
11 changes: 10 additions & 1 deletion pkg/chain_iterator/block_batch_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

const (
DefaultBlocksReadPerEpoch = 1000
DefaultRetryInterval = 12 * time.Second
)

var (
Expand Down Expand Up @@ -52,6 +53,7 @@ type BlockBatchIterator struct {
isEnd bool
reverse bool
reorgRewindDepth uint64
retryInterval time.Duration
}

// BlockBatchIteratorConfig represents the configs of a block batch iterator.
Expand All @@ -63,6 +65,7 @@ type BlockBatchIteratorConfig struct {
OnBlocks OnBlocksFunc
Reverse bool
ReorgRewindDepth *uint64
RetryInterval *time.Duration
}

// NewBlockBatchIterator creates a new block batch iterator instance.
Expand Down Expand Up @@ -123,6 +126,12 @@ func NewBlockBatchIterator(ctx context.Context, cfg *BlockBatchIteratorConfig) (
iterator.blocksReadPerEpoch = DefaultBlocksReadPerEpoch
}

if cfg.RetryInterval == nil {
iterator.retryInterval = DefaultRetryInterval
} else {
iterator.retryInterval = *cfg.RetryInterval
}

if cfg.EndHeight != nil {
endHeightUint64 := cfg.EndHeight.Uint64()
iterator.endHeight = &endHeightUint64
Expand Down Expand Up @@ -164,7 +173,7 @@ func (i *BlockBatchIterator) Iter() error {
return nil
}

if err := backoff.Retry(iterOp, backoff.NewConstantBackOff(12*time.Second)); err != nil {
if err := backoff.Retry(iterOp, backoff.NewConstantBackOff(i.retryInterval)); err != nil {
return err
}

Expand Down
15 changes: 11 additions & 4 deletions pkg/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"context"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
Expand Down Expand Up @@ -44,11 +45,12 @@ type ClientConfig struct {
TaikoL2Address common.Address
L2EngineEndpoint string
JwtSecret string
RetryInterval time.Duration
}

// NewClient initializes all RPC clients used by Taiko client softwares.
func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) {
l1RPC, err := DialClientWithBackoff(ctx, cfg.L1Endpoint)
l1RPC, err := DialClientWithBackoff(ctx, cfg.L1Endpoint, cfg.RetryInterval)
if err != nil {
return nil, err
}
Expand All @@ -58,7 +60,7 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) {
return nil, err
}

l2RPC, err := DialClientWithBackoff(ctx, cfg.L2Endpoint)
l2RPC, err := DialClientWithBackoff(ctx, cfg.L2Endpoint, cfg.RetryInterval)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -92,14 +94,19 @@ func NewClient(ctx context.Context, cfg *ClientConfig) (*Client, error) {
// won't be initialized.
var l2AuthRPC *EngineClient
if len(cfg.L2EngineEndpoint) != 0 && len(cfg.JwtSecret) != 0 {
if l2AuthRPC, err = DialEngineClientWithBackoff(ctx, cfg.L2EngineEndpoint, cfg.JwtSecret); err != nil {
if l2AuthRPC, err = DialEngineClientWithBackoff(
ctx,
cfg.L2EngineEndpoint,
cfg.JwtSecret,
cfg.RetryInterval,
); err != nil {
return nil, err
}
}

var l2CheckPoint *ethclient.Client
if len(cfg.L2CheckPoint) != 0 {
if l2CheckPoint, err = DialClientWithBackoff(ctx, cfg.L2CheckPoint); err != nil {
if l2CheckPoint, err = DialClientWithBackoff(ctx, cfg.L2CheckPoint, cfg.RetryInterval); err != nil {
return nil, err
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/rpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"testing"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
Expand All @@ -17,6 +18,7 @@ func newTestClient(t *testing.T) *Client {
TaikoL2Address: common.HexToAddress(os.Getenv("TAIKO_L2_ADDRESS")),
L2EngineEndpoint: os.Getenv("L2_EXECUTION_ENGINE_AUTH_ENDPOINT"),
JwtSecret: os.Getenv("JWT_SECRET"),
RetryInterval: backoff.DefaultMaxInterval,
})

require.Nil(t, err)
Expand Down
13 changes: 9 additions & 4 deletions pkg/rpc/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

// DialClientWithBackoff connects a ethereum RPC client at the given URL with
// a backoff strategy.
func DialClientWithBackoff(ctx context.Context, url string) (*ethclient.Client, error) {
func DialClientWithBackoff(ctx context.Context, url string, retryInterval time.Duration) (*ethclient.Client, error) {
var client *ethclient.Client
if err := backoff.Retry(
func() (err error) {
Expand All @@ -26,7 +26,7 @@ func DialClientWithBackoff(ctx context.Context, url string) (*ethclient.Client,
}
return err
},
backoff.NewConstantBackOff(12*time.Second),
backoff.NewConstantBackOff(retryInterval),
); err != nil {
return nil, err
}
Expand All @@ -36,7 +36,12 @@ func DialClientWithBackoff(ctx context.Context, url string) (*ethclient.Client,

// DialEngineClientWithBackoff connects an ethereum engine RPC client at the
// given URL with a backoff strategy.
func DialEngineClientWithBackoff(ctx context.Context, url string, jwtSecret string) (*EngineClient, error) {
func DialEngineClientWithBackoff(
ctx context.Context,
url string,
jwtSecret string,
retryInterval time.Duration,
) (*EngineClient, error) {
var engineClient *EngineClient
if err := backoff.Retry(
func() (err error) {
Expand All @@ -49,7 +54,7 @@ func DialEngineClientWithBackoff(ctx context.Context, url string, jwtSecret stri
engineClient = &EngineClient{client}
return nil
},
backoff.NewConstantBackOff(12*time.Second),
backoff.NewConstantBackOff(retryInterval),
); err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/rpc/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"
"testing"
"time"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -21,6 +22,7 @@ func TestDialEngineClientWithBackoff(t *testing.T) {
context.Background(),
os.Getenv("L2_EXECUTION_ENGINE_AUTH_ENDPOINT"),
string(jwtSecret),
12*time.Second,
)

require.Nil(t, err)
Expand All @@ -32,7 +34,11 @@ func TestDialEngineClientWithBackoff(t *testing.T) {
}

func TestDialClientWithBackoff(t *testing.T) {
client, err := DialClientWithBackoff(context.Background(), os.Getenv("L2_EXECUTION_ENGINE_WS_ENDPOINT"))
client, err := DialClientWithBackoff(
context.Background(),
os.Getenv("L2_EXECUTION_ENGINE_WS_ENDPOINT"),
12*time.Second,
)
require.Nil(t, err)

genesis, err := client.HeaderByNumber(context.Background(), common.Big0)
Expand Down
2 changes: 2 additions & 0 deletions proposer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Config struct {
MinBlockGasLimit uint64
MaxProposedTxListsPerEpoch uint64
ProposeBlockTxGasLimit *uint64
BackOffRetryInterval time.Duration
}

// NewConfigFromCliContext initializes a Config instance from
Expand Down Expand Up @@ -93,5 +94,6 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
MinBlockGasLimit: c.Uint64(flags.MinBlockGasLimit.Name),
MaxProposedTxListsPerEpoch: c.Uint64(flags.MaxProposedTxListsPerEpoch.Name),
ProposeBlockTxGasLimit: proposeBlockTxGasLimit,
BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second,
}, nil
}
1 change: 1 addition & 0 deletions proposer/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func InitFromConfig(ctx context.Context, p *Proposer, cfg *Config) (err error) {
L2Endpoint: cfg.L2Endpoint,
TaikoL1Address: cfg.TaikoL1Address,
TaikoL2Address: cfg.TaikoL2Address,
RetryInterval: cfg.BackOffRetryInterval,
}); err != nil {
return fmt.Errorf("initialize rpc clients error: %w", err)
}
Expand Down
4 changes: 4 additions & 0 deletions prover/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Config struct {
RandomDummyProofDelayLowerBound *time.Duration
RandomDummyProofDelayUpperBound *time.Duration
ExpectedReward uint64
BackOffMaxRetrys uint64
BackOffRetryInterval time.Duration
}

// NewConfigFromCliContext creates a new config instance from command line flags.
Expand Down Expand Up @@ -136,5 +138,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
RandomDummyProofDelayLowerBound: randomDummyProofDelayLowerBound,
RandomDummyProofDelayUpperBound: randomDummyProofDelayUpperBound,
ExpectedReward: c.Uint64(flags.ExpectedReward.Name),
BackOffMaxRetrys: c.Uint64(flags.BackOffMaxRetrys.Name),
BackOffRetryInterval: time.Duration(c.Uint64(flags.BackOffRetryInterval.Name)) * time.Second,
}, nil
}
5 changes: 3 additions & 2 deletions prover/proof_producer/zkevm_rpcd_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
)

var (
errProofGenerating = errors.New("proof is generating")
errProofGenerating = errors.New("proof is generating")
proofPollingInterval = 10 * time.Second
)

// ZkevmRpcdProducer is responsible for requesting zk proofs from the given proverd endpoint.
Expand Down Expand Up @@ -175,7 +176,7 @@ func (p *ZkevmRpcdProducer) callProverDaemon(ctx context.Context, opts *ProofReq
degree = output.Circuit.Degree
log.Info("Proof generated", "height", opts.Height, "degree", degree, "time", time.Since(start))
return nil
}, backoff.NewConstantBackOff(10*time.Second)); err != nil {
}, backoff.NewConstantBackOff(proofPollingInterval)); err != nil {
return nil, 0, err
}
return proof, degree, nil
Expand Down
3 changes: 2 additions & 1 deletion prover/proof_submitter/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func sendTxWithBackoff(
expectedReward uint64,
meta *bindings.TaikoDataBlockMetadata,
sendTxFunc func() (*types.Transaction, error),
retryInterval time.Duration,
) error {
var (
isUnretryableError bool
Expand Down Expand Up @@ -179,7 +180,7 @@ func sendTxWithBackoff(
)

return nil
}, backoff.NewConstantBackOff(12*time.Second)); err != nil {
}, backoff.NewConstantBackOff(retryInterval)); err != nil {
return fmt.Errorf("failed to send TaikoL1.proveBlock transaction: %w", err)
}

Expand Down
Loading

0 comments on commit 8542c0a

Please sign in to comment.