Skip to content

Commit

Permalink
fix(driver): handle reorg (taikoxyz#216)
Browse files Browse the repository at this point in the history
Co-authored-by: David <[email protected]>
Co-authored-by: Roger <[email protected]>
  • Loading branch information
3 people authored May 16, 2023
1 parent c80f7e6 commit e8ff8c9
Show file tree
Hide file tree
Showing 11 changed files with 343 additions and 62 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ lint:
test:
@TAIKO_MONO_DIR=${TAIKO_MONO_DIR} \
COMPILE_PROTOCOL=${COMPILE_PROTOCOL} \
PACKAGE=${PACKAGE} \
RUN_TESTS=true \
./integration_test/entrypoint.sh

Expand Down
109 changes: 109 additions & 0 deletions driver/chain_syncer/calldata/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/big"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -114,8 +115,14 @@ func (s *Syncer) onBlockProposed(
"L1Height", event.Raw.BlockNumber,
"L1Hash", event.Raw.BlockHash,
"BlockID", event.Id,
"Removed", event.Raw.Removed,
)

// handle reorg
if event.Raw.Removed {
return s.handleReorg(ctx, event)
}

// Fetch the L2 parent block.
var (
parent *types.Header
Expand Down Expand Up @@ -223,6 +230,108 @@ func (s *Syncer) onBlockProposed(
return nil
}

// handleReorg detects reorg and rewinds the chain by 1 until we find a block that is still in the chain,
// then inserts that block as the new head.
func (s *Syncer) handleReorg(ctx context.Context, event *bindings.TaikoL1ClientBlockProposed) error {
log.Info(
"Reorg detected",
"L1Height", event.Raw.BlockNumber,
"L1Hash", event.Raw.BlockHash,
"BlockID", event.Id,
"Removed", event.Raw.Removed,
)

// rewind chain by 1 until we find a block that is still in the chain
var (
lastKnownGoodBlockID *big.Int
blockID *big.Int = s.lastInsertedBlockID
block *types.Block
err error
)

// if `lastInsertedBlockID` has not been set, we use current L2 chain head as blockID instead
if blockID == nil {
l2Head, err := s.rpc.L2.BlockByNumber(ctx, nil)
if err != nil {
return err
}
blockID = l2Head.Number()
}

stateVars, err := s.rpc.GetProtocolStateVariables(nil)
if err != nil {
return fmt.Errorf("failed to get state variables: %w", err)
}

for {
if blockID.Cmp(common.Big0) == 0 {
if block, err = s.rpc.L2.BlockByNumber(ctx, common.Big0); err != nil {
return err
}
lastKnownGoodBlockID = common.Big0
break
}

if block, err = s.rpc.L2.BlockByNumber(ctx, blockID); err != nil && !errors.Is(err, ethereum.NotFound) {
return err
}

if block != nil && blockID.Uint64() < stateVars.NumBlocks {
// block exists, we can rewind to this block
lastKnownGoodBlockID = blockID
break
} else {
// otherwise, sub 1 from blockId and try again
blockID = new(big.Int).Sub(blockID, common.Big1)
}
}

// shouldn't be able to reach this error because of the 0 check above
// but just in case
if lastKnownGoodBlockID == nil {
return fmt.Errorf("failed to find last known good block ID after reorg")
}

log.Info(
"🔗 Last known good block ID before reorg found",
"blockID", lastKnownGoodBlockID,
)

fcRes, err := s.rpc.L2Engine.ForkchoiceUpdate(ctx, &engine.ForkchoiceStateV1{HeadBlockHash: block.Hash()}, nil)
if err != nil {
return err
}
if fcRes.PayloadStatus.Status != engine.VALID {
return fmt.Errorf("unexpected ForkchoiceUpdate response status: %s", fcRes.PayloadStatus.Status)
}

// reset l1 current to when the last known good block was inserted, and return the event.
if _, _, err := s.state.ResetL1Current(ctx, &state.HeightOrID{ID: lastKnownGoodBlockID}); err != nil {
return fmt.Errorf("failed to reset L1 current: %w", err)
}

log.Info(
"🔗 Rewound chain and inserted last known good block as new head",
"blockID", event.Id,
"height", block.Number(),
"hash", block.Hash(),
"latestVerifiedBlockHeight", s.state.GetLatestVerifiedBlock().Height,
"latestVerifiedBlockHash", s.state.GetLatestVerifiedBlock().Hash,
"transactions", len(block.Transactions()),
"baseFee", block.BaseFee(),
"withdrawals", len(block.Withdrawals()),
)

metrics.DriverL1CurrentHeightGauge.Update(int64(event.Raw.BlockNumber))
s.lastInsertedBlockID = block.Number()

if s.progressTracker.Triggered() {
s.progressTracker.ClearMeta()
}

return nil
}

// insertNewHead tries to insert a new head block to the L2 execution engine's local
// block chain through Engine APIs.
func (s *Syncer) insertNewHead(
Expand Down
64 changes: 64 additions & 0 deletions driver/chain_syncer/calldata/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/suite"
"github.com/taikoxyz/taiko-client/bindings"
"github.com/taikoxyz/taiko-client/driver/chain_syncer/beaconsync"
"github.com/taikoxyz/taiko-client/driver/state"
"github.com/taikoxyz/taiko-client/proposer"
"github.com/taikoxyz/taiko-client/testutils"
)

type CalldataSyncerTestSuite struct {
testutils.ClientTestSuite
s *Syncer
p testutils.Proposer
}

func (s *CalldataSyncerTestSuite) SetupTest() {
Expand All @@ -36,6 +40,22 @@ func (s *CalldataSyncerTestSuite) SetupTest() {
)
s.Nil(err)
s.s = syncer

prop := new(proposer.Proposer)
l1ProposerPrivKey, err := crypto.ToECDSA(common.Hex2Bytes(os.Getenv("L1_PROPOSER_PRIVATE_KEY")))
s.Nil(err)
proposeInterval := 1024 * time.Hour // No need to periodically propose transactions list in unit tests
s.Nil(proposer.InitFromConfig(context.Background(), prop, (&proposer.Config{
L1Endpoint: os.Getenv("L1_NODE_WS_ENDPOINT"),
L2Endpoint: os.Getenv("L2_EXECUTION_ENGINE_WS_ENDPOINT"),
TaikoL1Address: common.HexToAddress(os.Getenv("TAIKO_L1_ADDRESS")),
TaikoL2Address: common.HexToAddress(os.Getenv("TAIKO_L2_ADDRESS")),
L1ProposerPrivKey: l1ProposerPrivKey,
L2SuggestedFeeRecipient: common.HexToAddress(os.Getenv("L2_SUGGESTED_FEE_RECIPIENT")),
ProposeInterval: &proposeInterval,
})))

s.p = prop
}

func (s *CalldataSyncerTestSuite) TestProcessL1Blocks() {
Expand Down Expand Up @@ -82,6 +102,50 @@ func (s *CalldataSyncerTestSuite) TestInsertNewHead() {
s.Nil(payloadErr)
}

func (s *CalldataSyncerTestSuite) TestHandleReorgToGenesis() {
testutils.ProposeAndInsertEmptyBlocks(&s.ClientTestSuite, s.p, s.s)

l2Head1, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil)
s.Nil(err)
s.Greater(l2Head1.NumberU64(), uint64(0))
s.NotZero(s.s.lastInsertedBlockID.Uint64())
s.s.lastInsertedBlockID = common.Big0 // let the chain reorg to genesis

s.Nil(s.s.handleReorg(context.Background(), &bindings.TaikoL1ClientBlockProposed{
Id: l2Head1.Number(),
Raw: types.Log{Removed: true},
}))

l2Head2, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil)
s.Nil(err)
s.Equal(uint64(0), l2Head2.NumberU64())
}

