diff --git a/op-batcher/batcher/channel.go b/op-batcher/batcher/channel.go index 95abcb46a7fa..6b936c112d34 100644 --- a/op-batcher/batcher/channel.go +++ b/op-batcher/batcher/channel.go @@ -217,3 +217,7 @@ func (c *channel) OldestL2() eth.BlockID { func (c *channel) Close() { c.channelBuilder.Close() } + +func (c *channel) MaxInclusionBlock() uint64 { + return c.maxInclusionBlock +} diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 1da2def78da6..b5cfc6ebc1bc 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -464,78 +464,30 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info *derive.L1BlockInfo var ErrPendingAfterClose = errors.New("pending channels remain after closing channel-manager") -// pruneSafeBlocks dequeues blocks from the internal blocks queue -// if they have now become safe. -func (s *channelManager) pruneSafeBlocks(newSafeHead eth.L2BlockRef) { - oldestBlock, ok := s.blocks.Peek() +// pruneSafeBlocks dequeues the provided number of blocks from the internal blocks queue +func (s *channelManager) pruneSafeBlocks(num int) { + _, ok := s.blocks.DequeueN(int(num)) if !ok { - // no blocks to prune - return + panic("tried to prune more blocks than available") } - - if newSafeHead.Number+1 == oldestBlock.NumberU64() { - // no blocks to prune - return - } - - if newSafeHead.Number+1 < oldestBlock.NumberU64() { - // This could happen if there was an L1 reorg. - // Or if the sequencer restarted. - s.log.Warn("safe head reversed, clearing channel manager state", - "oldestBlock", eth.ToBlockID(oldestBlock), - "newSafeBlock", newSafeHead) - // We should restart work from the new safe head, - // and therefore prune all the blocks. - s.Clear(newSafeHead.L1Origin) - return - } - - numBlocksToDequeue := newSafeHead.Number + 1 - oldestBlock.NumberU64() - - if numBlocksToDequeue > uint64(s.blocks.Len()) { - // This could happen if the batcher restarted. - // The sequencer may have derived the safe chain - // from channels sent by a previous batcher instance. - s.log.Warn("safe head above unsafe head, clearing channel manager state", - "unsafeBlock", eth.ToBlockID(s.blocks[s.blocks.Len()-1]), - "newSafeBlock", newSafeHead) - // We should restart work from the new safe head, - // and therefore prune all the blocks. - s.Clear(newSafeHead.L1Origin) - return - } - - if s.blocks[numBlocksToDequeue-1].Hash() != newSafeHead.Hash { - s.log.Warn("safe chain reorg, clearing channel manager state", - "existingBlock", eth.ToBlockID(s.blocks[numBlocksToDequeue-1]), - "newSafeBlock", newSafeHead) - // We should restart work from the new safe head, - // and therefore prune all the blocks. - s.Clear(newSafeHead.L1Origin) - return - } - - // This shouldn't return an error because - // We already checked numBlocksToDequeue <= s.blocks.Len() - _, _ = s.blocks.DequeueN(int(numBlocksToDequeue)) - s.blockCursor -= int(numBlocksToDequeue) - + s.blockCursor -= int(num) if s.blockCursor < 0 { - panic("negative blockCursor") + s.blockCursor = 0 } } -// pruneChannels dequeues channels from the internal channels queue -// if they were built using blocks which are now safe -func (s *channelManager) pruneChannels(newSafeHead eth.L2BlockRef) { - i := 0 - for _, ch := range s.channelQueue { - if ch.LatestL2().Number > newSafeHead.Number { - break +// pruneChannels dequeues the provided number of channels from the internal channels queue +func (s *channelManager) pruneChannels(num int) { + clearCurrentChannel := false + for i := 0; i < num; i++ { + if s.channelQueue[i] == s.currentChannel { + clearCurrentChannel = true } - i++ } - s.channelQueue = s.channelQueue[i:] + s.channelQueue = s.channelQueue[num:] + if clearCurrentChannel { + s.currentChannel = nil + } } // PendingDABytes returns the current number of bytes pending to be written to the DA layer (from blocks fetched from L2 diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index 32aae1b06dd1..d1a0037a5d0d 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -463,14 +463,12 @@ func TestChannelManager_handleChannelInvalidated(t *testing.T) { } func TestChannelManager_PruneBlocks(t *testing.T) { - l := testlog.Logger(t, log.LevelDebug) cfg := channelManagerTestConfig(100, derive.SingularBatchType) - m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) - + cfg.InitNoneCompressor() a := types.NewBlock(&types.Header{ Number: big.NewInt(0), }, nil, nil, nil) - b := types.NewBlock(&types.Header{ // This will shortly become the safe head + b := types.NewBlock(&types.Header{ Number: big.NewInt(1), ParentHash: a.Hash(), }, nil, nil, nil) @@ -479,132 +477,157 @@ func TestChannelManager_PruneBlocks(t *testing.T) { ParentHash: b.Hash(), }, nil, nil, nil) - require.NoError(t, m.AddL2Block(a)) - m.blockCursor += 1 - require.NoError(t, m.AddL2Block(b)) - m.blockCursor += 1 - require.NoError(t, m.AddL2Block(c)) - m.blockCursor += 1 - - // Normal path - m.pruneSafeBlocks(eth.L2BlockRef{ - Hash: b.Hash(), - Number: b.NumberU64(), - }) - require.Equal(t, queue.Queue[*types.Block]{c}, m.blocks) - - // Safe chain didn't move, nothing to prune - m.pruneSafeBlocks(eth.L2BlockRef{ - Hash: b.Hash(), - Number: b.NumberU64(), - }) - require.Equal(t, queue.Queue[*types.Block]{c}, m.blocks) - - // Safe chain moved beyond the blocks we had - // state should be cleared - m.pruneSafeBlocks(eth.L2BlockRef{ - Hash: c.Hash(), - Number: uint64(99), - }) - require.Equal(t, queue.Queue[*types.Block]{}, m.blocks) - - // No blocks to prune, NOOP - m.pruneSafeBlocks(eth.L2BlockRef{ - Hash: c.Hash(), - Number: c.NumberU64(), - }) - require.Equal(t, queue.Queue[*types.Block]{}, m.blocks) - - // Put another block in - d := types.NewBlock(&types.Header{ - Number: big.NewInt(3), - ParentHash: c.Hash(), - }, nil, nil, nil) - require.NoError(t, m.AddL2Block(d)) - m.blockCursor += 1 - - // Safe chain reorg - // state should be cleared - m.pruneSafeBlocks(eth.L2BlockRef{ - Hash: a.Hash(), - Number: uint64(3), - }) - require.Equal(t, queue.Queue[*types.Block]{}, m.blocks) - - // Put another block in - require.NoError(t, m.AddL2Block(d)) - m.blockCursor += 1 + type testCase struct { + name string + initialQ queue.Queue[*types.Block] + initialBlockCursor int + numChannelsToPrune int + expectedQ queue.Queue[*types.Block] + expectedBlockCursor int + } - // Safe chain reversed - // state should be cleared - m.pruneSafeBlocks(eth.L2BlockRef{ - Hash: a.Hash(), // unused - Number: uint64(1), - }) - require.Equal(t, queue.Queue[*types.Block]{}, m.blocks) + for _, tc := range []testCase{ + { + name: "[A,B,C]*+1->[B,C]*", // * denotes the cursor + initialQ: queue.Queue[*types.Block]{a, b, c}, + initialBlockCursor: 3, + numChannelsToPrune: 1, + expectedQ: queue.Queue[*types.Block]{b, c}, + expectedBlockCursor: 2, + }, + { + name: "[A,B,C*]+1->[B,C*]", + initialQ: queue.Queue[*types.Block]{a, b, c}, + initialBlockCursor: 2, + numChannelsToPrune: 1, + expectedQ: queue.Queue[*types.Block]{b, c}, + expectedBlockCursor: 1, + }, + { + name: "[A,B,C]*+2->[C]*", + initialQ: queue.Queue[*types.Block]{a, b, c}, + initialBlockCursor: 3, + numChannelsToPrune: 2, + expectedQ: queue.Queue[*types.Block]{c}, + expectedBlockCursor: 1, + }, + { + name: "[A,B,C*]+2->[C*]", + initialQ: queue.Queue[*types.Block]{a, b, c}, + initialBlockCursor: 2, + numChannelsToPrune: 2, + expectedQ: queue.Queue[*types.Block]{c}, + expectedBlockCursor: 0, + }, + { + name: "[A*,B,C]+1->[B*,C]", + initialQ: queue.Queue[*types.Block]{a, b, c}, + initialBlockCursor: 0, + numChannelsToPrune: 1, + expectedQ: queue.Queue[*types.Block]{b, c}, + expectedBlockCursor: 0, + }, + { + name: "[A,B,C]+3->[]", + initialQ: queue.Queue[*types.Block]{a, b, c}, + initialBlockCursor: 3, + numChannelsToPrune: 3, + expectedQ: queue.Queue[*types.Block]{}, + expectedBlockCursor: 0, + }, + { + name: "[A,B,C]*+4->panic", + initialQ: queue.Queue[*types.Block]{a, b, c}, + initialBlockCursor: 3, + numChannelsToPrune: 4, + expectedQ: nil, // declare that the prune method should panic + expectedBlockCursor: 0, + }, + } { + t.Run(tc.name, func(t *testing.T) { + l := testlog.Logger(t, log.LevelCrit) + m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) + m.blocks = tc.initialQ + m.blockCursor = tc.initialBlockCursor + if tc.expectedQ != nil { + m.pruneSafeBlocks(tc.numChannelsToPrune) + require.Equal(t, tc.expectedQ, m.blocks) + } else { + require.Panics(t, func() { m.pruneSafeBlocks(tc.numChannelsToPrune) }) + } + }) + } } func TestChannelManager_PruneChannels(t *testing.T) { - l := testlog.Logger(t, log.LevelCrit) cfg := channelManagerTestConfig(100, derive.SingularBatchType) - cfg.InitNoneCompressor() - m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) - - A, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0) - require.NoError(t, err) - B, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0) - require.NoError(t, err) - C, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0) - require.NoError(t, err) - - m.channelQueue = []*channel{A, B, C} - - numTx := 1 - rng := rand.New(rand.NewSource(123)) - a0 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) - a0 = a0.WithSeal(&types.Header{Number: big.NewInt(0)}) - a1 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) - a1 = a1.WithSeal(&types.Header{Number: big.NewInt(1)}) - b2 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) - b2 = b2.WithSeal(&types.Header{Number: big.NewInt(2)}) - b3 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) - b3 = b3.WithSeal(&types.Header{Number: big.NewInt(3)}) - c4 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) - c4 = c4.WithSeal(&types.Header{Number: big.NewInt(4)}) - - _, err = A.AddBlock(a0) - require.NoError(t, err) - _, err = A.AddBlock(a1) - require.NoError(t, err) - - _, err = B.AddBlock(b2) + A, err := newChannelWithChannelOut(nil, metrics.NoopMetrics, cfg, defaultTestRollupConfig, 0) require.NoError(t, err) - _, err = B.AddBlock(b3) + B, err := newChannelWithChannelOut(nil, metrics.NoopMetrics, cfg, defaultTestRollupConfig, 0) require.NoError(t, err) - - _, err = C.AddBlock(c4) + C, err := newChannelWithChannelOut(nil, metrics.NoopMetrics, cfg, defaultTestRollupConfig, 0) require.NoError(t, err) - m.pruneChannels(eth.L2BlockRef{ - Number: uint64(3), - }) - - require.Equal(t, []*channel{C}, m.channelQueue) - - m.pruneChannels(eth.L2BlockRef{ - Number: uint64(4), - }) - - require.Equal(t, []*channel{}, m.channelQueue) - - m.pruneChannels(eth.L2BlockRef{ - Number: uint64(4), - }) - - require.Equal(t, []*channel{}, m.channelQueue) + type testCase struct { + name string + initialQ []*channel + initialCurrentChannel *channel + numChannelsToPrune int + expectedQ []*channel + expectedCurrentChannel *channel + } + for _, tc := range []testCase{ + { + name: "[A,B,C]+1->[B,C]", + initialQ: []*channel{A, B, C}, + numChannelsToPrune: 1, + expectedQ: []*channel{B, C}, + }, + { + name: "[A,B,C]+3->[] + currentChannel=C", + initialQ: []*channel{A, B, C}, + initialCurrentChannel: C, + numChannelsToPrune: 3, + expectedQ: []*channel{}, + expectedCurrentChannel: nil, + }, + { + name: "[A,B,C]+2->[C]", + initialQ: []*channel{A, B, C}, + numChannelsToPrune: 2, + expectedQ: []*channel{C}, + }, + { + name: "[A,B,C]+3->[]", + initialQ: []*channel{A, B, C}, + numChannelsToPrune: 3, + expectedQ: []*channel{}, + }, + { + name: "[A,B,C]+4->panic", + initialQ: []*channel{A, B, C}, + numChannelsToPrune: 4, + expectedQ: nil, // declare that the prune method should panic + }, + } { + t.Run(tc.name, func(t *testing.T) { + l := testlog.Logger(t, log.LevelCrit) + m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) + m.channelQueue = tc.initialQ + m.currentChannel = tc.initialCurrentChannel + if tc.expectedQ != nil { + m.pruneChannels(tc.numChannelsToPrune) + require.Equal(t, tc.expectedQ, m.channelQueue) + require.Equal(t, tc.expectedCurrentChannel, m.currentChannel) + } else { + require.Panics(t, func() { m.pruneChannels(tc.numChannelsToPrune) }) + } + }) + } } + func TestChannelManager_ChannelOutFactory(t *testing.T) { type ChannelOutWrapper struct { derive.ChannelOut diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index d52a31bba60b..5ac0579fb82e 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -114,7 +114,8 @@ type BatchSubmitter struct { txpoolState TxPoolState txpoolBlockedBlob bool - state *channelManager + state *channelManager + prevCurrentL1 eth.L1BlockRef // cached CurrentL1 from the last syncStatus } // NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup @@ -241,28 +242,15 @@ func (l *BatchSubmitter) StopBatchSubmitting(ctx context.Context) error { return nil } -// loadBlocksIntoState loads all blocks since the previous stored block -// It does the following: -// 1. Fetch the sync status of the sequencer -// 2. Check if the sync status is valid or if we are all the way up to date -// 3. Check if it needs to initialize state OR it is lagging (todo: lagging just means race condition?) -// 4. Load all new blocks into the local state. -// 5. Dequeue blocks from local state which are now safe. -// -// If there is a reorg, it will reset the last stored block but not clear the internal state so -// the state can be flushed to L1. -func (l *BatchSubmitter) loadBlocksIntoState(syncStatus eth.SyncStatus, ctx context.Context) error { - start, end, err := l.calculateL2BlockRangeToStore(syncStatus) - if err != nil { - l.Log.Warn("Error calculating L2 block range", "err", err) - return err - } else if start.Number >= end.Number { - return errors.New("start number is >= end number") +// loadBlocksIntoState loads the blocks between start and end (inclusive). +// If there is a reorg, it will return an error. +func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context, start, end uint64) error { + if end <= start { + return fmt.Errorf("start number is >= end number %d,%d", start, end) } - var latestBlock *types.Block // Add all blocks to "state" - for i := start.Number + 1; i < end.Number+1; i++ { + for i := start; i <= end; i++ { block, err := l.loadBlockIntoState(ctx, i) if errors.Is(err, ErrReorg) { l.Log.Warn("Found L2 reorg", "block_number", i) @@ -358,34 +346,6 @@ func (l *BatchSubmitter) getSyncStatus(ctx context.Context) (*eth.SyncStatus, er return syncStatus, nil } -// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state. -func (l *BatchSubmitter) calculateL2BlockRangeToStore(syncStatus eth.SyncStatus) (eth.BlockID, eth.BlockID, error) { - if syncStatus.HeadL1 == (eth.L1BlockRef{}) { - return eth.BlockID{}, eth.BlockID{}, errors.New("empty sync status") - } - // Check if we should even attempt to load any blocks. TODO: May not need this check - if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number { - return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("L2 safe head(%d) >= L2 unsafe head(%d)", syncStatus.SafeL2.Number, syncStatus.UnsafeL2.Number) - } - - lastStoredBlock := l.state.LastStoredBlock() - start := lastStoredBlock - end := syncStatus.UnsafeL2.ID() - - // Check last stored block to see if it is empty or has lagged behind. - // It lagging implies that the op-node processed some batches that - // were submitted prior to the current instance of the batcher being alive. - if lastStoredBlock == (eth.BlockID{}) { - l.Log.Info("Resuming batch-submitter work at safe-head", "safe", syncStatus.SafeL2) - start = syncStatus.SafeL2.ID() - } else if lastStoredBlock.Number < syncStatus.SafeL2.Number { - l.Log.Warn("Last stored block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", lastStoredBlock, "safe", syncStatus.SafeL2) - start = syncStatus.SafeL2.ID() - } - - return start, end, nil -} - // The following things occur: // New L2 block (reorg or not) // L1 transaction is confirmed @@ -464,20 +424,34 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR continue } - l.state.pruneSafeBlocks(syncStatus.SafeL2) - l.state.pruneChannels(syncStatus.SafeL2) + // Decide appropriate actions + syncActions, outOfSync := computeSyncActions(*syncStatus, l.prevCurrentL1, l.state.blocks, l.state.channelQueue, l.Log) - err = l.state.CheckExpectedProgress(*syncStatus) - if err != nil { - l.Log.Warn("error checking expected progress, clearing state and waiting for node sync", "err", err) - l.waitNodeSyncAndClearState() + if outOfSync { + // If the sequencer is out of sync + // do nothing and wait to see if it has + // got in sync on the next tick. + l.Log.Warn("Sequencer is out of sync, retrying next tick.") continue } - if err := l.loadBlocksIntoState(*syncStatus, l.shutdownCtx); errors.Is(err, ErrReorg) { - l.Log.Warn("error loading blocks, clearing state and waiting for node sync", "err", err) - l.waitNodeSyncAndClearState() - continue + l.prevCurrentL1 = syncStatus.CurrentL1 + + // Manage existing state / garbage collection + if syncActions.clearState != nil { + l.state.Clear(*syncActions.clearState) + } else { + l.state.pruneSafeBlocks(syncActions.blocksToPrune) + l.state.pruneChannels(syncActions.channelsToPrune) + } + + if syncActions.blocksToLoad != nil { + // Get fresh unsafe blocks + if err := l.loadBlocksIntoState(l.shutdownCtx, syncActions.blocksToLoad.start, syncActions.blocksToLoad.end); errors.Is(err, ErrReorg) { + l.Log.Warn("error loading blocks, clearing state and waiting for node sync", "err", err) + l.waitNodeSyncAndClearState() + continue + } } l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval) diff --git a/op-batcher/batcher/sync_actions.go b/op-batcher/batcher/sync_actions.go new file mode 100644 index 000000000000..8f4121d7d5f4 --- /dev/null +++ b/op-batcher/batcher/sync_actions.go @@ -0,0 +1,143 @@ +package batcher + +import ( + "fmt" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/queue" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +type channelStatuser interface { + isFullySubmitted() bool + isTimedOut() bool + LatestL2() eth.BlockID + MaxInclusionBlock() uint64 +} + +type inclusiveBlockRange struct{ start, end uint64 } +type syncActions struct { + clearState *eth.BlockID + blocksToPrune int + channelsToPrune int + blocksToLoad *inclusiveBlockRange // the blocks that should be loaded into the local state. + // NOTE this range is inclusive on both ends, which is a change to previous behaviour. +} + +func (s syncActions) String() string { + return fmt.Sprintf( + "SyncActions{blocksToPrune: %d, channelsToPrune: %d, clearState: %v, blocksToLoad: %v}", s.blocksToPrune, s.channelsToPrune, s.clearState, s.blocksToLoad) +} + +// computeSyncActions determines the actions that should be taken based on the inputs provided. The inputs are the current +// state of the batcher (blocks and channels), the new sync status, and the previous current L1 block. The actions are returned +// in a struct specifying the number of blocks to prune, the number of channels to prune, whether to wait for node sync, the block +// range to load into the local state, and whether to clear the state entirely. Returns an boolean indicating if the sequencer is out of sync. +func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCurrentL1 eth.L1BlockRef, blocks queue.Queue[*types.Block], channels []T, l log.Logger) (syncActions, bool) { + + // PART 1: Initial checks on the sync status + if newSyncStatus.HeadL1 == (eth.L1BlockRef{}) { + l.Warn("empty sync status") + return syncActions{}, true + } + + if newSyncStatus.CurrentL1.Number < prevCurrentL1.Number { + // This can happen when the sequencer restarts + l.Warn("sequencer currentL1 reversed") + return syncActions{}, true + } + + // PART 2: checks involving only the oldest block in the state + oldestBlockInState, hasBlocks := blocks.Peek() + oldestUnsafeBlockNum := newSyncStatus.SafeL2.Number + 1 + youngestUnsafeBlockNum := newSyncStatus.UnsafeL2.Number + + if !hasBlocks { + s := syncActions{ + blocksToLoad: &inclusiveBlockRange{oldestUnsafeBlockNum, youngestUnsafeBlockNum}, + } + l.Info("no blocks in state", "syncActions", s) + return s, false + } + + // These actions apply in multiple unhappy scenarios below, where + // we detect that the existing state is invalidated + // and we need to start over from the sequencer's oldest + // unsafe (and not safe) block. + startAfresh := syncActions{ + clearState: &newSyncStatus.SafeL2.L1Origin, + blocksToLoad: &inclusiveBlockRange{oldestUnsafeBlockNum, youngestUnsafeBlockNum}, + } + + oldestBlockInStateNum := oldestBlockInState.NumberU64() + + if oldestUnsafeBlockNum < oldestBlockInStateNum { + l.Warn("oldest unsafe block is below oldest block in state", "syncActions", startAfresh, "oldestBlockInState", oldestBlockInState, "newSafeBlock", newSyncStatus.SafeL2) + return startAfresh, false + } + + // PART 3: checks involving all blocks in state + newestBlockInState := blocks[blocks.Len()-1] + newestBlockInStateNum := newestBlockInState.NumberU64() + + numBlocksToDequeue := oldestUnsafeBlockNum - oldestBlockInStateNum + + if numBlocksToDequeue > uint64(blocks.Len()) { + // This could happen if the batcher restarted. + // The sequencer may have derived the safe chain + // from channels sent by a previous batcher instance. + l.Warn("oldest unsafe block above newest block in state, clearing channel manager state", + "oldestUnsafeBlockNum", oldestUnsafeBlockNum, + "newestBlockInState", eth.ToBlockID(newestBlockInState), + "syncActions", + startAfresh) + return startAfresh, false + } + + if numBlocksToDequeue > 0 && blocks[numBlocksToDequeue-1].Hash() != newSyncStatus.SafeL2.Hash { + l.Warn("safe chain reorg, clearing channel manager state", + "existingBlock", eth.ToBlockID(blocks[numBlocksToDequeue-1]), + "newSafeBlock", newSyncStatus.SafeL2, + "syncActions", startAfresh) + return startAfresh, false + } + + // PART 4: checks involving channels + for _, ch := range channels { + if ch.isFullySubmitted() && + !ch.isTimedOut() && + newSyncStatus.CurrentL1.Number > ch.MaxInclusionBlock() && + newSyncStatus.SafeL2.Number < ch.LatestL2().Number { + // Safe head did not make the expected progress + // for a fully submitted channel. This indicates + // that the derivation pipeline may have stalled + // e.g. because of Holocene strict ordering rules. + l.Warn("sequencer did not make expected progress", + "existingBlock", eth.ToBlockID(blocks[numBlocksToDequeue-1]), + "newSafeBlock", newSyncStatus.SafeL2, + "syncActions", startAfresh) + return startAfresh, false + } + } + + // PART 5: happy path + numChannelsToPrune := 0 + for _, ch := range channels { + if ch.LatestL2().Number > newSyncStatus.SafeL2.Number { + // If the channel has blocks which are not yet safe + // we do not want to prune it. + break + } + numChannelsToPrune++ + } + + start := newestBlockInStateNum + 1 + end := youngestUnsafeBlockNum + + return syncActions{ + blocksToPrune: int(numBlocksToDequeue), + channelsToPrune: numChannelsToPrune, + blocksToLoad: &inclusiveBlockRange{start, end}, + }, false +} diff --git a/op-batcher/batcher/sync_actions_test.go b/op-batcher/batcher/sync_actions_test.go new file mode 100644 index 000000000000..f48ed9dabfbe --- /dev/null +++ b/op-batcher/batcher/sync_actions_test.go @@ -0,0 +1,248 @@ +package batcher + +import ( + "math/big" + "testing" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/queue" + "github.com/ethereum-optimism/optimism/op-service/testlog" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" +) + +type testChannelStatuser struct { + latestL2 eth.BlockID + inclusionBlock uint64 + fullySubmitted, timedOut bool +} + +func (tcs testChannelStatuser) LatestL2() eth.BlockID { + return tcs.latestL2 +} + +func (tcs testChannelStatuser) MaxInclusionBlock() uint64 { + return tcs.inclusionBlock +} +func (tcs testChannelStatuser) isFullySubmitted() bool { + return tcs.fullySubmitted +} + +func (tcs testChannelStatuser) isTimedOut() bool { + return tcs.timedOut +} + +func TestBatchSubmitter_computeSyncActions(t *testing.T) { + + block101 := types.NewBlockWithHeader(&types.Header{Number: big.NewInt(101)}) + block102 := types.NewBlockWithHeader(&types.Header{Number: big.NewInt(102)}) + block103 := types.NewBlockWithHeader(&types.Header{Number: big.NewInt(103)}) + + channel103 := testChannelStatuser{ + latestL2: eth.ToBlockID(block103), + inclusionBlock: 1, + fullySubmitted: true, + timedOut: false, + } + + block104 := types.NewBlockWithHeader(&types.Header{Number: big.NewInt(104)}) + + channel104 := testChannelStatuser{ + latestL2: eth.ToBlockID(block104), + inclusionBlock: 1, + fullySubmitted: false, + timedOut: false, + } + + type TestCase struct { + name string + // inputs + newSyncStatus eth.SyncStatus + prevCurrentL1 eth.L1BlockRef + blocks queue.Queue[*types.Block] + channels []channelStatuser + // expectations + expected syncActions + expectedSeqOutOfSync bool + expectedLogs []string + } + + testCases := []TestCase{ + {name: "empty sync status", + // This can happen when the sequencer recovers from a reorg + newSyncStatus: eth.SyncStatus{}, + expected: syncActions{}, + expectedSeqOutOfSync: true, + expectedLogs: []string{"empty sync status"}, + }, + {name: "current l1 reversed", + // This can happen when the sequencer restarts or is switched + // to a backup sequencer: + newSyncStatus: eth.SyncStatus{ + HeadL1: eth.BlockRef{Number: 2}, + CurrentL1: eth.BlockRef{Number: 1}, + }, + prevCurrentL1: eth.BlockRef{Number: 2}, + expected: syncActions{}, + expectedSeqOutOfSync: true, + expectedLogs: []string{"sequencer currentL1 reversed"}, + }, + {name: "gap between safe chain and state", + // This can happen if there is an L1 reorg: + // although the sequencer has derived up the same + // L1 block height, it derived fewer safe L2 blocks. + newSyncStatus: eth.SyncStatus{ + HeadL1: eth.BlockRef{Number: 6}, + CurrentL1: eth.BlockRef{Number: 1}, + SafeL2: eth.L2BlockRef{Number: 100, L1Origin: eth.BlockID{Number: 1}}, + UnsafeL2: eth.L2BlockRef{Number: 109}, + }, + prevCurrentL1: eth.BlockRef{Number: 1}, + blocks: queue.Queue[*types.Block]{block102, block103}, // note absence of block101 + channels: []channelStatuser{channel103}, + expected: syncActions{ + clearState: ð.BlockID{Number: 1}, + blocksToLoad: &inclusiveBlockRange{101, 109}, + }, + expectedLogs: []string{"oldest unsafe block is below oldest block in state"}, + }, + {name: "unexpectedly good progress", + // This can happen if another batcher instance got some blocks + // included in the safe chain: + newSyncStatus: eth.SyncStatus{ + HeadL1: eth.BlockRef{Number: 6}, + CurrentL1: eth.BlockRef{Number: 2}, + SafeL2: eth.L2BlockRef{Number: 104, L1Origin: eth.BlockID{Number: 1}}, + UnsafeL2: eth.L2BlockRef{Number: 109}, + }, + prevCurrentL1: eth.BlockRef{Number: 1}, + blocks: queue.Queue[*types.Block]{block101, block102, block103}, + channels: []channelStatuser{channel103}, + expected: syncActions{ + clearState: ð.BlockID{Number: 1}, + blocksToLoad: &inclusiveBlockRange{105, 109}, + }, + expectedLogs: []string{"oldest unsafe block above newest block in state"}, + }, + {name: "safe chain reorg", + // This can happen if there is an L1 reorg, the safe chain is at an acceptable + // height but it does not descend from the blocks in state: + newSyncStatus: eth.SyncStatus{ + HeadL1: eth.BlockRef{Number: 5}, + CurrentL1: eth.BlockRef{Number: 2}, + SafeL2: eth.L2BlockRef{Number: 103, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}}, // note hash mismatch + UnsafeL2: eth.L2BlockRef{Number: 109}, + }, + prevCurrentL1: eth.BlockRef{Number: 1}, + blocks: queue.Queue[*types.Block]{block101, block102, block103}, + channels: []channelStatuser{channel103}, + expected: syncActions{ + clearState: ð.BlockID{Number: 1}, + blocksToLoad: &inclusiveBlockRange{104, 109}, + }, + expectedLogs: []string{"safe chain reorg"}, + }, + {name: "failed to make expected progress", + // This could happen if the batcher unexpectedly violates the + // Holocene derivation rules: + newSyncStatus: eth.SyncStatus{ + HeadL1: eth.BlockRef{Number: 3}, + CurrentL1: eth.BlockRef{Number: 2}, + SafeL2: eth.L2BlockRef{Number: 101, Hash: block101.Hash(), L1Origin: eth.BlockID{Number: 1}}, + UnsafeL2: eth.L2BlockRef{Number: 109}, + }, + prevCurrentL1: eth.BlockRef{Number: 1}, + blocks: queue.Queue[*types.Block]{block101, block102, block103}, + channels: []channelStatuser{channel103}, + expected: syncActions{ + clearState: ð.BlockID{Number: 1}, + blocksToLoad: &inclusiveBlockRange{102, 109}, + }, + expectedLogs: []string{"sequencer did not make expected progress"}, + }, + {name: "no progress", + // This can happen if we have a long channel duration + // and we didn't submit or have any txs confirmed since + // the last sync. + newSyncStatus: eth.SyncStatus{ + HeadL1: eth.BlockRef{Number: 4}, + CurrentL1: eth.BlockRef{Number: 1}, + SafeL2: eth.L2BlockRef{Number: 100}, + UnsafeL2: eth.L2BlockRef{Number: 109}, + }, + prevCurrentL1: eth.BlockRef{Number: 1}, + blocks: queue.Queue[*types.Block]{block101, block102, block103}, + channels: []channelStatuser{channel103}, + expected: syncActions{ + blocksToLoad: &inclusiveBlockRange{104, 109}, + }, + }, + {name: "no blocks", + // This happens when the batcher is starting up for the first time + newSyncStatus: eth.SyncStatus{ + HeadL1: eth.BlockRef{Number: 5}, + CurrentL1: eth.BlockRef{Number: 2}, + SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()}, + UnsafeL2: eth.L2BlockRef{Number: 109}, + }, + prevCurrentL1: eth.BlockRef{Number: 1}, + blocks: queue.Queue[*types.Block]{}, + channels: []channelStatuser{}, + expected: syncActions{ + blocksToLoad: &inclusiveBlockRange{104, 109}, + }, + }, + {name: "happy path", + // This happens when the safe chain is being progressed as expected: + newSyncStatus: eth.SyncStatus{ + HeadL1: eth.BlockRef{Number: 5}, + CurrentL1: eth.BlockRef{Number: 2}, + SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()}, + UnsafeL2: eth.L2BlockRef{Number: 109}, + }, + prevCurrentL1: eth.BlockRef{Number: 1}, + blocks: queue.Queue[*types.Block]{block101, block102, block103}, + channels: []channelStatuser{channel103}, + expected: syncActions{ + blocksToPrune: 3, + channelsToPrune: 1, + blocksToLoad: &inclusiveBlockRange{104, 109}, + }, + }, + {name: "happy path + multiple channels", + newSyncStatus: eth.SyncStatus{ + HeadL1: eth.BlockRef{Number: 5}, + CurrentL1: eth.BlockRef{Number: 2}, + SafeL2: eth.L2BlockRef{Number: 103, Hash: block103.Hash()}, + UnsafeL2: eth.L2BlockRef{Number: 109}, + }, + prevCurrentL1: eth.BlockRef{Number: 1}, + blocks: queue.Queue[*types.Block]{block101, block102, block103, block104}, + channels: []channelStatuser{channel103, channel104}, + expected: syncActions{ + blocksToPrune: 3, + channelsToPrune: 1, + blocksToLoad: &inclusiveBlockRange{105, 109}, + }, + }, + } + + for _, tc := range testCases { + + t.Run(tc.name, func(t *testing.T) { + l, h := testlog.CaptureLogger(t, log.LevelDebug) + + result, outOfSync := computeSyncActions( + tc.newSyncStatus, tc.prevCurrentL1, tc.blocks, tc.channels, l, + ) + + require.Equal(t, tc.expected, result) + require.Equal(t, tc.expectedSeqOutOfSync, outOfSync) + for _, e := range tc.expectedLogs { + r := h.FindLog(testlog.NewMessageContainsFilter(e)) + require.NotNil(t, r, "could not find log message containing '%s'", e) + } + }) + } +} diff --git a/op-batcher/readme.md b/op-batcher/readme.md index ba547845f099..9e81ff978fec 100644 --- a/op-batcher/readme.md +++ b/op-batcher/readme.md @@ -32,11 +32,14 @@ The philosophy behind the current architecture is: ### Happy path In the happy path, the batcher periodically: +0. Queries the sequencer's syncStatus and + a. (optionally) waits for it to ingest more L1 data before taking action + b. prunes blocks and channels from its internal state which are no longer required 1. Enqueues unsafe blocks and dequeues safe blocks from the sequencer to its internal state. 2. Enqueues a new channel, if necessary. 3. Processes some unprocessed blocks into the current channel, triggers the compression of the block data and the creation of frames. 4. Sends frames from the channel queue to the DA layer as (e.g. to Ethereum L1 as calldata or blob transactions). -5. If there is more transaction data to send, go to 2. Else wait for a tick and go to 1. +5. If there is more transaction data to send, go to 2. Else wait for a tick and go to 0. The `blockCursor` state variable tracks the next unprocessed block. @@ -57,7 +60,6 @@ At the current time, the batcher should be optimized for correctness, simplicity The batcher can almost always recover from unforeseen situations by being restarted. - Some complexity is permitted, however, for handling data availability switching, so that the batcher is not wasting money for longer periods of time. ### Data Availability Backlog @@ -79,6 +81,9 @@ transaction. But in the case of a DA backlog (as defined by OP_BATCHER_THROTTLE_ block builder to instead impose a (tighter) block level limit of OP_BATCHER_THROTTLE_BLOCK_SIZE, and a single transaction limit of OP_BATCHER_THROTTLE_TRANSACTION_SIZE. +### Max Channel Duration +The batcher tries to ensure that batches are posted at a minimum frequency specified by `MAX_CHANNEL_DURATION`. To achiveve this, it caches the l1 origin of the last submitted channel, and will force close a channel if the timestamp of the l1 head moves beyond the timestamp of that l1 origin plus `MAX_CHANNEL_DURATION`. When clearing its state, e.g. following the detection of a reorg, the batcher will not clear the cached l1 origin: this way, the regular posting of batches will not be disturbed by events like reorgs. + ## Known issues and future work Link to [open issues with the `op-batcher` tag](https://github.com/ethereum-optimism/optimism/issues?q=is%3Aopen+is%3Aissue+label%3AA-op-batcher).