Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
feat(all): check L1 reorg before each operation (#252)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidtaikocha authored May 30, 2023
1 parent 7cc5d54 commit e76b03f
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 305 deletions.
141 changes: 32 additions & 109 deletions driver/chain_syncer/calldata/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"math/big"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -104,8 +103,39 @@ func (s *Syncer) onBlockProposed(
event *bindings.TaikoL1ClientBlockProposed,
endIter eventIterator.EndBlockProposedEventIterFunc,
) error {
if event.Id.Cmp(common.Big0) == 0 {
return nil
}

if !s.progressTracker.Triggered() {
// Check whteher we need to reorg the L2 chain at first.
reorged, l1CurrentToReset, lastInsertedBlockIDToReset, err := s.rpc.CheckL1Reorg(
ctx,
new(big.Int).Sub(event.Id, common.Big1),
)
if err != nil {
return fmt.Errorf("failed to check whether L1 chain has been reorged: %w", err)
}

if reorged {
log.Info(
"Reset L1Current cursor due to L1 reorg",
"l1CurrentHeightOld", s.state.GetL1Current().Number,
"l1CurrentHashOld", s.state.GetL1Current().Hash(),
"l1CurrentHeightNew", l1CurrentToReset.Number,
"l1CurrentHashNew", l1CurrentToReset.Hash(),
"lastInsertedBlockIDOld", s.lastInsertedBlockID,
"lastInsertedBlockIDNew", lastInsertedBlockIDToReset,
)
s.state.SetL1Current(l1CurrentToReset)
s.lastInsertedBlockID = lastInsertedBlockIDToReset

return fmt.Errorf("reorg detected, reset l1Current cursor to %d", l1CurrentToReset.Number)
}
}

// Ignore those already inserted blocks.
if event.Id.Cmp(common.Big0) == 0 || (s.lastInsertedBlockID != nil && event.Id.Cmp(s.lastInsertedBlockID) <= 0) {
if s.lastInsertedBlockID != nil && event.Id.Cmp(s.lastInsertedBlockID) <= 0 {
return nil
}

Expand All @@ -117,11 +147,6 @@ func (s *Syncer) onBlockProposed(
"Removed", event.Raw.Removed,
)

// handle reorg
if event.Raw.Removed {
return s.handleReorg(ctx, event)
}

// Fetch the L2 parent block.
var (
parent *types.Header
Expand Down Expand Up @@ -229,108 +254,6 @@ func (s *Syncer) onBlockProposed(
return nil
}

// handleReorg detects reorg and rewinds the chain by 1 until we find a block that is still in the chain,
// then inserts that block as the new head.
func (s *Syncer) handleReorg(ctx context.Context, event *bindings.TaikoL1ClientBlockProposed) error {
log.Info(
"Reorg detected",
"L1Height", event.Raw.BlockNumber,
"L1Hash", event.Raw.BlockHash,
"BlockID", event.Id,
"Removed", event.Raw.Removed,
)

// rewind chain by 1 until we find a block that is still in the chain
var (
lastKnownGoodBlockID *big.Int
blockID *big.Int = s.lastInsertedBlockID
block *types.Block
err error
)

// if `lastInsertedBlockID` has not been set, we use current L2 chain head as blockID instead
if blockID == nil {
l2Head, err := s.rpc.L2.BlockByNumber(ctx, nil)
if err != nil {
return err
}
blockID = l2Head.Number()
}

stateVars, err := s.rpc.GetProtocolStateVariables(nil)
if err != nil {
return fmt.Errorf("failed to get state variables: %w", err)
}

for {
if blockID.Cmp(common.Big0) == 0 {
if block, err = s.rpc.L2.BlockByNumber(ctx, common.Big0); err != nil {
return err
}
lastKnownGoodBlockID = common.Big0
break
}

if block, err = s.rpc.L2.BlockByNumber(ctx, blockID); err != nil && !errors.Is(err, ethereum.NotFound) {
return err
}

if block != nil && blockID.Uint64() < stateVars.NumBlocks {
// block exists, we can rewind to this block
lastKnownGoodBlockID = blockID
break
} else {
// otherwise, sub 1 from blockId and try again
blockID = new(big.Int).Sub(blockID, common.Big1)
}
}

// shouldn't be able to reach this error because of the 0 check above
// but just in case
if lastKnownGoodBlockID == nil {
return fmt.Errorf("failed to find last known good block ID after reorg")
}

log.Info(
"🔗 Last known good block ID before reorg found",
"blockID", lastKnownGoodBlockID,
)

fcRes, err := s.rpc.L2Engine.ForkchoiceUpdate(ctx, &engine.ForkchoiceStateV1{HeadBlockHash: block.Hash()}, nil)
if err != nil {
return err
}
if fcRes.PayloadStatus.Status != engine.VALID {
return fmt.Errorf("unexpected ForkchoiceUpdate response status: %s", fcRes.PayloadStatus.Status)
}

// reset l1 current to when the last known good block was inserted, and return the event.
if _, _, err := s.state.ResetL1Current(ctx, &state.HeightOrID{ID: lastKnownGoodBlockID}); err != nil {
return fmt.Errorf("failed to reset L1 current: %w", err)
}

log.Info(
"🔗 Rewound chain and inserted last known good block as new head",
"blockID", event.Id,
"height", block.Number(),
"hash", block.Hash(),
"latestVerifiedBlockHeight", s.state.GetLatestVerifiedBlock().Height,
"latestVerifiedBlockHash", s.state.GetLatestVerifiedBlock().Hash,
"transactions", len(block.Transactions()),
"baseFee", block.BaseFee(),
"withdrawals", len(block.Withdrawals()),
)

metrics.DriverL1CurrentHeightGauge.Update(int64(event.Raw.BlockNumber))
s.lastInsertedBlockID = block.Number()

if s.progressTracker.Triggered() {
s.progressTracker.ClearMeta()
}

return nil
}

// insertNewHead tries to insert a new head block to the L2 execution engine's local
// block chain through Engine APIs.
func (s *Syncer) insertNewHead(
Expand Down
45 changes: 0 additions & 45 deletions driver/chain_syncer/calldata/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/suite"
"github.com/taikoxyz/taiko-client/bindings"
Expand Down Expand Up @@ -103,50 +102,6 @@ func (s *CalldataSyncerTestSuite) TestInsertNewHead() {
s.Nil(payloadErr)
}

func (s *CalldataSyncerTestSuite) TestHandleReorgToGenesis() {
testutils.ProposeAndInsertEmptyBlocks(&s.ClientTestSuite, s.p, s.s)

l2Head1, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil)
s.Nil(err)
s.Greater(l2Head1.NumberU64(), uint64(0))
s.NotZero(s.s.lastInsertedBlockID.Uint64())
s.s.lastInsertedBlockID = common.Big0 // let the chain reorg to genesis

s.Nil(s.s.handleReorg(context.Background(), &bindings.TaikoL1ClientBlockProposed{
Id: l2Head1.Number(),
Raw: types.Log{Removed: true},
}))

l2Head2, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil)
s.Nil(err)
s.Equal(uint64(0), l2Head2.NumberU64())
}

func (s *CalldataSyncerTestSuite) TestHandleReorgToNoneGenesis() {
testutils.ProposeAndInsertEmptyBlocks(&s.ClientTestSuite, s.p, s.s)

l2Head1, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil)
s.Nil(err)
s.Greater(l2Head1.NumberU64(), uint64(0))
s.NotZero(s.s.lastInsertedBlockID.Uint64())
s.s.lastInsertedBlockID = common.Big1 // let the chain reorg to height 1

s.Nil(s.s.handleReorg(context.Background(), &bindings.TaikoL1ClientBlockProposed{
Id: l2Head1.Number(),
Raw: types.Log{Removed: true},
}))

l2Head2, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil)
s.Nil(err)
s.Equal(uint64(1), l2Head2.NumberU64())

testutils.ProposeAndInsertEmptyBlocks(&s.ClientTestSuite, s.p, s.s)
l2Head3, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil)
s.Nil(err)
s.Greater(l2Head3.NumberU64(), l2Head2.NumberU64())
s.Greater(s.s.lastInsertedBlockID.Uint64(), uint64(1))
}

