diff --git a/op-node/rollup/derive/batch_queue.go b/op-node/rollup/derive/batch_queue.go index 4ca49907c7de9..cc2ae96816ec9 100644 --- a/op-node/rollup/derive/batch_queue.go +++ b/op-node/rollup/derive/batch_queue.go @@ -27,7 +27,12 @@ import ( // It is internally responsible for making sure that batches with L1 inclusions block outside it's // working range are not considered or pruned. +type ChannelFlusher interface { + FlushChannel() +} + type NextBatchProvider interface { + ChannelFlusher Origin() eth.L1BlockRef NextBatch(ctx context.Context) (Batch, error) } @@ -37,9 +42,10 @@ type SafeBlockFetcher interface { PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayloadEnvelope, error) } -// BatchQueue contains a set of batches for every L1 block. -// L1 blocks are contiguous and this does not support reorgs. -type BatchQueue struct { +// The baseBatchStage is a shared implementation of basic channel stage functionality. It is +// currently shared between the legacy BatchQueue, which buffers future batches, and the +// post-Holocene BatchStage, which requires strictly ordered batches. +type baseBatchStage struct { log log.Logger config *rollup.Config prev NextBatchProvider @@ -53,18 +59,14 @@ type BatchQueue struct { // length of l1Blocks never exceeds SequencerWindowSize l1Blocks []eth.L1BlockRef - // batches in order of when we've first seen them - batches []*BatchWithL1InclusionBlock - // nextSpan is cached SingularBatches derived from SpanBatch nextSpan []*SingularBatch l2 SafeBlockFetcher } -// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use. -func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchQueue { - return &BatchQueue{ +func newBaseBatchStage(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) baseBatchStage { + return baseBatchStage{ log: log, config: cfg, prev: prev, @@ -72,80 +74,137 @@ func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l } } -func (bq *BatchQueue) Origin() eth.L1BlockRef { - return bq.prev.Origin() +func (bs *baseBatchStage) base() *baseBatchStage { + return bs +} + +func (bs *baseBatchStage) Log() log.Logger { + if len(bs.l1Blocks) == 0 { + return bs.log.New("origin", bs.origin.ID()) + } else { + return bs.log.New("origin", bs.origin.ID(), "epoch", bs.l1Blocks[0]) + } +} + +type SingularBatchProvider interface { + ResettableStage + NextBatch(context.Context, eth.L2BlockRef) (*SingularBatch, bool, error) +} + +// BatchQueue contains a set of batches for every L1 block. +// L1 blocks are contiguous and this does not support reorgs. +type BatchQueue struct { + baseBatchStage + + // batches in order of when we've first seen them + batches []*BatchWithL1InclusionBlock +} + +var _ SingularBatchProvider = (*BatchQueue)(nil) + +// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use. +func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchQueue { + return &BatchQueue{baseBatchStage: newBaseBatchStage(log, cfg, prev, l2)} +} + +func (bs *baseBatchStage) Origin() eth.L1BlockRef { + return bs.prev.Origin() } // popNextBatch pops the next batch from the current queued up span-batch nextSpan. // The queue must be non-empty, or the function will panic. -func (bq *BatchQueue) popNextBatch(parent eth.L2BlockRef) *SingularBatch { - if len(bq.nextSpan) == 0 { +func (bs *baseBatchStage) popNextBatch(parent eth.L2BlockRef) *SingularBatch { + if len(bs.nextSpan) == 0 { panic("popping non-existent span-batch, invalid state") } - nextBatch := bq.nextSpan[0] - bq.nextSpan = bq.nextSpan[1:] + nextBatch := bs.nextSpan[0] + bs.nextSpan = bs.nextSpan[1:] // Must set ParentHash before return. we can use parent because the parentCheck is verified in CheckBatch(). nextBatch.ParentHash = parent.Hash - bq.log.Debug("pop next batch from the cached span batch") + bs.log.Debug("pop next batch from the cached span batch") return nextBatch } // NextBatch return next valid batch upon the given safe head. // It also returns the boolean that indicates if the batch is the last block in the batch. -func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, bool, error) { - if len(bq.nextSpan) > 0 { +func (bs *baseBatchStage) nextFromSpanBatch(parent eth.L2BlockRef) (*SingularBatch, bool) { + if len(bs.nextSpan) > 0 { // There are cached singular batches derived from the span batch. // Check if the next cached batch matches the given parent block. - if bq.nextSpan[0].Timestamp == parent.Time+bq.config.BlockTime { + if bs.nextSpan[0].Timestamp == parent.Time+bs.config.BlockTime { // Pop first one and return. - nextBatch := bq.popNextBatch(parent) + nextBatch := bs.popNextBatch(parent) // len(bq.nextSpan) == 0 means it's the last batch of the span. - return nextBatch, len(bq.nextSpan) == 0, nil + return nextBatch, len(bs.nextSpan) == 0 } else { // Given parent block does not match the next batch. It means the previously returned batch is invalid. // Drop cached batches and find another batch. - bq.log.Warn("parent block does not match the next batch. dropped cached batches", "parent", parent.ID(), "nextBatchTime", bq.nextSpan[0].GetTimestamp()) - bq.nextSpan = bq.nextSpan[:0] + bs.log.Warn("parent block does not match the next batch. dropped cached batches", "parent", parent.ID(), "nextBatchTime", bs.nextSpan[0].GetTimestamp()) + bs.nextSpan = bs.nextSpan[:0] } } + return nil, false +} +func (bs *baseBatchStage) updateOrigins(parent eth.L2BlockRef) { // Note: We use the origin that we will have to determine if it's behind. This is important // because it's the future origin that gets saved into the l1Blocks array. // We always update the origin of this stage if it is not the same so after the update code // runs, this is consistent. - originBehind := bq.prev.Origin().Number < parent.L1Origin.Number + originBehind := bs.originBehind(parent) // Advance origin if needed // Note: The entire pipeline has the same origin // We just don't accept batches prior to the L1 origin of the L2 safe head - if bq.origin != bq.prev.Origin() { - bq.origin = bq.prev.Origin() + if bs.origin != bs.prev.Origin() { + bs.origin = bs.prev.Origin() if !originBehind { - bq.l1Blocks = append(bq.l1Blocks, bq.origin) + bs.l1Blocks = append(bs.l1Blocks, bs.origin) } else { // This is to handle the special case of startup. At startup we call Reset & include // the L1 origin. That is the only time where immediately after `Reset` is called // originBehind is false. - bq.l1Blocks = bq.l1Blocks[:0] + bs.l1Blocks = bs.l1Blocks[:0] } - bq.log.Info("Advancing bq origin", "origin", bq.origin, "originBehind", originBehind) + bs.log.Info("Advancing bq origin", "origin", bs.origin, "originBehind", originBehind) } // If the epoch is advanced, update bq.l1Blocks - // Advancing epoch must be done after the pipeline successfully apply the entire span batch to the chain. - // Because the span batch can be reverted during processing the batch, then we must preserve existing l1Blocks - // to verify the epochs of the next candidate batch. - if len(bq.l1Blocks) > 0 && parent.L1Origin.Number > bq.l1Blocks[0].Number { - for i, l1Block := range bq.l1Blocks { + // Before Holocene, advancing the epoch must be done after the pipeline successfully applied the entire span batch to the chain. + // This is because the entire span batch can be reverted after finding an invalid batch. + // So we must preserve the existing l1Blocks to verify the epochs of the next candidate batch. + if len(bs.l1Blocks) > 0 && parent.L1Origin.Number > bs.l1Blocks[0].Number { + for i, l1Block := range bs.l1Blocks { if parent.L1Origin.Number == l1Block.Number { - bq.l1Blocks = bq.l1Blocks[i:] - bq.log.Debug("Advancing internal L1 blocks", "next_epoch", bq.l1Blocks[0].ID(), "next_epoch_time", bq.l1Blocks[0].Time) + if bs.config.IsHolocene(bs.origin.Time) && i > 1 { + // TODO(12444): We'll see if this invariant really holds. It should if the + // origin only ever increases in single increments, which I currently assume is + // the case for Holocene. This being true is not a strict requirement from + // Holocene though. This check should be removed after successful testing. + panic("updateOrigins: unexpected origin jump") + } + bs.l1Blocks = bs.l1Blocks[i:] + bs.log.Debug("Advancing internal L1 blocks", "next_epoch", bs.l1Blocks[0].ID(), "next_epoch_time", bs.l1Blocks[0].Time) break } } // If we can't find the origin of parent block, we have to advance bq.origin. } +} + +func (bs *baseBatchStage) originBehind(parent eth.L2BlockRef) bool { + return bs.prev.Origin().Number < parent.L1Origin.Number +} + +func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, bool, error) { + // Early return if there are singular batches from a span batch queued up + if batch, last := bq.nextFromSpanBatch(parent); batch != nil { + return batch, last, nil + } + + bq.updateOrigins(parent) + originBehind := bq.originBehind(parent) // Load more data into the batch queue outOfData := false if batch, err := bq.prev.NextBatch(ctx); err == io.EOF { @@ -206,17 +265,21 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si return nextBatch, len(bq.nextSpan) == 0, nil } -func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error { +func (bs *baseBatchStage) reset(base eth.L1BlockRef) { // Copy over the Origin from the next stage // It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress - bq.origin = base - bq.batches = []*BatchWithL1InclusionBlock{} + bs.origin = base // Include the new origin as an origin to build on // Note: This is only for the initialization case. During normal resets we will later // throw out this block. - bq.l1Blocks = bq.l1Blocks[:0] - bq.l1Blocks = append(bq.l1Blocks, base) - bq.nextSpan = bq.nextSpan[:0] + bs.l1Blocks = bs.l1Blocks[:0] + bs.l1Blocks = append(bs.l1Blocks, base) + bs.nextSpan = bs.nextSpan[:0] +} + +func (bq *BatchQueue) Reset(_ context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error { + bq.baseBatchStage.reset(base) + bq.batches = bq.batches[:0] return io.EOF } @@ -257,7 +320,6 @@ func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, paren // Find the first-seen batch that matches all validity conditions. // We may not have sufficient information to proceed filtering, and then we stop. // There may be none: in that case we force-create an empty batch - nextTimestamp := parent.Time + bq.config.BlockTime var nextBatch *BatchWithL1InclusionBlock // Go over all batches, in order of inclusion, and find the first batch we can accept. @@ -296,33 +358,39 @@ batchLoop: nextBatch.Batch.LogContext(bq.log).Info("Found next batch") return nextBatch.Batch, nil } + return bq.deriveNextEmptyBatch(ctx, outOfData, parent) +} +// deriveNextEmptyBatch may derive an empty batch if the sequencing window is expired +func (bs *baseBatchStage) deriveNextEmptyBatch(ctx context.Context, outOfData bool, parent eth.L2BlockRef) (*SingularBatch, error) { + epoch := bs.l1Blocks[0] // If the current epoch is too old compared to the L1 block we are at, // i.e. if the sequence window expired, we create empty batches for the current epoch - expiryEpoch := epoch.Number + bq.config.SeqWindowSize - forceEmptyBatches := (expiryEpoch == bq.origin.Number && outOfData) || expiryEpoch < bq.origin.Number + expiryEpoch := epoch.Number + bs.config.SeqWindowSize + forceEmptyBatches := (expiryEpoch == bs.origin.Number && outOfData) || expiryEpoch < bs.origin.Number firstOfEpoch := epoch.Number == parent.L1Origin.Number+1 + nextTimestamp := parent.Time + bs.config.BlockTime - bq.log.Trace("Potentially generating an empty batch", + bs.log.Trace("Potentially generating an empty batch", "expiryEpoch", expiryEpoch, "forceEmptyBatches", forceEmptyBatches, "nextTimestamp", nextTimestamp, - "epoch_time", epoch.Time, "len_l1_blocks", len(bq.l1Blocks), "firstOfEpoch", firstOfEpoch) + "epoch_time", epoch.Time, "len_l1_blocks", len(bs.l1Blocks), "firstOfEpoch", firstOfEpoch) if !forceEmptyBatches { // sequence window did not expire yet, still room to receive batches for the current epoch, // no need to force-create empty batch(es) towards the next epoch yet. return nil, io.EOF } - if len(bq.l1Blocks) < 2 { + if len(bs.l1Blocks) < 2 { // need next L1 block to proceed towards return nil, io.EOF } - nextEpoch := bq.l1Blocks[1] + nextEpoch := bs.l1Blocks[1] // Fill with empty L2 blocks of the same epoch until we meet the time of the next L1 origin, // to preserve that L2 time >= L1 time. If this is the first block of the epoch, always generate a // batch to ensure that we at least have one batch per epoch. if nextTimestamp < nextEpoch.Time || firstOfEpoch { - bq.log.Info("Generating next batch", "epoch", epoch, "timestamp", nextTimestamp) + bs.log.Info("Generating next batch", "epoch", epoch, "timestamp", nextTimestamp) return &SingularBatch{ ParentHash: parent.Hash, EpochNum: rollup.Epoch(epoch.Number), @@ -334,7 +402,9 @@ batchLoop: // At this point we have auto generated every batch for the current epoch // that we can, so we can advance to the next epoch. - bq.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp, "next_epoch_time", nextEpoch.Time) - bq.l1Blocks = bq.l1Blocks[1:] + // TODO(12444): Instead of manually advancing the epoch here, it may be better to generate a + // batch for the next epoch, so that updateOrigins then properly advances the origin. + bs.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp, "next_epoch_time", nextEpoch.Time) + bs.l1Blocks = bs.l1Blocks[1:] return nil, io.EOF } diff --git a/op-node/rollup/derive/batch_queue_test.go b/op-node/rollup/derive/batch_queue_test.go index f047f0a7d4fef..54ae9abc63703 100644 --- a/op-node/rollup/derive/batch_queue_test.go +++ b/op-node/rollup/derive/batch_queue_test.go @@ -32,6 +32,12 @@ func (f *fakeBatchQueueInput) Origin() eth.L1BlockRef { return f.origin } +func (f *fakeBatchQueueInput) FlushChannel() { + f.batches = nil + f.errors = nil + f.i = 0 +} + func (f *fakeBatchQueueInput) NextBatch(ctx context.Context) (Batch, error) { if f.i >= len(f.batches) { return nil, io.EOF @@ -141,33 +147,66 @@ func TestBatchQueue(t *testing.T) { name string f func(t *testing.T, batchType int) }{ - {"BatchQueueNewOrigin", BatchQueueNewOrigin}, - {"BatchQueueEager", BatchQueueEager}, - {"BatchQueueInvalidInternalAdvance", BatchQueueInvalidInternalAdvance}, - {"BatchQueueMissing", BatchQueueMissing}, - {"BatchQueueAdvancedEpoch", BatchQueueAdvancedEpoch}, - {"BatchQueueShuffle", BatchQueueShuffle}, - {"BatchQueueResetOneBlockBeforeOrigin", BatchQueueResetOneBlockBeforeOrigin}, + {"Missing", testBatchQueue_Missing}, + {"Shuffle", testBatchQueue_Shuffle}, } for _, test := range tests { test := test t.Run(test.name+"_SingularBatch", func(t *testing.T) { test.f(t, SingularBatchType) }) + t.Run(test.name+"_SpanBatch", func(t *testing.T) { + test.f(t, SpanBatchType) + }) } +} +type testableBatchStageFactory func(log.Logger, *rollup.Config, NextBatchProvider, SafeBlockFetcher) testableBatchStage + +type testableBatchStage interface { + SingularBatchProvider + base() *baseBatchStage +} + +func TestBatchStages(t *testing.T) { + newBatchQueue := func(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) testableBatchStage { + return NewBatchQueue(log, cfg, prev, l2) + } + newBatchStage := func(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) testableBatchStage { + return NewBatchStage(log, cfg, prev, l2) + } + + tests := []struct { + name string + f func(*testing.T, int, testableBatchStageFactory) + }{ + {"NewOrigin", testBatchStage_NewOrigin}, + {"Eager", testBatchStage_Eager}, + {"InvalidInternalAdvance", testBatchStage_InvalidInternalAdvance}, + {"AdvancedEpoch", testBatchStage_AdvancedEpoch}, + {"ResetOneBlockBeforeOrigin", testBatchStage_ResetOneBlockBeforeOrigin}, + } for _, test := range tests { test := test - t.Run(test.name+"_SpanBatch", func(t *testing.T) { - test.f(t, SpanBatchType) + t.Run("BatchQueue_"+test.name+"_SingularBatch", func(t *testing.T) { + test.f(t, SingularBatchType, newBatchQueue) + }) + t.Run("BatchQueue_"+test.name+"_SpanBatch", func(t *testing.T) { + test.f(t, SpanBatchType, newBatchQueue) + }) + t.Run("BatchStage_"+test.name+"_SingularBatch", func(t *testing.T) { + test.f(t, SingularBatchType, newBatchStage) + }) + t.Run("BatchStage_"+test.name+"_SpanBatch", func(t *testing.T) { + test.f(t, SpanBatchType, newBatchStage) }) } } -// BatchQueueNewOrigin tests that the batch queue properly saves the new origin +// testBatchStage_NewOrigin tests that the batch queue properly saves the new origin // when the safehead's origin is ahead of the pipeline's origin (as is after a reset). // This issue was fixed in https://github.com/ethereum-optimism/optimism/pull/3694 -func BatchQueueNewOrigin(t *testing.T, batchType int) { +func testBatchStage_NewOrigin(t *testing.T, batchType int, newBatchStage testableBatchStageFactory) { log := testlog.Logger(t, log.LevelCrit) l1 := L1Chain([]uint64{10, 15, 20, 25}) safeHead := eth.L2BlockRef{ @@ -194,17 +233,18 @@ func BatchQueueNewOrigin(t *testing.T, batchType int) { origin: l1[0], } - bq := NewBatchQueue(log, cfg, input, nil) + bq := newBatchStage(log, cfg, input, nil) + bqb := bq.base() _ = bq.Reset(context.Background(), l1[0], eth.SystemConfig{}) - require.Equal(t, []eth.L1BlockRef{l1[0]}, bq.l1Blocks) + require.Equal(t, []eth.L1BlockRef{l1[0]}, bqb.l1Blocks) // Prev Origin: 0; Safehead Origin: 2; Internal Origin: 0 // Should return no data but keep the same origin data, _, err := bq.NextBatch(context.Background(), safeHead) require.Nil(t, data) require.Equal(t, io.EOF, err) - require.Equal(t, []eth.L1BlockRef{l1[0]}, bq.l1Blocks) - require.Equal(t, l1[0], bq.origin) + require.Equal(t, []eth.L1BlockRef{l1[0]}, bqb.l1Blocks) + require.Equal(t, l1[0], bqb.origin) // Prev Origin: 1; Safehead Origin: 2; Internal Origin: 0 // Should wipe l1blocks + advance internal origin @@ -212,8 +252,8 @@ func BatchQueueNewOrigin(t *testing.T, batchType int) { data, _, err = bq.NextBatch(context.Background(), safeHead) require.Nil(t, data) require.Equal(t, io.EOF, err) - require.Empty(t, bq.l1Blocks) - require.Equal(t, l1[1], bq.origin) + require.Empty(t, bqb.l1Blocks) + require.Equal(t, l1[1], bqb.origin) // Prev Origin: 2; Safehead Origin: 2; Internal Origin: 1 // Should add to l1Blocks + advance internal origin @@ -221,14 +261,14 @@ func BatchQueueNewOrigin(t *testing.T, batchType int) { data, _, err = bq.NextBatch(context.Background(), safeHead) require.Nil(t, data) require.Equal(t, io.EOF, err) - require.Equal(t, []eth.L1BlockRef{l1[2]}, bq.l1Blocks) - require.Equal(t, l1[2], bq.origin) + require.Equal(t, []eth.L1BlockRef{l1[2]}, bqb.l1Blocks) + require.Equal(t, l1[2], bqb.origin) } -// BatchQueueResetOneBlockBeforeOrigin tests that the batch queue properly +// testBatchStage_ResetOneBlockBeforeOrigin tests that the batch queue properly // prunes the l1Block recorded as part of a reset when the starting origin // is exactly one block prior to the safe head origin. -func BatchQueueResetOneBlockBeforeOrigin(t *testing.T, batchType int) { +func testBatchStage_ResetOneBlockBeforeOrigin(t *testing.T, batchType int, newBatchStage testableBatchStageFactory) { log := testlog.Logger(t, log.LevelTrace) l1 := L1Chain([]uint64{10, 15, 20, 25}) safeHead := eth.L2BlockRef{ @@ -255,17 +295,18 @@ func BatchQueueResetOneBlockBeforeOrigin(t *testing.T, batchType int) { origin: l1[0], } - bq := NewBatchQueue(log, cfg, input, nil) + bq := newBatchStage(log, cfg, input, nil) + bqb := bq.base() _ = bq.Reset(context.Background(), l1[0], eth.SystemConfig{}) - require.Equal(t, []eth.L1BlockRef{l1[0]}, bq.l1Blocks) + require.Equal(t, []eth.L1BlockRef{l1[0]}, bqb.l1Blocks) // Prev Origin: 0; Safehead Origin: 1; Internal Origin: 0 // Should return no data but keep the same origin data, _, err := bq.NextBatch(context.Background(), safeHead) require.Nil(t, data) require.Equal(t, io.EOF, err) - require.Equal(t, []eth.L1BlockRef{l1[0]}, bq.l1Blocks) - require.Equal(t, l1[0], bq.origin) + require.Equal(t, []eth.L1BlockRef{l1[0]}, bqb.l1Blocks) + require.Equal(t, l1[0], bqb.origin) // Prev Origin: 1; Safehead Origin: 1; Internal Origin: 0 // Should record new l1 origin in l1blocks, prune block 0 and advance internal origin @@ -273,8 +314,8 @@ func BatchQueueResetOneBlockBeforeOrigin(t *testing.T, batchType int) { data, _, err = bq.NextBatch(context.Background(), safeHead) require.Nil(t, data) require.Equalf(t, io.EOF, err, "expected io.EOF but got %v", err) - require.Equal(t, []eth.L1BlockRef{l1[1]}, bq.l1Blocks) - require.Equal(t, l1[1], bq.origin) + require.Equal(t, []eth.L1BlockRef{l1[1]}, bqb.l1Blocks) + require.Equal(t, l1[1], bqb.origin) // Prev Origin: 2; Safehead Origin: 1; Internal Origin: 1 // Should add to l1Blocks + advance internal origin @@ -282,13 +323,13 @@ func BatchQueueResetOneBlockBeforeOrigin(t *testing.T, batchType int) { data, _, err = bq.NextBatch(context.Background(), safeHead) require.Nil(t, data) require.Equal(t, io.EOF, err) - require.Equal(t, []eth.L1BlockRef{l1[1], l1[2]}, bq.l1Blocks) - require.Equal(t, l1[2], bq.origin) + require.Equal(t, []eth.L1BlockRef{l1[1], l1[2]}, bqb.l1Blocks) + require.Equal(t, l1[2], bqb.origin) } -// BatchQueueEager adds a bunch of contiguous batches and asserts that +// testBatchStage_Eager adds a bunch of contiguous batches and asserts that // enough calls to `NextBatch` return all of those batches. -func BatchQueueEager(t *testing.T, batchType int) { +func testBatchStage_Eager(t *testing.T, batchType int, newBatchStage testableBatchStageFactory) { log := testlog.Logger(t, log.LevelCrit) l1 := L1Chain([]uint64{10, 20, 30}) chainId := big.NewInt(1234) @@ -344,7 +385,7 @@ func BatchQueueEager(t *testing.T, batchType int) { origin: l1[0], } - bq := NewBatchQueue(log, cfg, input, nil) + bq := newBatchStage(log, cfg, input, nil) _ = bq.Reset(context.Background(), l1[0], eth.SystemConfig{}) // Advance the origin input.origin = l1[1] @@ -364,11 +405,11 @@ func BatchQueueEager(t *testing.T, batchType int) { } } -// BatchQueueInvalidInternalAdvance asserts that we do not miss an epoch when generating batches. +// testBatchStage_InvalidInternalAdvance asserts that we do not miss an epoch when generating batches. // This is a regression test for CLI-3378. -func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) { +func testBatchStage_InvalidInternalAdvance(t *testing.T, batchType int, newBatchStage testableBatchStageFactory) { log := testlog.Logger(t, log.LevelTrace) - l1 := L1Chain([]uint64{10, 15, 20, 25, 30}) + l1 := L1Chain([]uint64{5, 10, 15, 20, 25, 30}) chainId := big.NewInt(1234) safeHead := eth.L2BlockRef{ Hash: mockHash(10, 2), @@ -416,17 +457,26 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) { } } + // prepend a nil batch so we can load the safe head's epoch input := &fakeBatchQueueInput{ - batches: inputBatches, - errors: inputErrors, + batches: append([]Batch{nil}, inputBatches...), + errors: append([]error{io.EOF}, inputErrors...), origin: l1[0], } - bq := NewBatchQueue(log, cfg, input, nil) + bq := newBatchStage(log, cfg, input, nil) _ = bq.Reset(context.Background(), l1[0], eth.SystemConfig{}) + // first load base epoch + b, _, e := bq.NextBatch(context.Background(), safeHead) + require.ErrorIs(t, e, io.EOF) + require.Nil(t, b) + // then advance to origin 1 with batches + input.origin = l1[1] + // Load continuous batches for epoch 0 for i := 0; i < len(expectedOutputBatches); i++ { + t.Logf("Iteration %d", i) b, _, e := bq.NextBatch(context.Background(), safeHead) require.ErrorIs(t, e, expectedOutputErrors[i]) if b == nil { @@ -440,14 +490,7 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) { } } - // Advance to origin 1. No forced batches yet. - input.origin = l1[1] - b, _, e := bq.NextBatch(context.Background(), safeHead) - require.ErrorIs(t, e, io.EOF) - require.Nil(t, b) - - // Advance to origin 2. No forced batches yet because we are still on epoch 0 - // & have batches for epoch 0. + // Advance to origin 2. No forced batches yet. input.origin = l1[2] b, _, e = bq.NextBatch(context.Background(), safeHead) require.ErrorIs(t, e, io.EOF) @@ -456,7 +499,7 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) { // Advance to origin 3. Should generate one empty batch. input.origin = l1[3] b, _, e = bq.NextBatch(context.Background(), safeHead) - require.Nil(t, e) + require.NoError(t, e) require.NotNil(t, b) require.Equal(t, safeHead.Time+2, b.Timestamp) require.Equal(t, rollup.Epoch(1), b.EpochNum) @@ -471,7 +514,7 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) { // Advance to origin 4. Should generate one empty batch. input.origin = l1[4] b, _, e = bq.NextBatch(context.Background(), safeHead) - require.Nil(t, e) + require.NoError(t, e) require.NotNil(t, b) require.Equal(t, rollup.Epoch(2), b.EpochNum) require.Equal(t, safeHead.Time+2, b.Timestamp) @@ -482,10 +525,9 @@ func BatchQueueInvalidInternalAdvance(t *testing.T, batchType int) { b, _, e = bq.NextBatch(context.Background(), safeHead) require.ErrorIs(t, e, io.EOF) require.Nil(t, b) - } -func BatchQueueMissing(t *testing.T, batchType int) { +func testBatchQueue_Missing(t *testing.T, batchType int) { log := testlog.Logger(t, log.LevelCrit) l1 := L1Chain([]uint64{10, 15, 20, 25}) chainId := big.NewInt(1234) @@ -600,9 +642,9 @@ func BatchQueueMissing(t *testing.T, batchType int) { require.Equal(t, rollup.Epoch(1), b.EpochNum) } -// BatchQueueAdvancedEpoch tests that batch queue derives consecutive valid batches with advancing epochs. +// testBatchStage_AdvancedEpoch tests that batch queue derives consecutive valid batches with advancing epochs. // Batch queue's l1blocks list should be updated along epochs. -func BatchQueueAdvancedEpoch(t *testing.T, batchType int) { +func testBatchStage_AdvancedEpoch(t *testing.T, batchType int, newBatchStage testableBatchStageFactory) { log := testlog.Logger(t, log.LevelCrit) l1 := L1Chain([]uint64{0, 6, 12, 18, 24}) // L1 block time: 6s chainId := big.NewInt(1234) @@ -664,7 +706,7 @@ func BatchQueueAdvancedEpoch(t *testing.T, batchType int) { origin: l1[inputOriginNumber], } - bq := NewBatchQueue(log, cfg, input, nil) + bq := newBatchStage(log, cfg, input, nil) _ = bq.Reset(context.Background(), l1[1], eth.SystemConfig{}) for i := 0; i < len(expectedOutputBatches); i++ { @@ -688,8 +730,8 @@ func BatchQueueAdvancedEpoch(t *testing.T, batchType int) { } } -// BatchQueueShuffle tests batch queue can reorder shuffled valid batches -func BatchQueueShuffle(t *testing.T, batchType int) { +// testBatchQueue_Shuffle tests batch queue can reorder shuffled valid batches +func testBatchQueue_Shuffle(t *testing.T, batchType int) { log := testlog.Logger(t, log.LevelCrit) l1 := L1Chain([]uint64{0, 6, 12, 18, 24}) // L1 block time: 6s chainId := big.NewInt(1234) diff --git a/op-node/rollup/derive/batch_stage.go b/op-node/rollup/derive/batch_stage.go new file mode 100644 index 0000000000000..f1df3c6723461 --- /dev/null +++ b/op-node/rollup/derive/batch_stage.go @@ -0,0 +1,167 @@ +package derive + +import ( + "context" + "errors" + "fmt" + "io" + + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum/go-ethereum/log" +) + +type BatchStage struct { + baseBatchStage +} + +func NewBatchStage(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchStage { + return &BatchStage{baseBatchStage: newBaseBatchStage(log, cfg, prev, l2)} +} + +func (bs *BatchStage) Reset(_ context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error { + bs.baseBatchStage.reset(base) + return io.EOF +} + +func (bs *BatchStage) FlushChannel() { + bs.nextSpan = bs.nextSpan[:0] + bs.prev.FlushChannel() +} + +func (bs *BatchStage) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, bool, error) { + // with Holocene, we can always update (and prune) the origins because we don't backwards-invalidate. + bs.updateOrigins(parent) + + // If origin behind (or at parent), we drain previous stage(s), and then return. + // Note that a channel from the parent's L1 origin block can only contain past batches, so we + // can just skip them. + // TODO(12444): we may be able to change the definition of originBehind to include equality, + // also for the pre-Holocene BatchQueue. This may also allow us to remove the edge case in + // updateOrigins. + if bs.originBehind(parent) || parent.L1Origin.Number == bs.origin.Number { + if _, err := bs.prev.NextBatch(ctx); err != nil { + // includes io.EOF and NotEnoughData + return nil, false, err + } + // continue draining + return nil, false, NotEnoughData + } + + if len(bs.l1Blocks) < 2 { + // This can only happen if derivation erroneously doesn't start at a safe head. + // By now, the L1 origin of the first safe head and the following L1 block must be in the + // l1Blocks. + return nil, false, NewCriticalError(fmt.Errorf( + "unexpected low buffered origin count, origin: %v, parent: %v", bs.origin, parent)) + } + + // Note: epoch origin can now be one block ahead of the L2 Safe Head + // This is in the case where we auto generate all batches in an epoch & advance the epoch in + // deriveNextEmptyBatch but don't advance the L2 Safe Head's epoch + if epoch := bs.l1Blocks[0]; parent.L1Origin != epoch.ID() && parent.L1Origin.Number != epoch.Number-1 { + return nil, false, NewResetError(fmt.Errorf("buffered L1 chain epoch %s in batch queue does not match safe head origin %s", epoch, parent.L1Origin)) + } + + batch, err := bs.nextSingularBatchCandidate(ctx, parent) + if err == io.EOF { + // We only consider empty batch generation after we've drained all batches from the local + // span batch queue and the previous stage. + empty, err := bs.deriveNextEmptyBatch(ctx, true, parent) + return empty, false, err + } else if err != nil { + return nil, false, err + } + + // check candidate validity + validity := checkSingularBatch(bs.config, bs.Log(), bs.l1Blocks, parent, batch, bs.origin) + switch validity { + case BatchAccept: // continue + batch.LogContext(bs.Log()).Debug("Found next singular batch") + return batch, len(bs.nextSpan) == 0, nil + case BatchPast: + batch.LogContext(bs.Log()).Warn("Dropping past singular batch") + // NotEnoughData to read in next batch until we're through all past batches + return nil, false, NotEnoughData + case BatchDrop: // drop, flush, move onto next channel + batch.LogContext(bs.Log()).Warn("Dropping invalid singular batch, flushing channel") + bs.FlushChannel() + // NotEnoughData will cause derivation from previous stages until they're empty, at which + // point empty batch derivation will happen. + return nil, false, NotEnoughData + case BatchUndecided: // l2 fetcher error, try again + batch.LogContext(bs.Log()).Warn("Undecided span batch") + return nil, false, NotEnoughData + case BatchFuture: // panic, can't happen + return nil, false, NewCriticalError(fmt.Errorf("impossible batch validity: %v", validity)) + default: + return nil, false, NewCriticalError(fmt.Errorf("unknown batch validity type: %d", validity)) + } +} + +func (bs *BatchStage) nextSingularBatchCandidate(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, error) { + // First check for next span-derived batch + nextBatch, _ := bs.nextFromSpanBatch(parent) + + if nextBatch != nil { + return nextBatch, nil + } + + // If the next batch is a singular batch, we forward it as the candidate. + // If it is a span batch, we check its validity and then forward its first singular batch. + batch, err := bs.prev.NextBatch(ctx) + if err != nil { // includes io.EOF + return nil, err + } + switch typ := batch.GetBatchType(); typ { + case SingularBatchType: + singularBatch, ok := batch.AsSingularBatch() + if !ok { + return nil, NewCriticalError(errors.New("failed type assertion to SingularBatch")) + } + return singularBatch, nil + case SpanBatchType: + spanBatch, ok := batch.AsSpanBatch() + if !ok { + return nil, NewCriticalError(errors.New("failed type assertion to SpanBatch")) + } + + validity, _ := checkSpanBatchPrefix(ctx, bs.config, bs.Log(), bs.l1Blocks, parent, spanBatch, bs.origin, bs.l2) + switch validity { + case BatchAccept: // continue + spanBatch.LogContext(bs.Log()).Info("Found next valid span batch") + case BatchPast: + spanBatch.LogContext(bs.Log()).Warn("Dropping past span batch") + // NotEnoughData to read in next batch until we're through all past batches + return nil, NotEnoughData + case BatchDrop: // drop, try next + spanBatch.LogContext(bs.Log()).Warn("Dropping invalid span batch, flushing channel") + bs.FlushChannel() + return nil, NotEnoughData + case BatchUndecided: // l2 fetcher error, try again + spanBatch.LogContext(bs.Log()).Warn("Undecided span batch") + return nil, NotEnoughData + case BatchFuture: // can't happen with Holocene + return nil, NewCriticalError(errors.New("impossible future batch validity")) + } + + // If next batch is SpanBatch, convert it to SingularBatches. + // TODO(12444): maybe create iterator here instead, save to nextSpan + // Need to make sure this doesn't error where the iterator wouldn't, + // otherwise this wouldn't be correctly implementing partial span batch invalidation. + // From what I can tell, it is fine because the only error case is if the l1Blocks are + // missing a block, which would be a logic error. Although, if the node restarts mid-way + // through a span batch and the sync start only goes back one channel timeout from the + // mid-way safe block, it may actually miss l1 blocks! Need to check. + // We could fix this by fast-dropping past batches from the span batch. + singularBatches, err := spanBatch.GetSingularBatches(bs.l1Blocks, parent) + if err != nil { + return nil, NewCriticalError(err) + } + bs.nextSpan = singularBatches + // span-batches are non-empty, so the below pop is safe. + return bs.popNextBatch(parent), nil + default: + return nil, NewCriticalError(fmt.Errorf("unrecognized batch type: %d", typ)) + } +} diff --git a/op-node/rollup/derive/batches.go b/op-node/rollup/derive/batches.go index bde2280745520..345345ed35bb8 100644 --- a/op-node/rollup/derive/batches.go +++ b/op-node/rollup/derive/batches.go @@ -26,6 +26,9 @@ const ( BatchUndecided // BatchFuture indicates that the batch may be valid, but cannot be processed yet and should be checked again later BatchFuture + // BatchPast indicates that the batch is from the past, i.e. its timestamp is smaller or equal + // to the safe head's timestamp. + BatchPast ) // CheckBatch checks if the given batch can be applied on top of the given l2SafeHead, given the contextual L1 blocks the batch was included in. @@ -69,11 +72,18 @@ func checkSingularBatch(cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1Blo nextTimestamp := l2SafeHead.Time + cfg.BlockTime if batch.Timestamp > nextTimestamp { + if cfg.IsHolocene(l1InclusionBlock.Time) { + log.Warn("dropping future batch", "next_timestamp", nextTimestamp) + return BatchDrop + } log.Trace("received out-of-order batch for future processing after next batch", "next_timestamp", nextTimestamp) return BatchFuture } if batch.Timestamp < nextTimestamp { - log.Warn("dropping batch with old timestamp", "min_timestamp", nextTimestamp) + log.Warn("dropping past batch with old timestamp", "min_timestamp", nextTimestamp) + if cfg.IsHolocene(l1InclusionBlock.Time) { + return BatchPast + } return BatchDrop } @@ -166,17 +176,19 @@ func checkSingularBatch(cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1Blo return BatchAccept } -// checkSpanBatch implements SpanBatch validation rule. -func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1BlockRef, l2SafeHead eth.L2BlockRef, +// checkSpanBatchPrevix performs the span batch prefix rules for Holocene. +// Next to the validity, it also returns the parent L2 block as determined during the checks for +// further consumption. +func checkSpanBatchPrefix(ctx context.Context, cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1BlockRef, l2SafeHead eth.L2BlockRef, batch *SpanBatch, l1InclusionBlock eth.L1BlockRef, l2Fetcher SafeBlockFetcher, -) BatchValidity { +) (BatchValidity, eth.L2BlockRef) { // add details to the log log = batch.LogContext(log) // sanity check we have consistent inputs if len(l1Blocks) == 0 { log.Warn("missing L1 block input, cannot proceed with batch checking") - return BatchUndecided + return BatchUndecided, eth.L2BlockRef{} } epoch := l1Blocks[0] @@ -185,64 +197,70 @@ func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1B if startEpochNum == batchOrigin.Number+1 { if len(l1Blocks) < 2 { log.Info("eager batch wants to advance epoch, but could not without more L1 blocks", "current_epoch", epoch.ID()) - return BatchUndecided + return BatchUndecided, eth.L2BlockRef{} } batchOrigin = l1Blocks[1] } if !cfg.IsDelta(batchOrigin.Time) { log.Warn("received SpanBatch with L1 origin before Delta hard fork", "l1_origin", batchOrigin.ID(), "l1_origin_time", batchOrigin.Time) - return BatchDrop + return BatchDrop, eth.L2BlockRef{} } nextTimestamp := l2SafeHead.Time + cfg.BlockTime if batch.GetTimestamp() > nextTimestamp { + if cfg.IsHolocene(l1InclusionBlock.Time) { + log.Warn("dropping future span batch", "next_timestamp", nextTimestamp) + return BatchDrop, eth.L2BlockRef{} + } log.Trace("received out-of-order batch for future processing after next batch", "next_timestamp", nextTimestamp) - return BatchFuture + return BatchFuture, eth.L2BlockRef{} } if batch.GetBlockTimestamp(batch.GetBlockCount()-1) < nextTimestamp { log.Warn("span batch has no new blocks after safe head") - return BatchDrop + if cfg.IsHolocene(l1InclusionBlock.Time) { + return BatchPast, eth.L2BlockRef{} + } + return BatchDrop, eth.L2BlockRef{} } // finding parent block of the span batch. // if the span batch does not overlap the current safe chain, parentBLock should be l2SafeHead. - parentNum := l2SafeHead.Number parentBlock := l2SafeHead if batch.GetTimestamp() < nextTimestamp { if batch.GetTimestamp() > l2SafeHead.Time { // batch timestamp cannot be between safe head and next timestamp log.Warn("batch has misaligned timestamp, block time is too short") - return BatchDrop + return BatchDrop, eth.L2BlockRef{} } if (l2SafeHead.Time-batch.GetTimestamp())%cfg.BlockTime != 0 { log.Warn("batch has misaligned timestamp, not overlapped exactly") - return BatchDrop + return BatchDrop, eth.L2BlockRef{} } - parentNum = l2SafeHead.Number - (l2SafeHead.Time-batch.GetTimestamp())/cfg.BlockTime - 1 + parentNum := l2SafeHead.Number - (l2SafeHead.Time-batch.GetTimestamp())/cfg.BlockTime - 1 var err error parentBlock, err = l2Fetcher.L2BlockRefByNumber(ctx, parentNum) if err != nil { log.Warn("failed to fetch L2 block", "number", parentNum, "err", err) // unable to validate the batch for now. retry later. - return BatchUndecided + return BatchUndecided, eth.L2BlockRef{} } } if !batch.CheckParentHash(parentBlock.Hash) { log.Warn("ignoring batch with mismatching parent hash", "parent_block", parentBlock.Hash) - return BatchDrop + return BatchDrop, parentBlock } // Filter out batches that were included too late. if startEpochNum+cfg.SeqWindowSize < l1InclusionBlock.Number { log.Warn("batch was included too late, sequence window expired") - return BatchDrop + return BatchDrop, parentBlock } // Check the L1 origin of the batch if startEpochNum > parentBlock.L1Origin.Number+1 { log.Warn("batch is for future epoch too far ahead, while it has the next timestamp, so it must be invalid", "current_epoch", epoch.ID()) - return BatchDrop + return BatchDrop, parentBlock } endEpochNum := batch.GetBlockEpochNum(batch.GetBlockCount() - 1) @@ -252,7 +270,7 @@ func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1B if l1Block.Number == endEpochNum { if !batch.CheckOriginHash(l1Block.Hash) { log.Warn("batch is for different L1 chain, epoch hash does not match", "expected", l1Block.Hash) - return BatchDrop + return BatchDrop, parentBlock } originChecked = true break @@ -260,13 +278,26 @@ func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1B } if !originChecked { log.Info("need more l1 blocks to check entire origins of span batch") - return BatchUndecided + return BatchUndecided, parentBlock } if startEpochNum < parentBlock.L1Origin.Number { log.Warn("dropped batch, epoch is too old", "minimum", parentBlock.ID()) - return BatchDrop + return BatchDrop, parentBlock } + return BatchAccept, parentBlock +} + +// checkSpanBatch performs the full SpanBatch validation rules. +func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1Blocks []eth.L1BlockRef, l2SafeHead eth.L2BlockRef, + batch *SpanBatch, l1InclusionBlock eth.L1BlockRef, l2Fetcher SafeBlockFetcher, +) BatchValidity { + prefixValidity, parentBlock := checkSpanBatchPrefix(ctx, cfg, log, l1Blocks, l2SafeHead, batch, l1InclusionBlock, l2Fetcher) + if prefixValidity != BatchAccept { + return prefixValidity + } + + startEpochNum := uint64(batch.GetStartEpochNum()) originIdx := 0 originAdvanced := startEpochNum == parentBlock.L1Origin.Number+1 @@ -334,6 +365,8 @@ func checkSpanBatch(ctx context.Context, cfg *rollup.Config, log log.Logger, l1B } } + parentNum := parentBlock.Number + nextTimestamp := l2SafeHead.Time + cfg.BlockTime // Check overlapped blocks if batch.GetTimestamp() < nextTimestamp { for i := uint64(0); i < l2SafeHead.Number-parentNum; i++ { diff --git a/op-node/rollup/derive/batches_test.go b/op-node/rollup/derive/batches_test.go index 125fc0f02e2cf..a50773c71d754 100644 --- a/op-node/rollup/derive/batches_test.go +++ b/op-node/rollup/derive/batches_test.go @@ -43,15 +43,16 @@ func deltaAt(t *uint64) func(*rollup.Config) { func fjordAt(t *uint64) func(*rollup.Config) { return func(c *rollup.Config) { + c.DeltaTime = &zero64 c.FjordTime = t } } -func multiMod[T any](mods ...func(T)) func(T) { - return func(x T) { - for _, mod := range mods { - mod(x) - } +func holoceneAt(t *uint64) func(*rollup.Config) { + return func(c *rollup.Config) { + c.DeltaTime = &zero64 + c.FjordTime = &zero64 + c.HoloceneTime = t } } @@ -263,6 +264,23 @@ func TestValidBatch(t *testing.T) { }, Expected: BatchFuture, }, + { + Name: "future timestamp with Holocene at L1 inc", + L1Blocks: []eth.L1BlockRef{l1A, l1B, l1C}, + L2SafeHead: l2A0, + Batch: BatchWithL1InclusionBlock{ + L1InclusionBlock: l1B, + Batch: &SingularBatch{ + ParentHash: l2A1.ParentHash, + EpochNum: rollup.Epoch(l2A1.L1Origin.Number), + EpochHash: l2A1.L1Origin.Hash, + Timestamp: l2A1.Time + 1, // 1 too high + }, + }, + Expected: BatchDrop, + ExpectedLog: "dropping future batch", + ConfigMod: holoceneAt(&l1B.Time), + }, { Name: "old timestamp", L1Blocks: []eth.L1BlockRef{l1A, l1B, l1C}, @@ -279,6 +297,23 @@ func TestValidBatch(t *testing.T) { }, Expected: BatchDrop, }, + { + Name: "past timestamp with Holocene at L1 inc", + L1Blocks: []eth.L1BlockRef{l1A, l1B, l1C}, + L2SafeHead: l2A0, + Batch: BatchWithL1InclusionBlock{ + L1InclusionBlock: l1B, + Batch: &SingularBatch{ + ParentHash: l2A1.ParentHash, + EpochNum: rollup.Epoch(l2A1.L1Origin.Number), + EpochHash: l2A1.L1Origin.Hash, + Timestamp: l2A0.Time, // repeating the same time + }, + }, + Expected: BatchPast, + ExpectedLog: "dropping past batch with old timestamp", + ConfigMod: holoceneAt(&l1B.Time), + }, { Name: "misaligned timestamp", L1Blocks: []eth.L1BlockRef{l1A, l1B, l1C}, @@ -636,6 +671,26 @@ func TestValidBatch(t *testing.T) { ExpectedLog: "received out-of-order batch for future processing after next batch", ConfigMod: deltaAtGenesis, }, + { + Name: "future timestamp with Holocene at L1 inc", + L1Blocks: []eth.L1BlockRef{l1A, l1B, l1C}, + L2SafeHead: l2A0, + Batch: BatchWithL1InclusionBlock{ + L1InclusionBlock: l1B, + Batch: initializedSpanBatch([]*SingularBatch{ + { + ParentHash: l2A1.ParentHash, + EpochNum: rollup.Epoch(l2A1.L1Origin.Number), + EpochHash: l2A1.L1Origin.Hash, + Timestamp: l2A1.Time + 1, // 1 too high + Transactions: nil, + }, + }, uint64(0), big.NewInt(0)), + }, + Expected: BatchDrop, + ExpectedLog: "dropping future span batch", + ConfigMod: holoceneAt(&l1B.Time), + }, { Name: "misaligned timestamp", L1Blocks: []eth.L1BlockRef{l1A, l1B, l1C}, @@ -873,7 +928,7 @@ func TestValidBatch(t *testing.T) { }, uint64(0), big.NewInt(0)), }, Expected: BatchAccept, - ConfigMod: multiMod(deltaAtGenesis, fjordAt(&l1A.Time)), + ConfigMod: fjordAt(&l1A.Time), }, { Name: "sequencer time drift on same epoch with non-empty txs - long span", @@ -1277,6 +1332,33 @@ func TestValidBatch(t *testing.T) { ExpectedLog: "span batch has no new blocks after safe head", ConfigMod: deltaAtGenesis, }, + { + Name: "fully overlapping batch with Holocene", + L1Blocks: []eth.L1BlockRef{l1A, l1B}, + L2SafeHead: l2A2, + Batch: BatchWithL1InclusionBlock{ + L1InclusionBlock: l1B, + Batch: initializedSpanBatch([]*SingularBatch{ + { + ParentHash: l2A0.Hash, + EpochNum: rollup.Epoch(l2A1.L1Origin.Number), + EpochHash: l2A1.L1Origin.Hash, + Timestamp: l2A1.Time, + Transactions: nil, + }, + { + ParentHash: l2A1.Hash, + EpochNum: rollup.Epoch(l2A2.L1Origin.Number), + EpochHash: l2A2.L1Origin.Hash, + Timestamp: l2A2.Time, + Transactions: nil, + }, + }, uint64(0), big.NewInt(0)), + }, + Expected: BatchPast, + ExpectedLog: "span batch has no new blocks after safe head", + ConfigMod: holoceneAt(&l1B.Time), + }, { Name: "overlapping batch with invalid parent hash", L1Blocks: []eth.L1BlockRef{l1A, l1B}, diff --git a/op-node/rollup/derive/channel_in_reader.go b/op-node/rollup/derive/channel_in_reader.go index f7dde867bc9da..2aabf6bc9fdde 100644 --- a/op-node/rollup/derive/channel_in_reader.go +++ b/op-node/rollup/derive/channel_in_reader.go @@ -25,7 +25,10 @@ type ChannelInReader struct { metrics Metrics } -var _ ResettableStage = (*ChannelInReader)(nil) +var ( + _ ResettableStage = (*ChannelInReader)(nil) + _ ChannelFlusher = (*ChannelInReader)(nil) +) // NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use. func NewChannelInReader(cfg *rollup.Config, log log.Logger, prev *ChannelBank, metrics Metrics) *ChannelInReader { @@ -122,3 +125,8 @@ func (cr *ChannelInReader) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.Sy cr.nextBatchFn = nil return io.EOF } + +func (cr *ChannelInReader) FlushChannel() { + cr.nextBatchFn = nil + // TODO(12157): cr.prev.FlushChannel() - when we do wiring with ChannelStage +}