func (s *CalldataSyncerTestSuite) TestHandleReorgToNoneGenesis() {
testutils.ProposeAndInsertEmptyBlocks(&s.ClientTestSuite, s.p, s.s)

l2Head1, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil)
s.Nil(err)
s.Greater(l2Head1.NumberU64(), uint64(0))
s.NotZero(s.s.lastInsertedBlockID.Uint64())
s.s.lastInsertedBlockID = common.Big1 // let the chain reorg to height 1

s.Nil(s.s.handleReorg(context.Background(), &bindings.TaikoL1ClientBlockProposed{
Id: l2Head1.Number(),
Raw: types.Log{Removed: true},
}))

l2Head2, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil)
s.Nil(err)
s.Equal(uint64(1), l2Head2.NumberU64())

testutils.ProposeAndInsertEmptyBlocks(&s.ClientTestSuite, s.p, s.s)
l2Head3, err := s.s.rpc.L2.BlockByNumber(context.Background(), nil)
s.Nil(err)
s.Greater(l2Head3.NumberU64(), l2Head2.NumberU64())
s.Greater(s.s.lastInsertedBlockID.Uint64(), uint64(1))
}

func TestCalldataSyncerTestSuite(t *testing.T) {
suite.Run(t, new(CalldataSyncerTestSuite))
}
2 changes: 1 addition & 1 deletion driver/chain_syncer/chain_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *L2ChainSyncer) Sync(l1End *types.Header) error {
}

// Reset the L1Current cursor.
blockID, err := s.state.ResetL1Current(s.ctx, heightOrID)
_, blockID, err := s.state.ResetL1Current(s.ctx, heightOrID)
if err != nil {
return err
}
Expand Down
41 changes: 22 additions & 19 deletions driver/state/l1_current.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,36 @@ func (s *State) SetL1Current(h *types.Header) {
}