func (s *CalldataSyncerTestSuite) TestTreasuryIncomeAllAnchors() {
treasury := common.HexToAddress(os.Getenv("TREASURY"))
s.NotZero(treasury.Big().Uint64())
Expand Down
50 changes: 9 additions & 41 deletions pkg/chain_iterator/block_batch_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

const (
DefaultBlocksReadPerEpoch = 1000
DefaultReorgRewindDepth = 20
)

var (
Expand All @@ -29,19 +28,15 @@ type OnBlocksFunc func(
ctx context.Context,
start, end *types.Header,
updateCurrentFunc UpdateCurrentFunc,
onReorgFunc OnReorgFunc,
endIterFunc EndIterFunc,
) (bool, error)
) error

// UpdateCurrentFunc updates the iterator.current cursor in the iterator.
type UpdateCurrentFunc func(*types.Header)

// EndIterFunc ends the current iteration.
type EndIterFunc func()

// OnReorgFunc handles a reorganization from the source chain.
type OnReorgFunc func() error

// BlockBatchIterator iterates the blocks in batches between the given start and end heights,
// with the awareness of reorganization.
type BlockBatchIterator struct {
Expand All @@ -56,7 +51,6 @@ type BlockBatchIterator struct {
isEnd bool
reverse bool
reorgRewindDepth uint64
onReorg OnReorgFunc
}

// BlockBatchIteratorConfig represents the configs of a block batch iterator.
Expand All @@ -68,7 +62,6 @@ type BlockBatchIteratorConfig struct {
OnBlocks OnBlocksFunc
Reverse bool
ReorgRewindDepth *uint64
OnReorg OnReorgFunc
}

// NewBlockBatchIterator creates a new block batch iterator instance.
Expand Down Expand Up @@ -108,27 +101,13 @@ func NewBlockBatchIterator(ctx context.Context, cfg *BlockBatchIteratorConfig) (
}
}

var reorgRewindDepth uint64
if cfg.ReorgRewindDepth != nil {
reorgRewindDepth = *cfg.ReorgRewindDepth
} else {
reorgRewindDepth = DefaultReorgRewindDepth
}

iterator := &BlockBatchIterator{
ctx: ctx,
client: cfg.Client,
chainID: chainID,
startHeight: cfg.StartHeight.Uint64(),
onBlocks: cfg.OnBlocks,
reverse: cfg.Reverse,
reorgRewindDepth: reorgRewindDepth,
}

if cfg.OnReorg != nil {
iterator.onReorg = cfg.OnReorg
} else {
iterator.onReorg = iterator.rewindOnReorgDetected
ctx: ctx,
client: cfg.Client,
chainID: chainID,
startHeight: cfg.StartHeight.Uint64(),
onBlocks: cfg.OnBlocks,
reverse: cfg.Reverse,
}

if cfg.Reverse {
Expand Down Expand Up @@ -227,16 +206,10 @@ func (i *BlockBatchIterator) iter() (err error) {
return err
}

reorged, err := i.onBlocks(i.ctx, i.current, endHeader, i.updateCurrent, i.onReorg, i.end)
if err != nil {
if err := i.onBlocks(i.ctx, i.current, endHeader, i.updateCurrent, i.end); err != nil {
return err
}

// if we reorged, we want to skip checking if we are at the end, and also skip updating i.current
if reorged {
return nil
}

if i.isEnd {
return io.EOF
}
Expand Down Expand Up @@ -280,15 +253,10 @@ func (i *BlockBatchIterator) reverseIter() (err error) {
return err
}

reorged, err := i.onBlocks(i.ctx, startHeader, i.current, i.updateCurrent, i.onReorg, i.end)
if err != nil {
if err := i.onBlocks(i.ctx, startHeader, i.current, i.updateCurrent, i.end); err != nil {
return err
}

if reorged {
return nil
}

i.current = startHeader

if !isLastEpoch && !i.isEnd {
Expand Down
Loading

0 comments on commit e76b03f

Please sign in to comment.