diff --git a/op-batcher/architecture.png b/op-batcher/architecture.png new file mode 100644 index 000000000000..0eab940fbb52 Binary files /dev/null and b/op-batcher/architecture.png differ diff --git a/op-batcher/batcher/channel.go b/op-batcher/batcher/channel.go index dd0827d4686c..9ae52be405dc 100644 --- a/op-batcher/batcher/channel.go +++ b/op-batcher/batcher/channel.go @@ -49,10 +49,10 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup func (c *channel) TxFailed(id string) { if data, ok := c.pendingTransactions[id]; ok { c.log.Trace("marked transaction as failed", "id", id) - // Note: when the batcher is changed to send multiple frames per tx, - // this needs to be changed to iterate over all frames of the tx data - // and re-queue them. - c.channelBuilder.PushFrames(data.Frames()...) + // Rewind to the first frame of the failed tx + // -- the frames are ordered, and we want to send them + // all again. + c.channelBuilder.RewindFrameCursor(data.Frames()[0]) delete(c.pendingTransactions, id) } else { c.log.Warn("unknown transaction marked as failed", "id", id) @@ -61,18 +61,16 @@ func (c *channel) TxFailed(id string) { c.metr.RecordBatchTxFailed() } -// TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in -// a channel have been marked as confirmed on L1 the channel may be invalid & need to be -// resubmitted. -// This function may reset the pending channel if the pending channel has timed out. -func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*types.Block) { +// TxConfirmed marks a transaction as confirmed on L1. Returns a bool indicating +// whether the channel timed out on chain. +func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) bool { c.metr.RecordBatchTxSubmitted() c.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock) if _, ok := c.pendingTransactions[id]; !ok { c.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock) // TODO: This can occur if we clear the channel while there are still pending transactions // We need to keep track of stale transactions instead - return false, nil + return false } delete(c.pendingTransactions, id) c.confirmedTransactions[id] = inclusionBlock @@ -82,21 +80,20 @@ func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*t c.minInclusionBlock = min(c.minInclusionBlock, inclusionBlock.Number) c.maxInclusionBlock = max(c.maxInclusionBlock, inclusionBlock.Number) + if c.isFullySubmitted() { + c.metr.RecordChannelFullySubmitted(c.ID()) + c.log.Info("Channel is fully submitted", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock) + } + // If this channel timed out, put the pending blocks back into the local saved blocks // and then reset this state so it can try to build a new channel. if c.isTimedOut() { c.metr.RecordChannelTimedOut(c.ID()) c.log.Warn("Channel timed out", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock) - return true, c.channelBuilder.Blocks() - } - // If we are done with this channel, record that. - if c.isFullySubmitted() { - c.metr.RecordChannelFullySubmitted(c.ID()) - c.log.Info("Channel is fully submitted", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock) - return true, nil + return true } - return false, nil + return false } // Timeout returns the channel timeout L1 block number. 
If there is no timeout set, it returns 0. @@ -136,7 +133,7 @@ func (c *channel) ID() derive.ChannelID { func (c *channel) NextTxData() txData { nf := c.cfg.MaxFramesPerTx() txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs} - for i := 0; i < nf && c.channelBuilder.HasFrame(); i++ { + for i := 0; i < nf && c.channelBuilder.HasPendingFrame(); i++ { frame := c.channelBuilder.NextFrame() txdata.frames = append(txdata.frames, frame) } @@ -151,7 +148,7 @@ func (c *channel) NextTxData() txData { func (c *channel) HasTxData() bool { if c.IsFull() || // If the channel is full, we should start to submit it !c.cfg.UseBlobs { // If using calldata, we only send one frame per tx - return c.channelBuilder.HasFrame() + return c.channelBuilder.HasPendingFrame() } // Collect enough frames if channel is not full yet return c.channelBuilder.PendingFrames() >= int(c.cfg.MaxFramesPerTx()) diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go index ae1fb03d2841..597b5ed3e144 100644 --- a/op-batcher/batcher/channel_builder.go +++ b/op-batcher/batcher/channel_builder.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/queue" "github.com/ethereum/go-ethereum/core/types" ) @@ -65,7 +66,7 @@ type ChannelBuilder struct { // current channel co derive.ChannelOut // list of blocks in the channel. Saved in case the channel must be rebuilt - blocks []*types.Block + blocks queue.Queue[*types.Block] // latestL1Origin is the latest L1 origin of all the L2 blocks that have been added to the channel latestL1Origin eth.BlockID // oldestL1Origin is the oldest L1 origin of all the L2 blocks that have been added to the channel @@ -75,7 +76,12 @@ type ChannelBuilder struct { // oldestL2 is the oldest L2 block of all the L2 blocks that have been added to the channel oldestL2 eth.BlockID // frames data queue, to be send as txs - frames []frameData + frames queue.Queue[frameData] + // frameCursor tracks which frames in the queue were submitted + // frames[frameCursor] is the next unsubmitted (pending) frame + // frameCursor = len(frames) is reserved for when + // there are no pending (next unsubmitted) frames + frameCursor int // total frames counter numFrames int // total amount of output data of all frames created yet @@ -190,7 +196,7 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro return l1info, fmt.Errorf("adding block to channel out: %w", err) } - c.blocks = append(c.blocks, block) + c.blocks.Enqueue(block) c.updateSwTimeout(l1info.Number) if l1info.Number > c.latestL1Origin.Number { @@ -312,11 +318,11 @@ func (c *ChannelBuilder) setFullErr(err error) { } // OutputFrames creates new frames with the channel out. It should be called -// after AddBlock and before iterating over available frames with HasFrame and +// after AddBlock and before iterating over pending frames with HasFrame and // NextFrame. // // If the channel isn't full yet, it will conservatively only -// pull readily available frames from the compression output. +// pull pending frames from the compression output. // If it is full, the channel is closed and all remaining // frames will be created, possibly with a small leftover frame. 
func (c *ChannelBuilder) OutputFrames() error { @@ -387,7 +393,7 @@ func (c *ChannelBuilder) outputFrame() error { id: frameID{chID: c.co.ID(), frameNumber: fn}, data: buf.Bytes(), } - c.frames = append(c.frames, frame) + c.frames.Enqueue(frame) c.numFrames++ c.outputBytes += len(frame.data) return err // possibly io.EOF (last frame) @@ -402,46 +408,47 @@ func (c *ChannelBuilder) Close() { } // TotalFrames returns the total number of frames that were created in this channel so far. -// It does not decrease when the frames queue is being emptied. func (c *ChannelBuilder) TotalFrames() int { return c.numFrames } -// HasFrame returns whether there's any available frame. If true, it can be -// popped using NextFrame(). +// HasPendingFrame returns whether there's any pending frame. If true, it can be +// dequeued using NextFrame(). // // Call OutputFrames before to create new frames from the channel out // compression pipeline. -func (c *ChannelBuilder) HasFrame() bool { - return len(c.frames) > 0 +func (c *ChannelBuilder) HasPendingFrame() bool { + return c.frameCursor < c.frames.Len() } // PendingFrames returns the number of pending frames in the frames queue. -// It is larger zero iff HasFrame() returns true. +// It is larger than zero iff HasPendingFrame() returns true. func (c *ChannelBuilder) PendingFrames() int { - return len(c.frames) + return c.frames.Len() - c.frameCursor } -// NextFrame dequeues the next available frame. -// HasFrame must be called prior to check if there's a next frame available. +// NextFrame returns the next pending frame and increments the frameCursor. +// HasPendingFrame must be called first to check whether a next pending frame exists. // Panics if called when there's no next frame. func (c *ChannelBuilder) NextFrame() frameData { - if len(c.frames) == 0 { + if len(c.frames) <= c.frameCursor { panic("no next frame") } - - f := c.frames[0] - c.frames = c.frames[1:] + f := c.frames[c.frameCursor] + c.frameCursor++ return f } -// PushFrames adds the frames back to the internal frames queue. Panics if not of -// the same channel. -func (c *ChannelBuilder) PushFrames(frames ...frameData) { - for _, f := range frames { - if f.id.chID != c.ID() { - panic("wrong channel") - } - c.frames = append(c.frames, f) +// RewindFrameCursor moves the frameCursor back to point at the supplied frame, +// but only if the cursor is currently ahead of it. +// Panics if the frame is not in this channel.
+func (c *ChannelBuilder) RewindFrameCursor(frame frameData) { + if c.frames.Len() <= int(frame.id.frameNumber) || + len(c.frames[frame.id.frameNumber].data) != len(frame.data) || + c.frames[frame.id.frameNumber].id.chID != frame.id.chID { + panic("cannot rewind to unknown frame") + } + if c.frameCursor > int(frame.id.frameNumber) { + c.frameCursor = int(frame.id.frameNumber) } } diff --git a/op-batcher/batcher/channel_builder_test.go b/op-batcher/batcher/channel_builder_test.go index 957f9ae59739..6994186b7f07 100644 --- a/op-batcher/batcher/channel_builder_test.go +++ b/op-batcher/batcher/channel_builder_test.go @@ -299,6 +299,7 @@ func TestChannelBuilderBatchType(t *testing.T) { {"ChannelBuilder_PendingFrames_TotalFrames", ChannelBuilder_PendingFrames_TotalFrames}, {"ChannelBuilder_InputBytes", ChannelBuilder_InputBytes}, {"ChannelBuilder_OutputBytes", ChannelBuilder_OutputBytes}, + {"ChannelBuilder_OutputWrongFramePanic", ChannelBuilder_OutputWrongFramePanic}, } for _, test := range tests { test := test @@ -340,7 +341,7 @@ func TestChannelBuilder_NextFrame(t *testing.T) { }, data: expectedBytes, } - cb.PushFrames(frameData) + cb.frames = append(cb.frames, frameData) // There should only be 1 frame in the channel builder require.Equal(t, 1, cb.PendingFrames()) @@ -355,7 +356,7 @@ func TestChannelBuilder_NextFrame(t *testing.T) { require.PanicsWithValue(t, "no next frame", func() { cb.NextFrame() }) } -// TestChannelBuilder_OutputWrongFramePanic tests that a panic is thrown when a frame is pushed with an invalid frame id +// TestChannelBuilder_OutputWrongFramePanic tests that a panic is thrown when we try to rewind the cursor with an invalid frame id func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) { channelConfig := defaultTestChannelConfig() channelConfig.BatchType = batchType @@ -377,7 +378,7 @@ func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) { // The frame push should panic since we constructed a new channel out // so the channel out id won't match - require.PanicsWithValue(t, "wrong channel", func() { + require.PanicsWithValue(t, "cannot rewind to unknown frame", func() { frame := frameData{ id: frameID{ chID: co.ID(), @@ -385,7 +386,7 @@ func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) { }, data: buf.Bytes(), } - cb.PushFrames(frame) + cb.RewindFrameCursor(frame) }) } @@ -625,11 +626,11 @@ func TestChannelBuilder_FullShadowCompressor(t *testing.T) { require.NoError(cb.OutputFrames()) - require.True(cb.HasFrame()) + require.True(cb.HasPendingFrame()) f := cb.NextFrame() require.Less(len(f.data), int(cfg.MaxFrameSize)) // would fail without fix, full frame - require.False(cb.HasFrame(), "no leftover frame expected") // would fail without fix + require.False(cb.HasPendingFrame(), "no leftover frame expected") // would fail without fix } func ChannelBuilder_AddBlock(t *testing.T, batchType uint) { @@ -656,8 +657,8 @@ func ChannelBuilder_AddBlock(t *testing.T, batchType uint) { expectedInputBytes = 47 } require.Equal(t, expectedInputBytes, cb.co.InputBytes()) - require.Equal(t, 1, len(cb.blocks)) - require.Equal(t, 0, len(cb.frames)) + require.Equal(t, 1, cb.blocks.Len()) + require.Equal(t, 0, cb.frames.Len()) require.True(t, cb.IsFull()) // Since the channel output is full, the next call to AddBlock @@ -858,7 +859,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) { // empty queue for pf := nf - 1; pf >= 0; pf-- { - require.True(cb.HasFrame()) + require.True(cb.HasPendingFrame()) _ = 
cb.NextFrame() require.Equal(cb.PendingFrames(), pf) require.Equal(cb.TotalFrames(), nf) @@ -932,7 +933,7 @@ func ChannelBuilder_OutputBytes(t *testing.T, batchType uint) { require.Greater(cb.PendingFrames(), 1) var flen int - for cb.HasFrame() { + for cb.HasPendingFrame() { f := cb.NextFrame() flen += len(f.data) } diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 8e9870413a40..4208a8cd3795 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -39,6 +39,10 @@ type channelManager struct { // All blocks since the last request for new tx data. blocks queue.Queue[*types.Block] + // blockCursor is an index into blocks queue. It points at the next block + // to build a channel with. blockCursor = len(blocks) is reserved for when + // there are no blocks ready to build with. + blockCursor int // The latest L1 block from all the L2 blocks in the most recently submitted channel. // Used to track channel duration timeouts. l1OriginLastSubmittedChannel eth.BlockID @@ -53,9 +57,6 @@ type channelManager struct { channelQueue []*channel // used to lookup channels by tx ID upon tx success / failure txChannels map[string]*channel - - // if set to true, prevents production of any new channel frames - closed bool } func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider ChannelConfigProvider, rollupCfg *rollup.Config) *channelManager { @@ -81,14 +82,18 @@ func (s *channelManager) Clear(l1OriginLastSubmittedChannel eth.BlockID) { defer s.mu.Unlock() s.log.Trace("clearing channel manager state") s.blocks.Clear() + s.blockCursor = 0 s.l1OriginLastSubmittedChannel = l1OriginLastSubmittedChannel s.tip = common.Hash{} - s.closed = false s.currentChannel = nil s.channelQueue = nil s.txChannels = make(map[string]*channel) } +func (s *channelManager) pendingBlocks() int { + return s.blocks.Len() - s.blockCursor +} + // TxFailed records a transaction as failed. It will attempt to resubmit the data // in the failed transaction. func (s *channelManager) TxFailed(_id txID) { @@ -98,34 +103,21 @@ func (s *channelManager) TxFailed(_id txID) { if channel, ok := s.txChannels[id]; ok { delete(s.txChannels, id) channel.TxFailed(id) - if s.closed && channel.NoneSubmitted() { - s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", channel.ID()) - s.removePendingChannel(channel) - } } else { s.log.Warn("transaction from unknown channel marked as failed", "id", id) } } -// TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in -// a channel have been marked as confirmed on L1 the channel may be invalid & need to be -// resubmitted. -// This function may reset the pending channel if the pending channel has timed out. +// TxConfirmed marks a transaction as confirmed on L1. Only if the channel timed out +// the channelManager's state is modified. func (s *channelManager) TxConfirmed(_id txID, inclusionBlock eth.BlockID) { s.mu.Lock() defer s.mu.Unlock() id := _id.String() if channel, ok := s.txChannels[id]; ok { delete(s.txChannels, id) - done, blocksToRequeue := channel.TxConfirmed(id, inclusionBlock) - if done { - s.removePendingChannel(channel) - if len(blocksToRequeue) > 0 { - s.blocks.Prepend(blocksToRequeue...) 
- } - for _, b := range blocksToRequeue { - s.metr.RecordL2BlockInPendingQueue(b) - } + if timedOut := channel.TxConfirmed(id, inclusionBlock); timedOut { + s.handleChannelInvalidated(channel) } } else { s.log.Warn("transaction from unknown channel marked as confirmed", "id", id) @@ -134,23 +126,48 @@ func (s *channelManager) TxConfirmed(_id txID, inclusionBlock eth.BlockID) { s.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock) } -// removePendingChannel removes the given completed channel from the manager's state. -func (s *channelManager) removePendingChannel(channel *channel) { - if s.currentChannel == channel { - s.currentChannel = nil +// rewindToBlock updates the blockCursor to point at +// the block with the supplied hash, only if that block exists +// in the block queue and the blockCursor is ahead of it. +// Panics if the block is not in state. +func (s *channelManager) rewindToBlock(block eth.BlockID) { + idx := block.Number - s.blocks[0].Number().Uint64() + if s.blocks[idx].Hash() == block.Hash && idx < uint64(s.blockCursor) { + s.blockCursor = int(idx) + } else { + panic("tried to rewind to nonexistent block") } - index := -1 - for i, c := range s.channelQueue { - if c == channel { - index = i - break +} + +// handleChannelInvalidated rewinds the channelManager's blockCursor +// to point at the first block added to the provided channel, +// and removes the channel from the channelQueue, along with +// any channels which are newer than the provided channel. +func (s *channelManager) handleChannelInvalidated(c *channel) { + if len(c.channelBuilder.blocks) > 0 { + // This is usually true, but there is an edge case + // where a channel timed out before any blocks got added. + // In that case we end up with an empty frame (header only), + // and there are no blocks to requeue. + blockID := eth.ToBlockID(c.channelBuilder.blocks[0]) + for _, block := range c.channelBuilder.blocks { + s.metr.RecordL2BlockInPendingQueue(block) } + s.rewindToBlock(blockID) + } else { + s.log.Debug("channelManager.handleChanneInvalidated: channel had no blocks") } - if index < 0 { - s.log.Warn("channel not found in channel queue", "id", channel.ID()) - return + + // Trim provided channel and any older channels: + for i := range s.channelQueue { + if s.channelQueue[i] == c { + s.channelQueue = s.channelQueue[:i] + break + } } - s.channelQueue = append(s.channelQueue[:index], s.channelQueue[index+1:]...) + + // We want to start writing to a new channel, so reset currentChannel. + s.currentChannel = nil } // nextTxData dequeues frames from the channel and returns them encoded in a transaction. @@ -207,7 +224,16 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { s.log.Info("Recomputing optimal ChannelConfig: changing DA type and requeing blocks...", "useBlobsBefore", s.defaultCfg.UseBlobs, "useBlobsAfter", newCfg.UseBlobs) - s.Requeue(newCfg) + + // Invalidate the channel so its blocks + // get requeued: + s.handleChannelInvalidated(channel) + + // Set the defaultCfg so new channels + // pick up the new ChannelConfig + s.defaultCfg = newCfg + + // Try again to get data to send on chain. 
channel, err = s.getReadyChannel(l1Head) if err != nil { return emptyTxData, err @@ -238,14 +264,9 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) { return firstWithTxData, nil } - if s.closed { - return nil, io.EOF - } - // No pending tx data, so we have to add new blocks to the channel - // If we have no saved blocks, we will not be able to create valid frames - if s.blocks.Len() == 0 { + if s.pendingBlocks() == 0 { return nil, io.EOF } @@ -299,8 +320,8 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { s.log.Info("Created channel", "id", pc.ID(), "l1Head", l1Head, + "blocks_pending", s.pendingBlocks(), "l1OriginLastSubmittedChannel", s.l1OriginLastSubmittedChannel, - "blocks_pending", s.blocks.Len(), "batch_type", cfg.BatchType, "compression_algo", cfg.CompressorConfig.CompressionAlgo, "target_num_frames", cfg.TargetNumFrames, @@ -331,7 +352,7 @@ func (s *channelManager) processBlocks() error { latestL2ref eth.L2BlockRef ) - for i := 0; ; i++ { + for i := s.blockCursor; ; i++ { block, ok := s.blocks.PeekN(i) if !ok { break @@ -355,7 +376,7 @@ func (s *channelManager) processBlocks() error { } } - _, _ = s.blocks.DequeueN(blocksAdded) + s.blockCursor += blocksAdded s.metr.RecordL2BlocksAdded(latestL2ref, blocksAdded, @@ -364,7 +385,7 @@ func (s *channelManager) processBlocks() error { s.currentChannel.ReadyBytes()) s.log.Debug("Added blocks to channel", "blocks_added", blocksAdded, - "blocks_pending", s.blocks.Len(), + "blocks_pending", s.pendingBlocks(), "channel_full", s.currentChannel.IsFull(), "input_bytes", s.currentChannel.InputBytes(), "ready_bytes", s.currentChannel.ReadyBytes(), @@ -384,7 +405,7 @@ func (s *channelManager) outputFrames() error { inBytes, outBytes := s.currentChannel.InputBytes(), s.currentChannel.OutputBytes() s.metr.RecordChannelClosed( s.currentChannel.ID(), - s.blocks.Len(), + s.pendingBlocks(), s.currentChannel.TotalFrames(), inBytes, outBytes, @@ -398,7 +419,7 @@ func (s *channelManager) outputFrames() error { s.log.Info("Channel closed", "id", s.currentChannel.ID(), - "blocks_pending", s.blocks.Len(), + "blocks_pending", s.pendingBlocks(), "num_frames", s.currentChannel.TotalFrames(), "input_bytes", inBytes, "output_bytes", outBytes, @@ -443,83 +464,77 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info *derive.L1BlockInfo var ErrPendingAfterClose = errors.New("pending channels remain after closing channel-manager") -// Close clears any pending channels that are not in-flight already, to leave a clean derivation state. -// Close then marks the remaining current open channel, if any, as "full" so it can be submitted as well. -// Close does NOT immediately output frames for the current remaining channel: -// as this might error, due to limitations on a single channel. -// Instead, this is part of the pending-channel submission work: after closing, -// the caller SHOULD drain pending channels by generating TxData repeatedly until there is none left (io.EOF). -// A ErrPendingAfterClose error will be returned if there are any remaining pending channels to submit. -func (s *channelManager) Close() error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { - return nil +// pruneSafeBlocks dequeues blocks from the internal blocks queue +// if they have now become safe. 
+func (s *channelManager) pruneSafeBlocks(newSafeHead eth.L2BlockRef) { + oldestBlock, ok := s.blocks.Peek() + if !ok { + // no blocks to prune + return } - s.closed = true - s.log.Info("Channel manager is closing") - - // Any pending state can be proactively cleared if there are no submitted transactions - for _, ch := range s.channelQueue { - if ch.NoneSubmitted() { - s.log.Info("Channel has no past or pending submission - dropping", "id", ch.ID()) - s.removePendingChannel(ch) - } else { - s.log.Info("Channel is in-flight and will need to be submitted after close", "id", ch.ID(), "confirmed", len(ch.confirmedTransactions), "pending", len(ch.pendingTransactions)) - } + if newSafeHead.Number+1 == oldestBlock.NumberU64() { + // no blocks to prune + return } - s.log.Info("Reviewed all pending channels on close", "remaining", len(s.channelQueue)) - if s.currentChannel == nil { - return nil + if newSafeHead.Number+1 < oldestBlock.NumberU64() { + // This could happen if there was an L1 reorg. + s.log.Warn("safe head reversed, clearing channel manager state", + "oldestBlock", eth.ToBlockID(oldestBlock), + "newSafeBlock", newSafeHead) + // We should restart work from the new safe head, + // and therefore prune all the blocks. + s.Clear(newSafeHead.L1Origin) + return } - // If the channel is already full, we don't need to close it or output frames. - // This would already have happened in TxData. - if !s.currentChannel.IsFull() { - // Force-close the remaining open channel early (if not already closed): - // it will be marked as "full" due to service termination. - s.currentChannel.Close() + numBlocksToDequeue := newSafeHead.Number + 1 - oldestBlock.NumberU64() - // Final outputFrames call in case there was unflushed data in the compressor. - if err := s.outputFrames(); err != nil { - return fmt.Errorf("outputting frames during close: %w", err) - } + if numBlocksToDequeue > uint64(s.blocks.Len()) { + // This could happen if the batcher restarted. + // The sequencer may have derived the safe chain + // from channels sent by a previous batcher instance. + s.log.Warn("safe head above unsafe head, clearing channel manager state", + "unsafeBlock", eth.ToBlockID(s.blocks[s.blocks.Len()-1]), + "newSafeBlock", newSafeHead) + // We should restart work from the new safe head, + // and therefore prune all the blocks. + s.Clear(newSafeHead.L1Origin) + return } - if s.currentChannel.HasTxData() { - // Make it clear to the caller that there is remaining pending work. - return ErrPendingAfterClose + if s.blocks[numBlocksToDequeue-1].Hash() != newSafeHead.Hash { + s.log.Warn("safe chain reorg, clearing channel manager state", + "existingBlock", eth.ToBlockID(s.blocks[numBlocksToDequeue-1]), + "newSafeBlock", newSafeHead) + // We should restart work from the new safe head, + // and therefore prune all the blocks. + s.Clear(newSafeHead.L1Origin) + return } - return nil -} -// Requeue rebuilds the channel manager state by -// rewinding blocks back from the channel queue, and setting the defaultCfg. -func (s *channelManager) Requeue(newCfg ChannelConfig) { - newChannelQueue := []*channel{} - blocksToRequeue := []*types.Block{} - for _, channel := range s.channelQueue { - if !channel.NoneSubmitted() { - newChannelQueue = append(newChannelQueue, channel) - continue - } - blocksToRequeue = append(blocksToRequeue, channel.channelBuilder.Blocks()...) 
- } + // This shouldn't return an error because + // We already checked numBlocksToDequeue <= s.blocks.Len() + _, _ = s.blocks.DequeueN(int(numBlocksToDequeue)) + s.blockCursor -= int(numBlocksToDequeue) - // We put the blocks back at the front of the queue: - s.blocks.Prepend(blocksToRequeue...) - for _, b := range blocksToRequeue { - s.metr.RecordL2BlockInPendingQueue(b) + if s.blockCursor < 0 { + panic("negative blockCursor") } +} - // Channels which where already being submitted are put back - s.channelQueue = newChannelQueue - s.currentChannel = nil - // Setting the defaultCfg will cause new channels - // to pick up the new ChannelConfig - s.defaultCfg = newCfg +// pruneChannels dequeues channels from the internal channels queue +// if they were built using blocks which are now safe +func (s *channelManager) pruneChannels(newSafeHead eth.L2BlockRef) { + i := 0 + for _, ch := range s.channelQueue { + if ch.LatestL2().Number > newSafeHead.Number { + break + } + i++ + } + s.channelQueue = s.channelQueue[i:] } // PendingDABytes returns the current number of bytes pending to be written to the DA layer (from blocks fetched from L2 diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index d3a67a1bf1f4..1c742207a5f0 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -40,10 +40,6 @@ func TestChannelManagerBatchType(t *testing.T) { {"ChannelManagerReturnsErrReorgWhenDrained", ChannelManagerReturnsErrReorgWhenDrained}, {"ChannelManager_Clear", ChannelManager_Clear}, {"ChannelManager_TxResend", ChannelManager_TxResend}, - {"ChannelManagerCloseBeforeFirstUse", ChannelManagerCloseBeforeFirstUse}, - {"ChannelManagerCloseNoPendingChannel", ChannelManagerCloseNoPendingChannel}, - {"ChannelManagerClosePendingChannel", ChannelManagerClosePendingChannel}, - {"ChannelManagerCloseAllTxsFailed", ChannelManagerCloseAllTxsFailed}, } for _, test := range tests { test := test @@ -154,14 +150,13 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { // Process the blocks // We should have a pending channel with 1 frame - // and no more blocks since processBlocks consumes - // the list + require.NoError(m.processBlocks()) require.NoError(m.currentChannel.channelBuilder.co.Flush()) require.NoError(m.outputFrames()) _, err := m.nextTxData(m.currentChannel) require.NoError(err) - require.Len(m.blocks, 0) + require.Equal(m.blockCursor, len(m.blocks)) require.NotNil(m.l1OriginLastSubmittedChannel) require.Equal(newL1Tip, m.tip) require.Len(m.currentChannel.pendingTransactions, 1) @@ -173,7 +168,7 @@ func ChannelManager_Clear(t *testing.T, batchType uint) { ParentHash: a.Hash(), }, nil, nil, nil) require.NoError(m.AddL2Block(b)) - require.Len(m.blocks, 1) + require.Equal(m.blockCursor, len(m.blocks)-1) require.Equal(b.Hash(), m.tip) safeL1Origin := eth.BlockID{ @@ -228,220 +223,6 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) { require.Len(fs, 1) } -// ChannelManagerCloseBeforeFirstUse ensures that the channel manager -// will not produce any frames if closed immediately. 
-func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) { - require := require.New(t) - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - log := testlog.Logger(t, log.LevelCrit) - m := NewChannelManager(log, metrics.NoopMetrics, - channelManagerTestConfig(10000, batchType), - defaultTestRollupConfig, - ) - m.Clear(eth.BlockID{}) - - a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID) - - require.NoError(m.Close(), "Expected to close channel manager gracefully") - - err := m.AddL2Block(a) - require.NoError(err, "Failed to add L2 block") - - _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(err, io.EOF, "Expected closed channel manager to contain no tx data") -} - -// ChannelManagerCloseNoPendingChannel ensures that the channel manager -// can gracefully close with no pending channels, and will not emit any new -// channel frames. -func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) { - require := require.New(t) - log := testlog.Logger(t, log.LevelCrit) - cfg := channelManagerTestConfig(10000, batchType) - cfg.CompressorConfig.TargetOutputSize = 1 // full on first block - cfg.ChannelTimeout = 1000 - m := NewChannelManager(log, metrics.NoopMetrics, cfg, defaultTestRollupConfig) - m.Clear(eth.BlockID{}) - a := newMiniL2Block(0) - b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash()) - - err := m.AddL2Block(a) - require.NoError(err, "Failed to add L2 block") - - txdata, err := m.TxData(eth.BlockID{}) - require.NoError(err, "Expected channel manager to return valid tx data") - - m.TxConfirmed(txdata.ID(), eth.BlockID{}) - - _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(err, io.EOF, "Expected channel manager to EOF") - - require.NoError(m.Close(), "Expected to close channel manager gracefully") - - err = m.AddL2Block(b) - require.NoError(err, "Failed to add L2 block") - - _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(err, io.EOF, "Expected closed channel manager to return no new tx data") -} - -// ChannelManagerClosePendingChannel ensures that the channel manager -// can gracefully close with a pending channel, and will not produce any -// new channel frames after this point. -func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) { - require := require.New(t) - // The number of batch txs depends on compression of the random data, hence the static test RNG seed. 
- // Example of different RNG seed that creates less than 2 frames: 1698700588902821588 - rng := rand.New(rand.NewSource(123)) - log := testlog.Logger(t, log.LevelError) - cfg := channelManagerTestConfig(10_000, batchType) - cfg.ChannelTimeout = 1000 - m := NewChannelManager(log, metrics.NoopMetrics, cfg, defaultTestRollupConfig) - m.Clear(eth.BlockID{}) - - numTx := 20 // Adjust number of txs to make 2 frames - a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) - - err := m.AddL2Block(a) - require.NoError(err, "Failed to add L2 block") - - txdata, err := m.TxData(eth.BlockID{}) - require.NoError(err, "Expected channel manager to produce valid tx data") - log.Info("generated first tx data", "len", txdata.Len()) - - m.TxConfirmed(txdata.ID(), eth.BlockID{}) - - require.ErrorIs(m.Close(), ErrPendingAfterClose, "Expected channel manager to error on close because of pending tx data") - - txdata, err = m.TxData(eth.BlockID{}) - require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data") - log.Info("generated more tx data", "len", txdata.Len()) - - m.TxConfirmed(txdata.ID(), eth.BlockID{}) - - _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(err, io.EOF, "Expected channel manager to have no more tx data") - - _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data") -} - -// ChannelManager_Close_PartiallyPendingChannel ensures that the channel manager -// can gracefully close with a pending channel, where a block is still waiting -// inside the compressor to be flushed. -// -// This test runs only for singular batches on purpose. -// The SpanChannelOut writes full span batches to the compressor for -// every new block that's added, so NonCompressor cannot be used to -// set up a scenario where data is only partially flushed. -// Couldn't get the test to work even with modifying NonCompressor -// to flush half-way through writing to the compressor... -func TestChannelManager_Close_PartiallyPendingChannel(t *testing.T) { - require := require.New(t) - // The number of batch txs depends on compression of the random data, hence the static test RNG seed. - // Example of different RNG seed that creates less than 2 frames: 1698700588902821588 - rng := rand.New(rand.NewSource(123)) - log := testlog.Logger(t, log.LevelError) - cfg := ChannelConfig{ - MaxFrameSize: 2200, - ChannelTimeout: 1000, - TargetNumFrames: 100, - } - cfg.InitNoneCompressor() - m := NewChannelManager(log, metrics.NoopMetrics, cfg, defaultTestRollupConfig) - m.Clear(eth.BlockID{}) - - numTx := 3 // Adjust number of txs to make 2 frames - a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) - b := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) - bHeader := b.Header() - bHeader.Number = new(big.Int).Add(a.Number(), big.NewInt(1)) - bHeader.ParentHash = a.Hash() - b = b.WithSeal(bHeader) - - require.NoError(m.AddL2Block(a), "adding 1st L2 block") - require.NoError(m.AddL2Block(b), "adding 2nd L2 block") - - // Inside TxData, the two blocks queued above are written to the compressor. - // The NonCompressor will flush the first, but not the second block, when - // adding the second block, setting up the test with a partially flushed - // compressor. 
- txdata, err := m.TxData(eth.BlockID{}) - require.NoError(err, "Expected channel manager to produce valid tx data") - log.Info("generated first tx data", "len", txdata.Len()) - - m.TxConfirmed(txdata.ID(), eth.BlockID{}) - - // ensure no new ready data before closing - _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(err, io.EOF, "Expected unclosed channel manager to only return a single frame") - - require.ErrorIs(m.Close(), ErrPendingAfterClose, "Expected channel manager to error on close because of pending tx data") - require.NotNil(m.currentChannel) - require.ErrorIs(m.currentChannel.FullErr(), ErrTerminated, "Expected current channel to be terminated by Close") - - txdata, err = m.TxData(eth.BlockID{}) - require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data") - log.Info("generated more tx data", "len", txdata.Len()) - - m.TxConfirmed(txdata.ID(), eth.BlockID{}) - - _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data") -} - -// ChannelManagerCloseAllTxsFailed ensures that the channel manager -// can gracefully close after producing transaction frames if none of these -// have successfully landed on chain. -func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) { - require := require.New(t) - rng := rand.New(rand.NewSource(1357)) - log := testlog.Logger(t, log.LevelCrit) - cfg := channelManagerTestConfig(100, batchType) - cfg.TargetNumFrames = 1000 - cfg.InitNoneCompressor() - m := NewChannelManager(log, metrics.NoopMetrics, cfg, defaultTestRollupConfig) - m.Clear(eth.BlockID{}) - - a := derivetest.RandomL2BlockWithChainId(rng, 1000, defaultTestRollupConfig.L2ChainID) - - err := m.AddL2Block(a) - require.NoError(err, "Failed to add L2 block") - - drainTxData := func() (txdatas []txData) { - for { - txdata, err := m.TxData(eth.BlockID{}) - if err == io.EOF { - return - } - require.NoError(err, "Expected channel manager to produce valid tx data") - txdatas = append(txdatas, txdata) - } - } - - txdatas := drainTxData() - require.NotEmpty(txdatas) - - for _, txdata := range txdatas { - m.TxFailed(txdata.ID()) - } - - // Show that this data will continue to be emitted as long as the transaction - // fails and the channel manager is not closed - txdatas1 := drainTxData() - require.NotEmpty(txdatas) - require.ElementsMatch(txdatas, txdatas1, "expected same txdatas on re-attempt") - - for _, txdata := range txdatas1 { - m.TxFailed(txdata.ID()) - } - - require.NoError(m.Close(), "Expected to close channel manager gracefully") - - _, err = m.TxData(eth.BlockID{}) - require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data") -} - func TestChannelManager_ChannelCreation(t *testing.T) { l := testlog.Logger(t, log.LevelCrit) const maxChannelDuration = 15 @@ -543,10 +324,12 @@ func TestChannelManager_TxData(t *testing.T) { // * One when the channelManager was created // * One when the channel is about to be submitted - // * Potentially one more if the replacement channel is about to be submitted, - // this only happens when going from calldata->blobs because - // the channel is no longer ready to send until more data - // is added. + // * Potentially one more when the replacement channel + // is not immediately ready to be submitted, but later + // becomes ready after more data is added. 
+ // This only happens when going from calldata->blobs because + // the channel is not immediately ready to send until more data + // is added due to blob channels having greater capacity. numExpectedAssessments int } @@ -591,7 +374,7 @@ func TestChannelManager_TxData(t *testing.T) { // we get some data to submit var data txData for { - m.blocks = []*types.Block{blockA} + m.blocks = append(m.blocks, blockA) data, err = m.TxData(eth.BlockID{}) if err == nil && data.Len() > 0 { break @@ -609,16 +392,15 @@ func TestChannelManager_TxData(t *testing.T) { } -// TestChannelManager_Requeue seeds the channel manager with blocks, +// TestChannelManager_handleChannelInvalidated seeds the channel manager with blocks, // takes a state snapshot, triggers the blocks->channels pipeline, -// and then calls Requeue. Finally, it asserts the channel manager's -// state is equal to the snapshot. It repeats this for a channel -// which has a pending transaction and verifies that Requeue is then -// a noop. -func TestChannelManager_Requeue(t *testing.T) { +// and then calls handleChannelInvalidated. It asserts on the final state of +// the channel manager. +func TestChannelManager_handleChannelInvalidated(t *testing.T) { l := testlog.Logger(t, log.LevelCrit) cfg := channelManagerTestConfig(100, derive.SingularBatchType) - m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) + metrics := new(metrics.TestMetrics) + m := NewChannelManager(l, metrics, cfg, defaultTestRollupConfig) // Seed channel manager with blocks rng := rand.New(rand.NewSource(99)) @@ -631,50 +413,197 @@ func TestChannelManager_Requeue(t *testing.T) { m.blocks = stateSnapshot require.Empty(t, m.channelQueue) + // Place an old channel in the queue. + // This channel should not be affected by + // a requeue or a later channel timing out. + oldChannel := newChannel(l, nil, m.defaultCfg, defaultTestRollupConfig, 0, nil) + oldChannel.Close() + m.channelQueue = []*channel{oldChannel} + require.Len(t, m.channelQueue, 1) + + // Setup initial metrics + metrics.RecordL2BlockInPendingQueue(blockA) + metrics.RecordL2BlockInPendingQueue(blockB) + pendingBytesBefore := metrics.PendingBlocksBytesCurrent + // Trigger the blocks -> channelQueue data pipelining require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{})) - require.NotEmpty(t, m.channelQueue) + require.Len(t, m.channelQueue, 2) require.NoError(t, m.processBlocks()) // Assert that at least one block was processed into the channel - require.NotContains(t, m.blocks, blockA) + require.Equal(t, 1, m.blockCursor) - l1OriginBeforeRequeue := m.l1OriginLastSubmittedChannel + // Check metric decreased + metricsDelta := metrics.PendingBlocksBytesCurrent - pendingBytesBefore + require.Negative(t, metricsDelta) - // Call the function we are testing - m.Requeue(m.defaultCfg) + l1OriginBefore := m.l1OriginLastSubmittedChannel + + m.handleChannelInvalidated(m.currentChannel) // Ensure we got back to the state above require.Equal(t, m.blocks, stateSnapshot) - require.Empty(t, m.channelQueue) + require.Contains(t, m.channelQueue, oldChannel) + require.Len(t, m.channelQueue, 1) + + // Check metric came back up to previous value + require.Equal(t, pendingBytesBefore, metrics.PendingBlocksBytesCurrent) // Ensure the l1OridingLastSubmittedChannel was // not changed. This ensures the next channel // has its duration timeout deadline computed // properly. 
- require.Equal(t, l1OriginBeforeRequeue, m.l1OriginLastSubmittedChannel) + require.Equal(t, l1OriginBefore, m.l1OriginLastSubmittedChannel) // Trigger the blocks -> channelQueue data pipelining again require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{})) require.NotEmpty(t, m.channelQueue) require.NoError(t, m.processBlocks()) +} - // Assert that at least one block was processed into the channel - require.NotContains(t, m.blocks, blockA) +func TestChannelManager_PruneBlocks(t *testing.T) { + l := testlog.Logger(t, log.LevelDebug) + cfg := channelManagerTestConfig(100, derive.SingularBatchType) + m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) + + a := types.NewBlock(&types.Header{ + Number: big.NewInt(0), + }, nil, nil, nil) + b := types.NewBlock(&types.Header{ // This will shortly become the safe head + Number: big.NewInt(1), + ParentHash: a.Hash(), + }, nil, nil, nil) + c := types.NewBlock(&types.Header{ + Number: big.NewInt(2), + ParentHash: b.Hash(), + }, nil, nil, nil) + + require.NoError(t, m.AddL2Block(a)) + m.blockCursor += 1 + require.NoError(t, m.AddL2Block(b)) + m.blockCursor += 1 + require.NoError(t, m.AddL2Block(c)) + m.blockCursor += 1 + + // Normal path + m.pruneSafeBlocks(eth.L2BlockRef{ + Hash: b.Hash(), + Number: b.NumberU64(), + }) + require.Equal(t, queue.Queue[*types.Block]{c}, m.blocks) + + // Safe chain didn't move, nothing to prune + m.pruneSafeBlocks(eth.L2BlockRef{ + Hash: b.Hash(), + Number: b.NumberU64(), + }) + require.Equal(t, queue.Queue[*types.Block]{c}, m.blocks) + + // Safe chain moved beyond the blocks we had + // state should be cleared + m.pruneSafeBlocks(eth.L2BlockRef{ + Hash: c.Hash(), + Number: uint64(99), + }) + require.Equal(t, queue.Queue[*types.Block]{}, m.blocks) + + // No blocks to prune, NOOP + m.pruneSafeBlocks(eth.L2BlockRef{ + Hash: c.Hash(), + Number: c.NumberU64(), + }) + require.Equal(t, queue.Queue[*types.Block]{}, m.blocks) + + // Put another block in + d := types.NewBlock(&types.Header{ + Number: big.NewInt(3), + ParentHash: c.Hash(), + }, nil, nil, nil) + require.NoError(t, m.AddL2Block(d)) + m.blockCursor += 1 + + // Safe chain reorg + // state should be cleared + m.pruneSafeBlocks(eth.L2BlockRef{ + Hash: a.Hash(), + Number: uint64(3), + }) + require.Equal(t, queue.Queue[*types.Block]{}, m.blocks) + + // Put another block in + require.NoError(t, m.AddL2Block(d)) + m.blockCursor += 1 + + // Safe chain reversed + // state should be cleared + m.pruneSafeBlocks(eth.L2BlockRef{ + Hash: a.Hash(), // unused + Number: uint64(1), + }) + require.Equal(t, queue.Queue[*types.Block]{}, m.blocks) + +} + +func TestChannelManager_PruneChannels(t *testing.T) { + l := testlog.Logger(t, log.LevelCrit) + cfg := channelManagerTestConfig(100, derive.SingularBatchType) + cfg.InitNoneCompressor() + m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) - // Now mark the 0th channel in the queue as already - // starting to send on chain - channel0 := m.channelQueue[0] - channel0.pendingTransactions["foo"] = txData{} - require.False(t, channel0.NoneSubmitted()) + A, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0) + require.NoError(t, err) + B, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0) + require.NoError(t, err) + C, err := newChannelWithChannelOut(l, metrics.NoopMetrics, cfg, m.rollupCfg, 0) + require.NoError(t, err) + + m.channelQueue = []*channel{A, B, C} + + numTx := 1 + rng := rand.New(rand.NewSource(123)) + a0 := 
derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) + a0 = a0.WithSeal(&types.Header{Number: big.NewInt(0)}) + a1 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) + a1 = a1.WithSeal(&types.Header{Number: big.NewInt(1)}) + b2 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) + b2 = b2.WithSeal(&types.Header{Number: big.NewInt(2)}) + b3 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) + b3 = b3.WithSeal(&types.Header{Number: big.NewInt(3)}) + c4 := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID) + c4 = c4.WithSeal(&types.Header{Number: big.NewInt(4)}) + + _, err = A.AddBlock(a0) + require.NoError(t, err) + _, err = A.AddBlock(a1) + require.NoError(t, err) + + _, err = B.AddBlock(b2) + require.NoError(t, err) + _, err = B.AddBlock(b3) + require.NoError(t, err) + + _, err = C.AddBlock(c4) + require.NoError(t, err) + + m.pruneChannels(eth.L2BlockRef{ + Number: uint64(3), + }) - // Call the function we are testing - m.Requeue(m.defaultCfg) + require.Equal(t, []*channel{C}, m.channelQueue) + + m.pruneChannels(eth.L2BlockRef{ + Number: uint64(4), + }) + + require.Equal(t, []*channel{}, m.channelQueue) + + m.pruneChannels(eth.L2BlockRef{ + Number: uint64(4), + }) - // The requeue shouldn't affect the pending channel - require.Contains(t, m.channelQueue, channel0) + require.Equal(t, []*channel{}, m.channelQueue) - require.NotContains(t, m.blocks, blockA) } func TestChannelManager_ChannelOutFactory(t *testing.T) { type ChannelOutWrapper struct { diff --git a/op-batcher/batcher/channel_test.go b/op-batcher/batcher/channel_test.go index 0aad780131c7..b36ce9311bce 100644 --- a/op-batcher/batcher/channel_test.go +++ b/op-batcher/batcher/channel_test.go @@ -113,7 +113,7 @@ func TestChannelManager_NextTxData(t *testing.T) { frameNumber: uint16(0), }, } - channel.channelBuilder.PushFrames(frame) + channel.channelBuilder.frames = append(channel.channelBuilder.frames, frame) require.Equal(t, 1, channel.PendingFrames()) // Now the nextTxData function should return the frame @@ -142,7 +142,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) { mockframes := makeMockFrameDatas(chID, n+1) // put multiple frames into channel, but less than target - ch.channelBuilder.PushFrames(mockframes[:n-1]...) + ch.channelBuilder.frames = mockframes[:n-1] requireTxData := func(i int) { require.True(ch.HasTxData(), "expected tx data %d", i) @@ -160,7 +160,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) { require.False(ch.HasTxData()) // put in last two - ch.channelBuilder.PushFrames(mockframes[n-1 : n+1]...) + ch.channelBuilder.frames = append(ch.channelBuilder.frames, mockframes[n-1:n+1]...) for i := n - 1; i < n+1; i++ { requireTxData(i) } @@ -183,11 +183,11 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) { mockframes := makeMockFrameDatas(chID, n+1) // put multiple frames into channel, but less than target - ch.channelBuilder.PushFrames(mockframes[:n-1]...) + ch.channelBuilder.frames = append(ch.channelBuilder.frames, mockframes[:n-1]...) require.False(ch.HasTxData()) // put in last two - ch.channelBuilder.PushFrames(mockframes[n-1 : n+1]...) + ch.channelBuilder.frames = append(ch.channelBuilder.frames, mockframes[n-1:n+1]...) 
require.True(ch.HasTxData()) txdata := ch.NextTxData() require.Len(txdata.frames, n) @@ -240,7 +240,8 @@ func TestChannelTxConfirmed(t *testing.T) { frameNumber: uint16(0), }, } - m.currentChannel.channelBuilder.PushFrames(frame) + m.currentChannel.channelBuilder.frames = append(m.currentChannel.channelBuilder.frames, frame) + require.Equal(t, 1, m.currentChannel.PendingFrames()) returnedTxData, err := m.nextTxData(m.currentChannel) expectedTxData := singleFrameTxData(frame) @@ -291,7 +292,7 @@ func TestChannelTxFailed(t *testing.T) { frameNumber: uint16(0), }, } - m.currentChannel.channelBuilder.PushFrames(frame) + m.currentChannel.channelBuilder.frames = append(m.currentChannel.channelBuilder.frames, frame) require.Equal(t, 1, m.currentChannel.PendingFrames()) returnedTxData, err := m.nextTxData(m.currentChannel) expectedTxData := singleFrameTxData(frame) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 6e237b176d8d..2c9db3821e3a 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "math" "math/big" _ "net/http/pprof" "sync" @@ -241,11 +240,12 @@ func (l *BatchSubmitter) StopBatchSubmitting(ctx context.Context) error { // 2. Check if the sync status is valid or if we are all the way up to date // 3. Check if it needs to initialize state OR it is lagging (todo: lagging just means race condition?) // 4. Load all new blocks into the local state. +// 5. Dequeue blocks from local state which are now safe. // // If there is a reorg, it will reset the last stored block but not clear the internal state so // the state can be flushed to L1. -func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error { - start, end, err := l.calculateL2BlockRangeToStore(ctx) +func (l *BatchSubmitter) loadBlocksIntoState(syncStatus eth.SyncStatus, ctx context.Context) error { + start, end, err := l.calculateL2BlockRangeToStore(syncStatus) if err != nil { l.Log.Warn("Error calculating L2 block range", "err", err) return err @@ -308,12 +308,10 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin return block, nil } -// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state. -// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions) -func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) { +func (l *BatchSubmitter) getSyncStatus(ctx context.Context) (*eth.SyncStatus, error) { rollupClient, err := l.EndpointProvider.RollupClient(ctx) if err != nil { - return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("getting rollup client: %w", err) + return nil, fmt.Errorf("getting rollup client: %w", err) } var ( @@ -331,7 +329,7 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth. // Ensure that we have the sync status if err != nil { - return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("failed to get sync status: %w", err) + return nil, fmt.Errorf("failed to get sync status: %w", err) } // If we have a head, break out of the loop @@ -348,10 +346,21 @@ func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth. 
// Reset timer to tick of the new backoff time again timer.Reset(backoff) case <-ctx.Done(): - return eth.BlockID{}, eth.BlockID{}, ctx.Err() + return nil, ctx.Err() } } + return syncStatus, nil +} + +// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state. +// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions +// as well as garbage collecting blocks which became safe) +func (l *BatchSubmitter) calculateL2BlockRangeToStore(syncStatus eth.SyncStatus) (eth.BlockID, eth.BlockID, error) { + if syncStatus.HeadL1 == (eth.L1BlockRef{}) { + return eth.BlockID{}, eth.BlockID{}, errors.New("empty sync status") + } + // Check last stored to see if it needs to be set on startup OR set if is lagged behind. // It lagging implies that the op-node processed some batches that were submitted prior to the current instance of the batcher being alive. if l.lastStoredBlock == (eth.BlockID{}) { @@ -430,65 +439,36 @@ func (l *BatchSubmitter) loop() { ticker := time.NewTicker(l.Config.PollInterval) defer ticker.Stop() - publishAndWait := func() { - l.publishStateToL1(queue, receiptsCh, daGroup, time.Duration(math.MaxInt64)) - if !l.Txmgr.IsClosed() { - if l.Config.UseAltDA { - l.Log.Info("Waiting for altDA writes to complete...") - err := daGroup.Wait() - if err != nil { - l.Log.Error("Error returned by one of the altda goroutines waited on", "err", err) - } - } - l.Log.Info("Waiting for L1 txs to be confirmed...") - err := queue.Wait() - if err != nil { - l.Log.Error("Error returned by one of the txmgr goroutines waited on", "err", err) - } - } else { - l.Log.Info("Txmgr is closed, remaining channel data won't be sent") - } - } - for { select { case <-ticker.C: + if !l.checkTxpool(queue, receiptsCh) { continue } - if err := l.loadBlocksIntoState(l.shutdownCtx); errors.Is(err, ErrReorg) { - err := l.state.Close() + + syncStatus, err := l.getSyncStatus(l.shutdownCtx) + if err != nil { + l.Log.Warn("could not get sync status", "err", err) + continue + } + + l.state.pruneSafeBlocks(syncStatus.SafeL2) + l.state.pruneChannels(syncStatus.SafeL2) + if err := l.loadBlocksIntoState(*syncStatus, l.shutdownCtx); errors.Is(err, ErrReorg) { + // Wait for any in flight transactions + // to be ingested by the node before + // we start loading blocks again. + err := l.waitNodeSync() if err != nil { - if errors.Is(err, ErrPendingAfterClose) { - l.Log.Warn("Closed channel manager to handle L2 reorg with pending channel(s) remaining - submitting") - } else { - l.Log.Error("Error closing the channel manager to handle a L2 reorg", "err", err) - } + l.Log.Warn("error waiting for node sync", "err", err) } - // on reorg we want to publish all pending state then wait until each result clears before resetting - // the state. - publishAndWait() l.clearState(l.shutdownCtx) continue } l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval) case <-l.shutdownCtx.Done(): - if l.Txmgr.IsClosed() { - l.Log.Info("Txmgr is closed, remaining channel data won't be sent") - return - } - // This removes any never-submitted pending channels, so these do not have to be drained with transactions. - // Any remaining unfinished channel is terminated, so its data gets submitted. 
- err := l.state.Close() - if err != nil { - if errors.Is(err, ErrPendingAfterClose) { - l.Log.Warn("Closed channel manager on shutdown with pending channel(s) remaining - submitting") - } else { - l.Log.Error("Error closing the channel manager on shutdown", "err", err) - } - } - publishAndWait() - l.Log.Info("Finished publishing all remaining channel data") + l.Log.Warn("main loop returning") return } } diff --git a/op-batcher/metrics/test.go b/op-batcher/metrics/test.go new file mode 100644 index 000000000000..76c365ea7e2b --- /dev/null +++ b/op-batcher/metrics/test.go @@ -0,0 +1,22 @@ +package metrics + +import ( + "github.com/ethereum/go-ethereum/core/types" +) + +type TestMetrics struct { + noopMetrics + PendingBlocksBytesCurrent float64 +} + +var _ Metricer = new(TestMetrics) + +func (m *TestMetrics) RecordL2BlockInPendingQueue(block *types.Block) { + _, rawSize := estimateBatchSize(block) + m.PendingBlocksBytesCurrent += float64(rawSize) + +} +func (m *TestMetrics) RecordL2BlockInChannel(block *types.Block) { + _, rawSize := estimateBatchSize(block) + m.PendingBlocksBytesCurrent -= float64(rawSize) +} diff --git a/op-batcher/readme.md b/op-batcher/readme.md new file mode 100644 index 000000000000..1d601fea94be --- /dev/null +++ b/op-batcher/readme.md @@ -0,0 +1,69 @@ +# op-batcher + +The `op-batcher` is responsible for ensuring data availability. See the [specs](https://specs.optimism.io/protocol/batcher.html). + + +## Interactions & Dependencies +The `op-batcher` works together with the [sequencer](../op-node/) (which it reads unsafe blocks from), the data availability layer (e.g. Layer 1 or an [Alt DA](../op-alt-da/) layer, which it posts data to), and the [derivation pipeline](../op-node/) (which reads the data from the DA layer and progresses the safe chain). + +It depends directly on some code shared with the derivation pipeline, namely the [`ChannelOut`](../op-node/rollup/derive/channel_out.go) implementation(s). It also depends directly on the shared [txmgr](../op-service/txmgr/) module. + +## Testing +The batcher has a suite of unit test which can be triggered by running +``` +go test ./... +``` +from this directory. There are also end-to-end tests in [`op-e2e`](../op-e2e/) which integrate the batcher. + +## Architecture + +The architecture of this batcher implementation is shown on the left side of the following diagram: + +![architecture](./architecture.png) + +Batch submitting (writing to the DA layer, in the middle of the diagram) works together with the derivation pipeline (on the right side of the diagram, reading from the DA layer) to progress the safe chain. + +The philosophy behind the current architecture is: +* Blocks, channels and frames are kept around for as long as they might be needed, and discarded as soon as they are not needed. They are not moved from one part of state to another. +* We retain block data in a strict order for as long as necessary. We only garbage collect frames, channels and blocks when the safe head moves sufficiently and those structures have done their job. +* When something goes wrong, we rewind the state cursors by the minimal amount we need to get going again. + + +### Happy path + +In the happy path, the batcher periodically: +1. Enqueues unsafe blocks and dequeues safe blocks from the sequencer to its internal state. +2. Enqueues a new channel, if necessary. +3. Processes some unprocessed blocks into the current channel, triggers the compression of the block data and the creation of frames. +4. 
Sends frames from the channel queue to the DA layer (e.g. to Ethereum L1 as calldata or blob transactions). 5. If there is more transaction data to send, go to 2. Otherwise, wait for a tick and go to 1.

The `blockCursor` state variable tracks the next unprocessed block. In each channel, the `frameCursor` tracks the next unsent frame.

### Reorgs
When an L2 unsafe reorg is detected, the batch submitter resets its state and waits for any in-flight transactions to be ingested by the verifier nodes before starting work again.

### Tx Failed
When a Tx fails, an asynchronous receipt handler is triggered. The channel the Tx's frames came from has its `frameCursor` rewound, so that all of its frames can be resubmitted in order.

### Channel Times Out
When a Tx is confirmed, an asynchronous receipt handler is triggered. The batcher's state is only updated if the channel timed out on chain. In that case, the `blockCursor` is rewound to the first block added to that channel, and the channel queue is cleared out. This allows the batcher to start building a fresh channel from the same block -- it does not need to refetch blocks from the sequencer.

## Design Principles and Optimization Targets
At the current time, the batcher should be optimized for correctness, simplicity and robustness. It is preferable to prioritize these properties, even at the expense of other desirable properties such as frugality. For example, it is preferable to have the batcher resubmit some data from time to time ("wasting" money on data availability costs) instead of avoiding that by, for example, adding persistent state to the batcher.

The batcher can almost always recover from unforeseen situations by being restarted.

Some complexity is permitted, however, for handling data availability switching, so that the batcher does not waste money over longer periods of time.

## Known issues and future work

Link to [open issues with the `op-batcher` tag](https://github.com/ethereum-optimism/optimism/issues?q=is%3Aopen+is%3Aissue+label%3AA-op-batcher).

The batcher launches L1 transactions in parallel so that it can achieve higher throughput, particularly when there is a large backlog of data that needs to be posted. Sometimes transactions can get stuck in the L1 mempool. The batcher has functionality to clear these stuck transactions, but it is not completely reliable.

The automatic data availability switching behavior is a relatively new feature and may still have bugs.
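
## Appendix: the cursor-over-queue pattern (illustrative sketch)

The blocks and frames queues described in the Architecture section are never reshuffled; a cursor is advanced as items are processed, rewound when work has to be redone, and the front of the queue is pruned once items become safe. The sketch below is a minimal, hypothetical Go illustration of that pattern. The type and method names are invented for this example and do not match the real op-batcher code, which keeps separate `blockCursor` and `frameCursor` fields over `queue.Queue` values from `op-service/queue`.

```go
package main

import "fmt"

// cursorQueue keeps items in submission order and tracks how many of them
// have already been handed out. Items are never reordered; instead the
// cursor is advanced, rewound, or the front of the queue is pruned.
type cursorQueue[T any] struct {
	items  []T
	cursor int // index of the next pending item; == len(items) when nothing is pending
}

// Next returns the next pending item and advances the cursor.
// The second return value is false when there is nothing pending.
func (q *cursorQueue[T]) Next() (T, bool) {
	var zero T
	if q.cursor >= len(q.items) {
		return zero, false
	}
	item := q.items[q.cursor]
	q.cursor++
	return item, true
}

// Rewind moves the cursor back to index i (e.g. after a failed tx or an
// on-chain channel timeout), so items from i onwards become pending again.
func (q *cursorQueue[T]) Rewind(i int) {
	if i < q.cursor {
		q.cursor = i
	}
}

// Prune drops n items from the front of the queue (e.g. blocks that have
// become safe) and shifts the cursor so it keeps pointing at the same item.
func (q *cursorQueue[T]) Prune(n int) {
	q.items = q.items[n:]
	if q.cursor < n {
		q.cursor = 0
	} else {
		q.cursor -= n
	}
}

func main() {
	q := &cursorQueue[string]{items: []string{"frame0", "frame1", "frame2"}}

	f, _ := q.Next() // "submit" frame0
	fmt.Println("submitted:", f)

	q.Rewind(0) // the tx carrying frame0 failed: make it pending again
	f, _ = q.Next()
	fmt.Println("resubmitted:", f)

	q.Prune(1) // frame0 is now safe: drop it from the front
	fmt.Println("still pending:", len(q.items)-q.cursor)
}
```

The real implementation panics on impossible cursor positions instead of clamping, and handles reorgs and safe-head inconsistencies by clearing its state entirely, but the cursor bookkeeping follows the same shape.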