// ResetL1Current resets the l1Current cursor to the L1 height which emitted a
// BlockProven event with given blockID / blockHash.
func (s *State) ResetL1Current(ctx context.Context, heightOrID *HeightOrID) (*big.Int, error) {
// BlockProposed event with given blockID / blockHash.
func (s *State) ResetL1Current(
ctx context.Context,
heightOrID *HeightOrID,
) (*bindings.TaikoL1ClientBlockProposed, *big.Int, error) {
if !heightOrID.NotEmpty() {
return nil, fmt.Errorf("empty input %v", heightOrID)
return nil, nil, fmt.Errorf("empty input %v", heightOrID)
}

log.Info("Reset L1 current cursor", "heightOrID", heightOrID)

var (
l1CurrentHeight *big.Int
err error
err error
)

if (heightOrID.ID != nil && heightOrID.ID.Cmp(common.Big0) == 0) ||
(heightOrID.Height != nil && heightOrID.Height.Cmp(common.Big0) == 0) {
l1Current, err := s.rpc.L1.HeaderByNumber(ctx, s.GenesisL1Height)
if err != nil {
return nil, err
return nil, nil, err
}
s.SetL1Current(l1Current)
return common.Big0, nil
return nil, common.Big0, nil
}

// Need to find the block ID at first, before filtering the BlockProposed events.
if heightOrID.ID == nil {
header, err := s.rpc.L2.HeaderByNumber(context.Background(), heightOrID.Height)
if err != nil {
return nil, err
return nil, nil, err
}
targetHash := header.Hash()

Expand Down Expand Up @@ -85,18 +87,19 @@ func (s *State) ResetL1Current(ctx context.Context, heightOrID *HeightOrID) (*bi
)

if err != nil {
return nil, err
return nil, nil, err
}

if err := iter.Iter(); err != nil {
return nil, err
return nil, nil, err
}

if heightOrID.ID == nil {
return nil, fmt.Errorf("BlockProven event not found, hash: %s", targetHash)
return nil, nil, fmt.Errorf("BlockProven event not found, hash: %s", targetHash)
}
}

var event *bindings.TaikoL1ClientBlockProposed
iter, err := eventIterator.NewBlockProposedIterator(
ctx,
&eventIterator.BlockProposedIteratorConfig{
Expand All @@ -111,32 +114,32 @@ func (s *State) ResetL1Current(ctx context.Context, heightOrID *HeightOrID) (*bi
e *bindings.TaikoL1ClientBlockProposed,
end eventIterator.EndBlockProposedEventIterFunc,
) error {
l1CurrentHeight = new(big.Int).SetUint64(e.Raw.BlockNumber)
event = e
end()
return nil
},
},
)

if err != nil {
return nil, err
return nil, nil, err
}

if err := iter.Iter(); err != nil {
return nil, err
return nil, nil, err
}

if l1CurrentHeight == nil {
return nil, fmt.Errorf("BlockProposed event not found, blockID: %s", heightOrID.ID)
if event == nil {
return nil, nil, fmt.Errorf("BlockProposed event not found, blockID: %s", heightOrID.ID)
}

l1Current, err := s.rpc.L1.HeaderByNumber(ctx, l1CurrentHeight)
l1Current, err := s.rpc.L1.HeaderByNumber(ctx, new(big.Int).SetUint64(event.Raw.BlockNumber))
if err != nil {
return nil, err
return nil, nil, err
}
s.SetL1Current(l1Current)

log.Info("Reset L1 current cursor", "height", s.GetL1Current().Number)

return heightOrID.ID, nil
return event, heightOrID.ID, nil
}
6 changes: 3 additions & 3 deletions driver/state/l1_current_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ func (s *DriverStateTestSuite) TestSetL1Current() {
}

func (s *DriverStateTestSuite) TestResetL1CurrentEmptyHeight() {
l1Current, err := s.s.ResetL1Current(context.Background(), &HeightOrID{ID: common.Big0})
_, l1Current, err := s.s.ResetL1Current(context.Background(), &HeightOrID{ID: common.Big0})
s.Nil(err)
s.Zero(l1Current.Uint64())

_, err = s.s.ResetL1Current(context.Background(), &HeightOrID{Height: common.Big0})
_, _, err = s.s.ResetL1Current(context.Background(), &HeightOrID{Height: common.Big0})
s.Nil(err)
}

func (s *DriverStateTestSuite) TestResetL1CurrentEmptyID() {
_, err := s.s.ResetL1Current(context.Background(), &HeightOrID{Height: common.Big1})
_, _, err := s.s.ResetL1Current(context.Background(), &HeightOrID{Height: common.Big1})
s.NotNil(err)
}
2 changes: 1 addition & 1 deletion integration_test/nodes/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ services:
- "0.0.0.0"

l2_execution_engine:
image: gcr.io/evmchain/taiko-geth:taiko
image: gcr.io/evmchain/taiko-geth:sha-f8be24a # TODO: change back to taiko tag
restart: unless-stopped
pull_policy: always
volumes:
Expand Down
Loading

0 comments on commit e8ff8c9

Please sign in to comment.