This repository has been archived by the owner on May 11, 2024. It is now read-only.

feat(pkg): improve/simplify reorg check logic #647

Merged
17 commits · merged Mar 22, 2024
9 changes: 9 additions & 0 deletions cmd/flags/driver.go
@@ -43,6 +43,14 @@ var (
Usage: "HTTP RPC endpoint of another synced L2 execution engine node",
Category: driverCategory,
}
// Syncer-specific flag
MaxExponent = &cli.Uint64Flag{
Name: "syncer.maxExponent",
Usage: "Maximum exponent of retrieving L1 blocks when there is a mismatch between protocol and L2 EE," +
"0 means that it is reset to the genesis height",
Value: 0,
Category: driverCategory,
}
)

// DriverFlags All driver flags.
@@ -54,4 +62,5 @@ var DriverFlags = MergeFlags(CommonFlags, []cli.Flag{
P2PSyncVerifiedBlocks,
P2PSyncTimeout,
CheckPointSyncURL,
MaxExponent,
})
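The exponent bounds how far back the calldata syncer searches when the protocol's last verified block hash and the local L2 execution engine disagree: at retry r the candidate block ID is blockID - 2^r + 1 (clamped to 0), so a value of n lets the search reach at most 2^n - 1 blocks below the mismatched block before falling back to the genesis height. A minimal sketch of that bound, using a hypothetical standalone helper that is not part of this PR:

// deepestCandidate returns the lowest L2 block ID the mismatch search will probe
// before resetting to genesis, given the starting block and syncer.maxExponent.
// Illustrative only; the real computation lives in retrievePastBlock below.
func deepestCandidate(blockID, maxExponent uint64) uint64 {
	step := uint64(1) << maxExponent
	if blockID > step {
		return blockID - step + 1
	}
	return 0 // the search bottoms out at the genesis block
}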
128 changes: 98 additions & 30 deletions driver/chain_syncer/calldata/syncer.go
@@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"

ethereum "github.com/ethereum/go-ethereum"
"github.com/taikoxyz/taiko-client/bindings"
"github.com/taikoxyz/taiko-client/bindings/encoding"
anchorTxConstructor "github.com/taikoxyz/taiko-client/driver/anchor_tx_constructor"
@@ -40,6 +41,7 @@ type Syncer struct {
// Used by BlockInserter
lastInsertedBlockID *big.Int
reorgDetectedFlag bool
maxRetrieveExponent uint64
}

// NewSyncer creates a new syncer instance.
@@ -48,6 +50,7 @@ func NewSyncer(
client *rpc.Client,
state *state.State,
progressTracker *beaconsync.SyncProgressTracker,
maxRetrieveExponent uint64,
) (*Syncer, error) {
configs, err := client.TaikoL1.GetConfig(&bind.CallOpts{Context: ctx})
if err != nil {
@@ -70,6 +73,7 @@
rpc.BlockMaxTxListBytes,
client.L2.ChainID,
),
maxRetrieveExponent: maxRetrieveExponent,
}, nil
}

@@ -522,65 +526,129 @@ func (s *Syncer) createExecutionPayloads(

// checkLastVerifiedBlockMismatch checks if there is a mismatch between protocol's last verified block hash and
// the corresponding L2 EE block hash.
func (s *Syncer) checkLastVerifiedBlockMismatch(ctx context.Context) (bool, error) {
func (s *Syncer) checkLastVerifiedBlockMismatch(ctx context.Context) (*rpc.ReorgCheckResult, error) {
var (
reorgCheckResult = new(rpc.ReorgCheckResult)
err error
)

stateVars, err := s.rpc.GetProtocolStateVariables(&bind.CallOpts{Context: ctx})
if err != nil {
return false, err
return nil, err
}

if s.state.GetL2Head().Number.Uint64() < stateVars.B.LastVerifiedBlockId {
return false, nil
return reorgCheckResult, nil
}

blockInfo, err := s.rpc.GetL2BlockInfo(ctx, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId))
genesisL1Header, err := s.rpc.GetGenesisL1Header(ctx)
if err != nil {
return false, err
return nil, fmt.Errorf("failed to fetch genesis L1 header: %w", err)
}

l2Header, err := s.rpc.L2.HeaderByNumber(ctx, new(big.Int).SetUint64(stateVars.B.LastVerifiedBlockId))
reorgCheckResult, err = s.retrievePastBlock(ctx, stateVars.B.LastVerifiedBlockId, 0, genesisL1Header)
if err != nil {
return false, err
return nil, err
}

return blockInfo.Ts.BlockHash != l2Header.Hash(), nil
return reorgCheckResult, nil
}
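Note the three possible outcomes here: a nil result means the last verified block still matches the L2 EE (the caller then falls through to the parent-block check), a zero-value ReorgCheckResult means the local head has not yet reached the last verified block, and a populated result carries the reset targets. A minimal sketch of the caller-side branching, mirroring the checkReorg logic further down:

result, err := s.checkLastVerifiedBlockMismatch(ctx)
if err != nil {
	return nil, err
}
if result == nil {
	// The last verified block matches the L2 EE: continue with the
	// parent-block check via s.rpc.CheckL1Reorg.
} else if result.IsReorged {
	// Mismatch found: reset L1Current to result.L1CurrentToReset and resume
	// from result.LastHandledBlockIDToReset.
}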

// checkReorg checks whether the L1 chain has been reorged, and resets the L1Current cursor if necessary.
func (s *Syncer) checkReorg(
// retrievePastBlock finds the proper L1 header and L2 block ID to reset to when there is a mismatch
func (s *Syncer) retrievePastBlock(
ctx context.Context,
event *bindings.TaikoL1ClientBlockProposed,
) (*rpc.ReorgCheckResult, error) {
blockID uint64,
retries uint64,
genesisL1Header *types.Header,
) (*rpc.ReorgCheckResult, error) {
if retries > s.maxRetrieveExponent {
return &rpc.ReorgCheckResult{
IsReorged: true,
L1CurrentToReset: genesisL1Header,
LastHandledBlockIDToReset: new(big.Int).SetUint64(blockID),
}, nil
}

var (
reorgCheckResult = new(rpc.ReorgCheckResult)
err error
currentBlockID uint64
l1HeaderToSet = genesisL1Header
)

if val := uint64(1 << retries); blockID > val {
currentBlockID = blockID - val + 1
} else {
currentBlockID = 0
}

blockInfo, err := s.rpc.GetL2BlockInfo(ctx, new(big.Int).SetUint64(currentBlockID))
if err != nil {
return nil, err
}

l2Header, err := s.rpc.L2.HeaderByNumber(ctx, new(big.Int).SetUint64(currentBlockID))
if err != nil {
return nil, err
}
if blockInfo.Ts.BlockHash == l2Header.Hash() {
// Check the termination condition early here to reduce the number of contract calls.
if retries == 0 {
return nil, nil
}
l1Origin, err := s.rpc.L2.L1OriginByID(ctx, new(big.Int).SetUint64(currentBlockID))
if err != nil {
if err.Error() == ethereum.NotFound.Error() {
log.Info("L1Origin not found in retrievePastBlock because the L2 EE is just synced through P2P",
"blockID",
currentBlockID)
// Can't find l1Origin in L2 EE, so we call the contract to get block info
blockInfo, err := s.rpc.TaikoL1.GetBlock(&bind.CallOpts{Context: ctx}, currentBlockID)
if err != nil {
return nil, err
}
if blockInfo.Blk.ProposedIn != 0 {
l1HeaderToSet, err = s.rpc.L1.HeaderByNumber(ctx, new(big.Int).SetUint64(blockInfo.Blk.ProposedIn))
if err != nil {
return nil, err
}
}
} else {
return nil, err
}
} else {
l1HeaderToSet, err = s.rpc.L1.HeaderByNumber(ctx, l1Origin.L1BlockHeight)
if err != nil {
return nil, err
}
}
reorgCheckResult.IsReorged = retries > 0
reorgCheckResult.L1CurrentToReset = l1HeaderToSet
reorgCheckResult.LastHandledBlockIDToReset = new(big.Int).SetUint64(currentBlockID)
} else {
reorgCheckResult, err = s.retrievePastBlock(ctx, blockID, retries+1, genesisL1Header)
if err != nil {
return nil, err
}
}
return reorgCheckResult, nil
}
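The recursion above walks backwards in exponentially growing steps: retry r probes block ID blockID - 2^r + 1 (or 0 once the step exceeds blockID), stops at the first candidate whose protocol block hash matches the L2 EE, and gives up at the genesis header once retries exceed maxRetrieveExponent. An iterative restatement of the candidate sequence, as a hypothetical helper for illustration only:

// candidateBlockIDs lists the L2 block IDs retrievePastBlock probes, in order,
// for a given starting block and maximum exponent. Illustrative only.
func candidateBlockIDs(blockID, maxExponent uint64) []uint64 {
	ids := make([]uint64, 0, maxExponent+1)
	for retries := uint64(0); retries <= maxExponent; retries++ {
		if step := uint64(1) << retries; blockID > step {
			ids = append(ids, blockID-step+1)
		} else {
			ids = append(ids, 0)
		}
	}
	return ids
}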

// checkReorg checks whether the L1 chain has been reorged, and resets the L1Current cursor if necessary.
func (s *Syncer) checkReorg(
ctx context.Context,
event *bindings.TaikoL1ClientBlockProposed,
) (*rpc.ReorgCheckResult, error) {
// If the L2 chain is at genesis, we don't need to check L1 reorg.
if s.state.GetL1Current().Number == s.state.GenesisL1Height {
return reorgCheckResult, nil
return new(rpc.ReorgCheckResult), nil
}

// 1. The latest verified block
mismatch, err := s.checkLastVerifiedBlockMismatch(ctx)
reorgCheckResult, err := s.checkLastVerifiedBlockMismatch(ctx)
if err != nil {
return nil, fmt.Errorf("failed to check if last verified block in L2 EE has been reorged: %w", err)
}

// If the latest verified block in chain is mismatched, we reset the L2 chain to genesis, and restart
// the calldata sync process.
// TODO(Gavin): improve this approach.
if mismatch {
log.Warn("The latest verified block mismatch detected, reset L2 chain to genesis")

genesisL1Header, err := s.rpc.GetGenesisL1Header(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch genesis L1 header: %w", err)
}

reorgCheckResult.IsReorged = true
reorgCheckResult.L1CurrentToReset = genesisL1Header
reorgCheckResult.LastHandledBlockIDToReset = common.Big0
} else {
if reorgCheckResult == nil {
// 2. Parent block
reorgCheckResult, err = s.rpc.CheckL1Reorg(
ctx,
41 changes: 39 additions & 2 deletions driver/chain_syncer/calldata/syncer_test.go
@@ -30,14 +30,15 @@ type CalldataSyncerTestSuite struct {
func (s *CalldataSyncerTestSuite) SetupTest() {
s.ClientTestSuite.SetupTest()

state, err := state.New(context.Background(), s.RPCClient)
state2, err := state.New(context.Background(), s.RPCClient)
s.Nil(err)

syncer, err := NewSyncer(
context.Background(),
s.RPCClient,
state,
state2,
beaconsync.NewSyncProgressTracker(s.RPCClient.L2, 1*time.Hour),
0,
)
s.Nil(err)
s.s = syncer
@@ -78,6 +79,7 @@ func (s *CalldataSyncerTestSuite) TestCancelNewSyncer() {
s.RPCClient,
s.s.state,
s.s.progressTracker,
0,
)
s.Nil(syncer)
s.NotNil(err)
@@ -211,6 +213,41 @@ func (s *CalldataSyncerTestSuite) TestTreasuryIncome() {
s.Zero(balanceAfter.Cmp(balance))
}

func (s *CalldataSyncerTestSuite) TestRetrievePastBlock() {
syncer, err := NewSyncer(
context.Background(),
s.RPCClient,
s.s.state,
s.s.progressTracker,
5,
)
s.Nil(err)
sender := s.p.GetSender()

s.s = syncer
for i := 0; i < 10; i++ {
s.ProposeAndInsertValidBlock(s.p, s.s)
}
genesisL1Header, err := s.RPCClient.GetGenesisL1Header(context.Background())
s.Nil(err)
l1Snapshot := s.SetL1Snapshot()
for i := 0; i < 5; i++ {
s.ProposeAndInsertValidBlock(s.p, s.s)
}
s.RevertL1Snapshot(l1Snapshot)
// Because of the evm_revert operation, the proposer's nonce needs to be adjusted.
s.Nil(sender.SetNonce(nil, true))
// Propose 5 blocks on another fork
for i := 0; i < 5; i++ {
s.ProposeInvalidTxListBytes(s.p)
}
reorgResult, err := s.s.retrievePastBlock(context.Background(), 12, 0, genesisL1Header)
s.Nil(err)
s.NotNil(reorgResult)
s.Equal(reorgResult.IsReorged, true)
s.GreaterOrEqual(reorgResult.L1CurrentToReset.Number.Uint64(), genesisL1Header.Number.Uint64())
}
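For the values used above (starting block ID 12, maxExponent 5), the candidate block IDs probed are 12, 11, 9, 5, 0, 0. Since five of the originally proposed blocks were reverted on L1 and replaced by invalid tx-list proposals on another fork, the search is expected to detect a mismatch, which is why the test only asserts that a reorg was flagged and that the reset L1 header is no lower than the genesis L1 header.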

func TestCalldataSyncerTestSuite(t *testing.T) {
suite.Run(t, new(CalldataSyncerTestSuite))
}
3 changes: 2 additions & 1 deletion driver/chain_syncer/chain_syncer.go
@@ -41,12 +41,13 @@ func New(
state *state.State,
p2pSyncVerifiedBlocks bool,
p2pSyncTimeout time.Duration,
maxRetrieveExponent uint64,
) (*L2ChainSyncer, error) {
tracker := beaconsync.NewSyncProgressTracker(rpc.L2, p2pSyncTimeout)
go tracker.Track(ctx)

beaconSyncer := beaconsync.NewSyncer(ctx, rpc, state, tracker)
calldataSyncer, err := calldata.NewSyncer(ctx, rpc, state, tracker)
calldataSyncer, err := calldata.NewSyncer(ctx, rpc, state, tracker, maxRetrieveExponent)
if err != nil {
return nil, err
}
1 change: 1 addition & 0 deletions driver/chain_syncer/chain_syncer_test.go
@@ -39,6 +39,7 @@ func (s *ChainSyncerTestSuite) SetupTest() {
state,
false,
1*time.Hour,
0,
)
s.Nil(err)
s.s = syncer
2 changes: 2 additions & 0 deletions driver/config.go
@@ -20,6 +20,7 @@ type Config struct {
P2PSyncTimeout time.Duration
RPCTimeout time.Duration
RetryInterval time.Duration
MaxExponent uint64
}

// NewConfigFromCliContext creates a new config instance from
@@ -60,5 +61,6 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
P2PSyncVerifiedBlocks: p2pSyncVerifiedBlocks,
P2PSyncTimeout: c.Duration(flags.P2PSyncTimeout.Name),
RPCTimeout: timeout,
MaxExponent: c.Uint64(flags.MaxExponent.Name),
}, nil
}
1 change: 1 addition & 0 deletions driver/driver.go
@@ -81,6 +81,7 @@ func (d *Driver) InitFromConfig(ctx context.Context, cfg *Config) (err error) {
d.state,
cfg.P2PSyncVerifiedBlocks,
cfg.P2PSyncTimeout,
cfg.MaxExponent,
); err != nil {
return err
}
56 changes: 56 additions & 0 deletions internal/testutils/helper.go
@@ -161,6 +161,62 @@ func (s *ClientTestSuite) ProposeAndInsertValidBlock(
return event
}

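// ProposeValidBlock sends a valid transfer transaction to the L2 execution engine's mempool, proposes it
// through the given proposer, and waits for the resulting BlockProposed event, without inserting the
// proposed block into the L2 execution engine.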
func (s *ClientTestSuite) ProposeValidBlock(
proposer Proposer,
) *bindings.TaikoL1ClientBlockProposed {
l1Head, err := s.RPCClient.L1.HeaderByNumber(context.Background(), nil)
s.Nil(err)

l2Head, err := s.RPCClient.L2.HeaderByNumber(context.Background(), nil)
s.Nil(err)

// Propose txs in L2 execution engine's mempool
sink := make(chan *bindings.TaikoL1ClientBlockProposed)

sub, err := s.RPCClient.TaikoL1.WatchBlockProposed(nil, sink, nil, nil)
s.Nil(err)
defer func() {
sub.Unsubscribe()
close(sink)
}()

baseFeeInfo, err := s.RPCClient.TaikoL2.GetBasefee(nil, l1Head.Number.Uint64()+1, uint32(l2Head.GasUsed))
s.Nil(err)

nonce, err := s.RPCClient.L2.PendingNonceAt(context.Background(), s.TestAddr)
s.Nil(err)

tx := types.NewTransaction(
nonce,
common.BytesToAddress(RandomBytes(32)),
common.Big1,
100000,
baseFeeInfo.Basefee,
[]byte{},
)
signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(s.RPCClient.L2.ChainID), s.TestAddrPrivKey)
s.Nil(err)
s.Nil(s.RPCClient.L2.SendTransaction(context.Background(), signedTx))

s.Nil(proposer.ProposeOp(context.Background()))

event := <-sink

_, isPending, err := s.RPCClient.L1.TransactionByHash(context.Background(), event.Raw.TxHash)
s.Nil(err)
s.False(isPending)

receipt, err := s.RPCClient.L1.TransactionReceipt(context.Background(), event.Raw.TxHash)
s.Nil(err)
s.Equal(types.ReceiptStatusSuccessful, receipt.Status)

newL1Head, err := s.RPCClient.L1.HeaderByNumber(context.Background(), nil)
s.Nil(err)
s.Greater(newL1Head.Number.Uint64(), l1Head.Number.Uint64())

return event
}

// NewTestProverServer starts a new prover server that has channel listeners to respond and react
// to requests for capacity, which provers can call.
func (s *ClientTestSuite) NewTestProverServer(