From e8ff8c90fad6843b3ae699375aea9d73240b5a1a Mon Sep 17 00:00:00 2001 From: jeff <113397187+cyberhorsey@users.noreply.github.com> Date: Tue, 16 May 2023 11:01:00 -0700 Subject: [PATCH] fix(driver): handle reorg (#216) Co-authored-by: David Co-authored-by: Roger <50648015+RogerLamTd@users.noreply.github.com> --- Makefile | 1 + driver/chain_syncer/calldata/syncer.go | 109 ++++++++++++++++++ driver/chain_syncer/calldata/syncer_test.go | 64 ++++++++++ driver/chain_syncer/chain_syncer.go | 2 +- driver/state/l1_current.go | 41 ++++--- driver/state/l1_current_test.go | 6 +- integration_test/nodes/docker-compose.yml | 2 +- pkg/chain_iterator/block_batch_iterator.go | 78 ++++++++++--- .../block_batch_iterator_test.go | 70 ++++++++++- .../event_iterator/block_proposed_iterator.go | 16 +-- .../event_iterator/block_proven_iterator.go | 16 +-- 11 files changed, 343 insertions(+), 62 deletions(-) diff --git a/Makefile b/Makefile index 0af1b1c0b..eda456931 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,7 @@ lint: test: @TAIKO_MONO_DIR=${TAIKO_MONO_DIR} \ COMPILE_PROTOCOL=${COMPILE_PROTOCOL} \ + PACKAGE=${PACKAGE} \ RUN_TESTS=true \ ./integration_test/entrypoint.sh diff --git a/driver/chain_syncer/calldata/syncer.go b/driver/chain_syncer/calldata/syncer.go index cc165d1ce..950845f1e 100644 --- a/driver/chain_syncer/calldata/syncer.go +++ b/driver/chain_syncer/calldata/syncer.go @@ -7,6 +7,7 @@ import ( "math/big" "time" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" @@ -114,8 +115,14 @@ func (s *Syncer) onBlockProposed( "L1Height", event.Raw.BlockNumber, "L1Hash", event.Raw.BlockHash, "BlockID", event.Id, + "Removed", event.Raw.Removed, ) + // handle reorg + if event.Raw.Removed { + return s.handleReorg(ctx, event) + } + // Fetch the L2 parent block. var ( parent *types.Header @@ -223,6 +230,108 @@ func (s *Syncer) onBlockProposed( return nil } +// handleReorg detects reorg and rewinds the chain by 1 until we find a block that is still in the chain, +// then inserts that block as the new head. +func (s *Syncer) handleReorg(ctx context.Context, event *bindings.TaikoL1ClientBlockProposed) error { + log.Info( + "Reorg detected", + "L1Height", event.Raw.BlockNumber, + "L1Hash", event.Raw.BlockHash, + "BlockID", event.Id, + "Removed", event.Raw.Removed, + ) + + // rewind chain by 1 until we find a block that is still in the chain + var ( + lastKnownGoodBlockID *big.Int + blockID *big.Int = s.lastInsertedBlockID + block *types.Block + err error + ) + + // if `lastInsertedBlockID` has not been set, we use current L2 chain head as blockID instead + if blockID == nil { + l2Head, err := s.rpc.L2.BlockByNumber(ctx, nil) + if err != nil { + return err + } + blockID = l2Head.Number() + } + + stateVars, err := s.rpc.GetProtocolStateVariables(nil) + if err != nil { + return fmt.Errorf("failed to get state variables: %w", err) + } + + for { + if blockID.Cmp(common.Big0) == 0 { + if block, err = s.rpc.L2.BlockByNumber(ctx, common.Big0); err != nil { + return err + } + lastKnownGoodBlockID = common.Big0 + break + } + + if block, err = s.rpc.L2.BlockByNumber(ctx, blockID); err != nil && !errors.Is(err, ethereum.NotFound) { + return err + } + + if block != nil && blockID.Uint64() < stateVars.NumBlocks { + // block exists, we can rewind to this block + lastKnownGoodBlockID = blockID + break + } else { + // otherwise, sub 1 from blockId and try again + blockID = new(big.Int).Sub(blockID, common.Big1) + } + } + + // shouldn't be able to reach this error because of the 0 check above + // but just in case + if lastKnownGoodBlockID == nil { + return fmt.Errorf("failed to find last known good block ID after reorg") + } + + log.Info( + "🔗 Last known good block ID before reorg found", + "blockID", lastKnownGoodBlockID, + ) + + fcRes, err := s.rpc.L2Engine.ForkchoiceUpdate(ctx, &engine.ForkchoiceStateV1{HeadBlockHash: block.Hash()}, nil) + if err != nil { + return err + } + if fcRes.PayloadStatus.Status != engine.VALID { + return fmt.Errorf("unexpected ForkchoiceUpdate response status: %s", fcRes.PayloadStatus.Status) + } + + // reset l1 current to when the last known good block was inserted, and return the event. + if _, _, err := s.state.ResetL1Current(ctx, &state.HeightOrID{ID: lastKnownGoodBlockID}); err != nil { + return fmt.Errorf("failed to reset L1 current: %w", err) + } + + log.Info( + "🔗 Rewound chain and inserted last known good block as new head", + "blockID", event.Id, + "height", block.Number(), + "hash", block.Hash(), + "latestVerifiedBlockHeight", s.state.GetLatestVerifiedBlock().Height, + "latestVerifiedBlockHash", s.state.GetLatestVerifiedBlock().Hash, + "transactions", len(block.Transactions()), + "baseFee", block.BaseFee(), + "withdrawals", len(block.Withdrawals()), + ) + + metrics.DriverL1CurrentHeightGauge.Update(int64(event.Raw.BlockNumber)) + s.lastInsertedBlockID = block.Number() + + if s.progressTracker.Triggered() { + s.progressTracker.ClearMeta() + } + + return nil +} + // insertNewHead tries to insert a new head block to the L2 execution engine's local // block chain through Engine APIs. func (s *Syncer) insertNewHead( diff --git a/driver/chain_syncer/calldata/syncer_test.go b/driver/chain_syncer/calldata/syncer_test.go index cd5aed1ef..920c62ea7 100644 --- a/driver/chain_syncer/calldata/syncer_test.go +++ b/driver/chain_syncer/calldata/syncer_test.go @@ -9,16 +9,20 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/suite" "github.com/taikoxyz/taiko-client/bindings" "github.com/taikoxyz/taiko-client/driver/chain_syncer/beaconsync" "github.com/taikoxyz/taiko-client/driver/state" + "github.com/taikoxyz/taiko-client/proposer" "github.com/taikoxyz/taiko-client/testutils" ) type CalldataSyncerTestSuite struct { testutils.ClientTestSuite s *Syncer + p testutils.Proposer } func (s *CalldataSyncerTestSuite) SetupTest() { @@ -36,6 +40,22 @@ func (s *CalldataSyncerTestSuite) SetupTest() { ) s.Nil(err) s.s = syncer + + prop := new(proposer.Proposer) + l1ProposerPrivKey, err := crypto.ToECDSA(common.Hex2Bytes(os.Getenv("L1_PROPOSER_PRIVATE_KEY"))) + s.Nil(err) + proposeInterval := 1024 * time.Hour // No need to periodically propose transactions list in unit tests + s.Nil(proposer.InitFromConfig(context.Background(), prop, (&proposer.Config{ + L1Endpoint: os.Getenv("L1_NODE_WS_ENDPOINT"), + L2Endpoint: os.Getenv("L2_EXECUTION_ENGINE_WS_ENDPOINT"), + TaikoL1Address: common.HexToAddress(os.Getenv("TAIKO_L1_ADDRESS")), + TaikoL2Address: common.HexToAddress(os.Getenv("TAIKO_L2_ADDRESS")), + L1ProposerPrivKey: l1ProposerPrivKey, + L2SuggestedFeeRecipient: common.HexToAddress(os.Getenv("L2_SUGGESTED_FEE_RECIPIENT")), + ProposeInterval: &proposeInterval, + }))) + + s.p = prop } func (s *CalldataSyncerTestSuite) TestProcessL1Blocks() { @@ -82,6 +102,50 @@ func (s *CalldataSyncerTestSuite) TestInsertNewHead() { s.Nil(payloadErr) } +func (s *CalldataSyncerTestSuite) TestHandleReorgToGenesis() { + testutils.ProposeAndInsertEmptyBlocks(&s.ClientTestSuite, s.p, s.s) + + l2Head1, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil) + s.Nil(err) + s.Greater(l2Head1.NumberU64(), uint64(0)) + s.NotZero(s.s.lastInsertedBlockID.Uint64()) + s.s.lastInsertedBlockID = common.Big0 // let the chain reorg to genesis + + s.Nil(s.s.handleReorg(context.Background(), &bindings.TaikoL1ClientBlockProposed{ + Id: l2Head1.Number(), + Raw: types.Log{Removed: true}, + })) + + l2Head2, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil) + s.Nil(err) + s.Equal(uint64(0), l2Head2.NumberU64()) +} + +func (s *CalldataSyncerTestSuite) TestHandleReorgToNoneGenesis() { + testutils.ProposeAndInsertEmptyBlocks(&s.ClientTestSuite, s.p, s.s) + + l2Head1, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil) + s.Nil(err) + s.Greater(l2Head1.NumberU64(), uint64(0)) + s.NotZero(s.s.lastInsertedBlockID.Uint64()) + s.s.lastInsertedBlockID = common.Big1 // let the chain reorg to height 1 + + s.Nil(s.s.handleReorg(context.Background(), &bindings.TaikoL1ClientBlockProposed{ + Id: l2Head1.Number(), + Raw: types.Log{Removed: true}, + })) + + l2Head2, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil) + s.Nil(err) + s.Equal(uint64(1), l2Head2.NumberU64()) + + testutils.ProposeAndInsertEmptyBlocks(&s.ClientTestSuite, s.p, s.s) + l2Head3, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil) + s.Nil(err) + s.Greater(l2Head3.NumberU64(), l2Head2.NumberU64()) + s.Greater(s.s.lastInsertedBlockID.Uint64(), uint64(1)) +} + func TestCalldataSyncerTestSuite(t *testing.T) { suite.Run(t, new(CalldataSyncerTestSuite)) } diff --git a/driver/chain_syncer/chain_syncer.go b/driver/chain_syncer/chain_syncer.go index 268198970..798299433 100644 --- a/driver/chain_syncer/chain_syncer.go +++ b/driver/chain_syncer/chain_syncer.go @@ -119,7 +119,7 @@ func (s *L2ChainSyncer) Sync(l1End *types.Header) error { } // Reset the L1Current cursor. - blockID, err := s.state.ResetL1Current(s.ctx, heightOrID) + _, blockID, err := s.state.ResetL1Current(s.ctx, heightOrID) if err != nil { return err } diff --git a/driver/state/l1_current.go b/driver/state/l1_current.go index 5cf3f7956..e9eb90f94 100644 --- a/driver/state/l1_current.go +++ b/driver/state/l1_current.go @@ -28,34 +28,36 @@ func (s *State) SetL1Current(h *types.Header) { } // ResetL1Current resets the l1Current cursor to the L1 height which emitted a -// BlockProven event with given blockID / blockHash. -func (s *State) ResetL1Current(ctx context.Context, heightOrID *HeightOrID) (*big.Int, error) { +// BlockProposed event with given blockID / blockHash. +func (s *State) ResetL1Current( + ctx context.Context, + heightOrID *HeightOrID, +) (*bindings.TaikoL1ClientBlockProposed, *big.Int, error) { if !heightOrID.NotEmpty() { - return nil, fmt.Errorf("empty input %v", heightOrID) + return nil, nil, fmt.Errorf("empty input %v", heightOrID) } log.Info("Reset L1 current cursor", "heightOrID", heightOrID) var ( - l1CurrentHeight *big.Int - err error + err error ) if (heightOrID.ID != nil && heightOrID.ID.Cmp(common.Big0) == 0) || (heightOrID.Height != nil && heightOrID.Height.Cmp(common.Big0) == 0) { l1Current, err := s.rpc.L1.HeaderByNumber(ctx, s.GenesisL1Height) if err != nil { - return nil, err + return nil, nil, err } s.SetL1Current(l1Current) - return common.Big0, nil + return nil, common.Big0, nil } // Need to find the block ID at first, before filtering the BlockProposed events. if heightOrID.ID == nil { header, err := s.rpc.L2.HeaderByNumber(context.Background(), heightOrID.Height) if err != nil { - return nil, err + return nil, nil, err } targetHash := header.Hash() @@ -85,18 +87,19 @@ func (s *State) ResetL1Current(ctx context.Context, heightOrID *HeightOrID) (*bi ) if err != nil { - return nil, err + return nil, nil, err } if err := iter.Iter(); err != nil { - return nil, err + return nil, nil, err } if heightOrID.ID == nil { - return nil, fmt.Errorf("BlockProven event not found, hash: %s", targetHash) + return nil, nil, fmt.Errorf("BlockProven event not found, hash: %s", targetHash) } } + var event *bindings.TaikoL1ClientBlockProposed iter, err := eventIterator.NewBlockProposedIterator( ctx, &eventIterator.BlockProposedIteratorConfig{ @@ -111,7 +114,7 @@ func (s *State) ResetL1Current(ctx context.Context, heightOrID *HeightOrID) (*bi e *bindings.TaikoL1ClientBlockProposed, end eventIterator.EndBlockProposedEventIterFunc, ) error { - l1CurrentHeight = new(big.Int).SetUint64(e.Raw.BlockNumber) + event = e end() return nil }, @@ -119,24 +122,24 @@ func (s *State) ResetL1Current(ctx context.Context, heightOrID *HeightOrID) (*bi ) if err != nil { - return nil, err + return nil, nil, err } if err := iter.Iter(); err != nil { - return nil, err + return nil, nil, err } - if l1CurrentHeight == nil { - return nil, fmt.Errorf("BlockProposed event not found, blockID: %s", heightOrID.ID) + if event == nil { + return nil, nil, fmt.Errorf("BlockProposed event not found, blockID: %s", heightOrID.ID) } - l1Current, err := s.rpc.L1.HeaderByNumber(ctx, l1CurrentHeight) + l1Current, err := s.rpc.L1.HeaderByNumber(ctx, new(big.Int).SetUint64(event.Raw.BlockNumber)) if err != nil { - return nil, err + return nil, nil, err } s.SetL1Current(l1Current) log.Info("Reset L1 current cursor", "height", s.GetL1Current().Number) - return heightOrID.ID, nil + return event, heightOrID.ID, nil } diff --git a/driver/state/l1_current_test.go b/driver/state/l1_current_test.go index 31367cb3e..86b26b60d 100644 --- a/driver/state/l1_current_test.go +++ b/driver/state/l1_current_test.go @@ -19,15 +19,15 @@ func (s *DriverStateTestSuite) TestSetL1Current() { } func (s *DriverStateTestSuite) TestResetL1CurrentEmptyHeight() { - l1Current, err := s.s.ResetL1Current(context.Background(), &HeightOrID{ID: common.Big0}) + _, l1Current, err := s.s.ResetL1Current(context.Background(), &HeightOrID{ID: common.Big0}) s.Nil(err) s.Zero(l1Current.Uint64()) - _, err = s.s.ResetL1Current(context.Background(), &HeightOrID{Height: common.Big0}) + _, _, err = s.s.ResetL1Current(context.Background(), &HeightOrID{Height: common.Big0}) s.Nil(err) } func (s *DriverStateTestSuite) TestResetL1CurrentEmptyID() { - _, err := s.s.ResetL1Current(context.Background(), &HeightOrID{Height: common.Big1}) + _, _, err := s.s.ResetL1Current(context.Background(), &HeightOrID{Height: common.Big1}) s.NotNil(err) } diff --git a/integration_test/nodes/docker-compose.yml b/integration_test/nodes/docker-compose.yml index 64a816b99..04f852f4b 100644 --- a/integration_test/nodes/docker-compose.yml +++ b/integration_test/nodes/docker-compose.yml @@ -14,7 +14,7 @@ services: - "0.0.0.0" l2_execution_engine: - image: gcr.io/evmchain/taiko-geth:taiko + image: gcr.io/evmchain/taiko-geth:sha-f8be24a # TODO: change back to taiko tag restart: unless-stopped pull_policy: always volumes: diff --git a/pkg/chain_iterator/block_batch_iterator.go b/pkg/chain_iterator/block_batch_iterator.go index 4479b4e7e..a7d15c311 100644 --- a/pkg/chain_iterator/block_batch_iterator.go +++ b/pkg/chain_iterator/block_batch_iterator.go @@ -16,7 +16,7 @@ import ( const ( DefaultBlocksReadPerEpoch = 1000 - ReorgRewindDepth = 20 + DefaultReorgRewindDepth = 20 ) var ( @@ -24,13 +24,14 @@ var ( ) // OnBlocksFunc represents the callback function which will be called when a batch of blocks in chain are -// iterated. +// iterated. It returns true if it reorged, and false if not. type OnBlocksFunc func( ctx context.Context, start, end *types.Header, updateCurrentFunc UpdateCurrentFunc, + onReorgFunc OnReorgFunc, endIterFunc EndIterFunc, -) error +) (bool, error) // UpdateCurrentFunc updates the iterator.current cursor in the iterator. type UpdateCurrentFunc func(*types.Header) @@ -38,6 +39,9 @@ type UpdateCurrentFunc func(*types.Header) // EndIterFunc ends the current iteration. type EndIterFunc func() +// OnReorgFunc handles a reorganization from the source chain. +type OnReorgFunc func() error + // BlockBatchIterator iterates the blocks in batches between the given start and end heights, // with the awareness of reorganization. type BlockBatchIterator struct { @@ -51,6 +55,8 @@ type BlockBatchIterator struct { onBlocks OnBlocksFunc isEnd bool reverse bool + reorgRewindDepth uint64 + onReorg OnReorgFunc } // BlockBatchIteratorConfig represents the configs of a block batch iterator. @@ -61,6 +67,8 @@ type BlockBatchIteratorConfig struct { EndHeight *big.Int OnBlocks OnBlocksFunc Reverse bool + ReorgRewindDepth *uint64 + OnReorg OnReorgFunc } // NewBlockBatchIterator creates a new block batch iterator instance. @@ -100,13 +108,27 @@ func NewBlockBatchIterator(ctx context.Context, cfg *BlockBatchIteratorConfig) ( } } + var reorgRewindDepth uint64 + if cfg.ReorgRewindDepth != nil { + reorgRewindDepth = *cfg.ReorgRewindDepth + } else { + reorgRewindDepth = DefaultReorgRewindDepth + } + iterator := &BlockBatchIterator{ - ctx: ctx, - client: cfg.Client, - chainID: chainID, - startHeight: cfg.StartHeight.Uint64(), - onBlocks: cfg.OnBlocks, - reverse: cfg.Reverse, + ctx: ctx, + client: cfg.Client, + chainID: chainID, + startHeight: cfg.StartHeight.Uint64(), + onBlocks: cfg.OnBlocks, + reverse: cfg.Reverse, + reorgRewindDepth: reorgRewindDepth, + } + + if cfg.OnReorg != nil { + iterator.onReorg = cfg.OnReorg + } else { + iterator.onReorg = iterator.rewindOnReorgDetected } if cfg.Reverse { @@ -205,10 +227,16 @@ func (i *BlockBatchIterator) iter() (err error) { return err } - if err := i.onBlocks(i.ctx, i.current, endHeader, i.updateCurrent, i.end); err != nil { + reorged, err := i.onBlocks(i.ctx, i.current, endHeader, i.updateCurrent, i.onReorg, i.end) + if err != nil { return err } + // if we reorged, we want to skip checking if we are at the end, and also skip updating i.current + if reorged { + return nil + } + if i.isEnd { return io.EOF } @@ -252,10 +280,15 @@ func (i *BlockBatchIterator) reverseIter() (err error) { return err } - if err := i.onBlocks(i.ctx, startHeader, i.current, i.updateCurrent, i.end); err != nil { + reorged, err := i.onBlocks(i.ctx, startHeader, i.current, i.updateCurrent, i.onReorg, i.end) + if err != nil { return err } + if reorged { + return nil + } + i.current = startHeader if !isLastEpoch && !i.isEnd { @@ -282,6 +315,8 @@ func (i *BlockBatchIterator) end() { // ensureCurrentNotReorged checks if the iterator.current cursor was reorged, if was, will // rewind back `ReorgRewindDepth` blocks. +// reorg is also detected on the iteration of the event later, by checking +// event.Raw.Removed, which will also call `i.rewindOnReorgDetected` to rewind back func (i *BlockBatchIterator) ensureCurrentNotReorged() error { current, err := i.client.HeaderByHash(i.ctx, i.current.Hash()) if err != nil && !errors.Is(err, ethereum.NotFound) { @@ -293,14 +328,25 @@ func (i *BlockBatchIterator) ensureCurrentNotReorged() error { return nil } - // Reorg detected, rewind back `ReorgRewindDepth` blocks + // reorged + return i.rewindOnReorgDetected() +} + +// rewindOnReorgDetected rewinds back `ReorgRewindDepth` blocks and sets i.current +// to a stable block, or 0 if it's less than `ReorgRewindDepth`. +func (i *BlockBatchIterator) rewindOnReorgDetected() error { var newCurrentHeight uint64 - if i.current.Number.Uint64() <= ReorgRewindDepth { + if i.current.Number.Uint64() <= i.reorgRewindDepth { newCurrentHeight = 0 } else { - newCurrentHeight = i.current.Number.Uint64() - ReorgRewindDepth + newCurrentHeight = i.current.Number.Uint64() - i.reorgRewindDepth } - i.current, err = i.client.HeaderByNumber(i.ctx, new(big.Int).SetUint64(newCurrentHeight)) - return err + current, err := i.client.HeaderByNumber(i.ctx, new(big.Int).SetUint64(newCurrentHeight)) + if err != nil { + return err + } + + i.current = current + return nil } diff --git a/pkg/chain_iterator/block_batch_iterator_test.go b/pkg/chain_iterator/block_batch_iterator_test.go index fd809936d..ebb243cae 100644 --- a/pkg/chain_iterator/block_batch_iterator_test.go +++ b/pkg/chain_iterator/block_batch_iterator_test.go @@ -33,11 +33,12 @@ func (s *BlockBatchIteratorTestSuite) TestIter() { ctx context.Context, start, end *types.Header, updateCurrentFunc UpdateCurrentFunc, + onReorgFunc OnReorgFunc, endIterFunc EndIterFunc, - ) error { + ) (bool, error) { s.Equal(lastEnd.Uint64(), start.Number.Uint64()) lastEnd = end.Number - return nil + return false, nil }, }) @@ -68,11 +69,12 @@ func (s *BlockBatchIteratorTestSuite) TestIterReverse() { ctx context.Context, start, end *types.Header, updateCurrentFunc UpdateCurrentFunc, + onReorgFunc OnReorgFunc, endIterFunc EndIterFunc, - ) error { + ) (bool, error) { s.Equal(lastStart.Uint64(), end.Number.Uint64()) lastStart = start.Number - return nil + return false, nil }, }) @@ -99,12 +101,13 @@ func (s *BlockBatchIteratorTestSuite) TestIterEndFunc() { ctx context.Context, start, end *types.Header, updateCurrentFunc UpdateCurrentFunc, + onReorgFunc OnReorgFunc, endIterFunc EndIterFunc, - ) error { + ) (bool, error) { s.Equal(lastEnd.Uint64(), start.Number.Uint64()) lastEnd = end.Number endIterFunc() - return nil + return false, nil }, }) @@ -113,6 +116,61 @@ func (s *BlockBatchIteratorTestSuite) TestIterEndFunc() { s.Equal(lastEnd.Uint64(), maxBlocksReadPerEpoch) } +func (s *BlockBatchIteratorTestSuite) TestIter_ReorgEncountered() { + var maxBlocksReadPerEpoch uint64 = 1 + var reorgRewindDepth uint64 = 1 + var rewindEveryNBlocks uint64 = 2 + var lastBlockReorged bool = false + + headHeight, err := s.RpcClient.L1.BlockNumber(context.Background()) + s.Nil(err) + s.Greater(headHeight, uint64(0)) + + lastEnd := common.Big0 + + iter, err := NewBlockBatchIterator(context.Background(), &BlockBatchIteratorConfig{ + Client: s.RpcClient.L1, + MaxBlocksReadPerEpoch: &maxBlocksReadPerEpoch, + StartHeight: common.Big0, + EndHeight: new(big.Int).SetUint64(headHeight), + ReorgRewindDepth: &reorgRewindDepth, + OnReorg: func() error { + if lastEnd.Uint64() < reorgRewindDepth { + lastEnd = common.Big0 + } else { + lastEnd = new(big.Int).Sub(lastEnd, new(big.Int).SetUint64(reorgRewindDepth)) + } + lastBlockReorged = true + return nil + }, + OnBlocks: func( + ctx context.Context, + start, end *types.Header, + updateCurrentFunc UpdateCurrentFunc, + onReorgFunc OnReorgFunc, + endIterFunc EndIterFunc, + ) (bool, error) { + // reorg every 2 blocks but not the first block + if lastEnd != common.Big0 && lastEnd.Uint64()%rewindEveryNBlocks == 0 { + return true, onReorgFunc() + } + + if lastBlockReorged { + s.Equal(start.Number.Uint64(), lastEnd.Uint64()+reorgRewindDepth) + } else { + s.Equal(start.Number.Uint64(), lastEnd.Uint64()) + } + + lastEnd = end.Number + lastBlockReorged = false + return false, nil + }, + }) + + s.Nil(err) + s.Nil(iter.Iter()) +} + func TestBlockBatchIteratorTestSuite(t *testing.T) { suite.Run(t, new(BlockBatchIteratorTestSuite)) } diff --git a/pkg/chain_iterator/event_iterator/block_proposed_iterator.go b/pkg/chain_iterator/event_iterator/block_proposed_iterator.go index 6928f09a3..ff7b2bd69 100644 --- a/pkg/chain_iterator/event_iterator/block_proposed_iterator.go +++ b/pkg/chain_iterator/event_iterator/block_proposed_iterator.go @@ -105,43 +105,43 @@ func assembleBlockProposedIteratorCallback( ctx context.Context, start, end *types.Header, updateCurrentFunc chainIterator.UpdateCurrentFunc, + onReorgFunc chainIterator.OnReorgFunc, endFunc chainIterator.EndIterFunc, - ) error { + ) (bool, error) { endHeight := end.Number.Uint64() iter, err := taikoL1Client.FilterBlockProposed( &bind.FilterOpts{Start: start.Number.Uint64(), End: &endHeight, Context: ctx}, filterQuery, ) if err != nil { - return err + return false, err } defer iter.Close() for iter.Next() { event := iter.Event - // Skip if reorged. if event.Raw.Removed { - continue + return true, onReorgFunc() } if err := callback(ctx, event, eventIter.end); err != nil { - return err + return false, err } if eventIter.isEnd { endFunc() - return nil + return false, nil } current, err := client.HeaderByHash(ctx, event.Raw.BlockHash) if err != nil { - return err + return false, err } updateCurrentFunc(current) } - return nil + return false, nil } } diff --git a/pkg/chain_iterator/event_iterator/block_proven_iterator.go b/pkg/chain_iterator/event_iterator/block_proven_iterator.go index 39d8bf1fd..43b4f5f12 100644 --- a/pkg/chain_iterator/event_iterator/block_proven_iterator.go +++ b/pkg/chain_iterator/event_iterator/block_proven_iterator.go @@ -101,43 +101,43 @@ func assembleBlockProvenIteratorCallback( ctx context.Context, start, end *types.Header, updateCurrentFunc chainIterator.UpdateCurrentFunc, + onReorgFunc chainIterator.OnReorgFunc, endFunc chainIterator.EndIterFunc, - ) error { + ) (bool, error) { endHeight := end.Number.Uint64() iter, err := taikoL1Client.FilterBlockProven( &bind.FilterOpts{Start: start.Number.Uint64(), End: &endHeight, Context: ctx}, filterQuery, ) if err != nil { - return err + return false, err } defer iter.Close() for iter.Next() { event := iter.Event - // Skip if reorged. if event.Raw.Removed { - continue + return true, onReorgFunc() } if err := callback(ctx, event, eventIter.end); err != nil { - return err + return false, err } if eventIter.isEnd { endFunc() - return nil + return false, nil } current, err := client.HeaderByHash(ctx, event.Raw.BlockHash) if err != nil { - return err + return false, err } updateCurrentFunc(current) } - return nil + return false, nil } }