Skip to content

Commit

Permalink
batcher: keep blocks, channels and frames in strict order & simplify …
Browse files Browse the repository at this point in the history
…reorg handling (#12390)

* use a queue.Queue for channelBuilder.frames

* remove pop and push terminology

* proliferate queue.Queue type

* simplify requeue method

* undo changes to submodule

* sketch out new arch

https://www.notion.so/oplabs/op-batcher-re-architecture-114f153ee162803d943ff4628ab6578f

* add TODO

* add channelManager.pruneSafeBlocks method and integrate into main loop

* fix frameCursor semantics

* fixup tests

* avoid Rewind() in tests

* only rewind cursor in rewind (never move it forward)

* fix assertions

* prune channels whose blocks are now safe

* handle case when rewinding a channel with no blocks

this is strange, I don't think we should expect channels with frames but no blocks...

* add clarification

* implement channelManager.pendingBlocks() method

* fix pruning logic

* simplify pruneChannels

* simplify pruneSafeBlocks

* add unit tests for pruneSafeBlocks

* fix pruneSafeBlocks to avoid underflow

* improve test

* add unit tests for pruneChannels

* introduce handleChannelTimeout

and simplify channel.TxConfirmed API

* factor out channelManager.rewindToBlockWithHash

* change test expectation

* do more pruning in test

* Replace "clean shutdown" behaviour with waitNodeSync()

Instead of optimizing for a clean shutdown (which we couldn't guarantee anyway), this change optimizes for code simplicity.

This change also helps us restrict the amount of code which mutates the channelQueue (removePendingChannel was doing removal of channels at arbitrary positions in the queue).

The downside is that we may end up needlessly resubmitting some data after the reset.

Reorgs are rare, so it makes sense to optimize for correctness rather than DA costs here.

* Add readme and architecture diagram

* don't panic when there is a safe chain reorg

* fix test

* readability improvements

* only clear state after waiting for node to sync

* resize image

* tweak readme

* typo

* rewindToBlockWithHash never moves cursor forward

* use s.pendingBlocks()

* add log line

* check there are blocks when handling timeout

* rename HasFrame() to HasPendingFrame()

* fixup test

* improve readme

* link to open issues by tag

* add log when main loop returns

* pass blockID to rewindToBlock

and panic if block does not exist

* don't remove all channels when a channel times out

keep older channels, it's possible that they also time out

* use newSafeHead.L1Origin in Clear() when pruning blocks

* clarify comment

* use warn log level on safe chain reorg pruning, and unify handling for safe head above unsafe head

* update panic message

* extend test coverage and fix bug

* rename test blocks

* simplify HasPendingFrame() method

* simplify implementation of RewindFrameCursor

* activate dormant test

* ensure pending_blocks_bytes_current metric is tracked properly

* cover metrics behaviour in test

using new TestMetrics struct

* extend test coverage to channelManager.handleChannelTimeout

* add comment to TxFailed

* rename test fn

* point to e2e tests in readme.

* readme: performance -> throughput

* improve channel_manager_test to assert old channels are not affected by requeue or timeout

* fix handleChannelTimeout behaviour

We were trimming older channels and keeping new ones. We need to trim newer channels and keep old ones. Fixes associated test (see previous commit).

* tighten up requirements for invalidating a channel

* replace requeue with handleChannelInvalidated
  • Loading branch information
geoknee authored Nov 18, 2024
1 parent 873b3e0 commit c91fe2f
Show file tree
Hide file tree
Showing 10 changed files with 503 additions and 482 deletions.
Binary file added op-batcher/architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
37 changes: 17 additions & 20 deletions op-batcher/batcher/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup
func (c *channel) TxFailed(id string) {
if data, ok := c.pendingTransactions[id]; ok {
c.log.Trace("marked transaction as failed", "id", id)
// Note: when the batcher is changed to send multiple frames per tx,
// this needs to be changed to iterate over all frames of the tx data
// and re-queue them.
c.channelBuilder.PushFrames(data.Frames()...)
// Rewind to the first frame of the failed tx
// -- the frames are ordered, and we want to send them
// all again.
c.channelBuilder.RewindFrameCursor(data.Frames()[0])
delete(c.pendingTransactions, id)
} else {
c.log.Warn("unknown transaction marked as failed", "id", id)
Expand All @@ -61,18 +61,16 @@ func (c *channel) TxFailed(id string) {
c.metr.RecordBatchTxFailed()
}

// TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in
// a channel have been marked as confirmed on L1 the channel may be invalid & need to be
// resubmitted.
// This function may reset the pending channel if the pending channel has timed out.
func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*types.Block) {
// TxConfirmed marks a transaction as confirmed on L1. Returns a bool indicating
// whether the channel timed out on chain.
func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) bool {
c.metr.RecordBatchTxSubmitted()
c.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock)
if _, ok := c.pendingTransactions[id]; !ok {
c.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock)
// TODO: This can occur if we clear the channel while there are still pending transactions
// We need to keep track of stale transactions instead
return false, nil
return false
}
delete(c.pendingTransactions, id)
c.confirmedTransactions[id] = inclusionBlock
Expand All @@ -82,21 +80,20 @@ func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*t
c.minInclusionBlock = min(c.minInclusionBlock, inclusionBlock.Number)
c.maxInclusionBlock = max(c.maxInclusionBlock, inclusionBlock.Number)

if c.isFullySubmitted() {
c.metr.RecordChannelFullySubmitted(c.ID())
c.log.Info("Channel is fully submitted", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock)
}

// If this channel timed out, record the timeout and return true so that
// the caller can handle it.
if c.isTimedOut() {
c.metr.RecordChannelTimedOut(c.ID())
c.log.Warn("Channel timed out", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock)
return true, c.channelBuilder.Blocks()
}
// If we are done with this channel, record that.
if c.isFullySubmitted() {
c.metr.RecordChannelFullySubmitted(c.ID())
c.log.Info("Channel is fully submitted", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock)
return true, nil
return true
}

return false, nil
return false
}

// Timeout returns the channel timeout L1 block number. If there is no timeout set, it returns 0.
Expand Down Expand Up @@ -136,7 +133,7 @@ func (c *channel) ID() derive.ChannelID {
func (c *channel) NextTxData() txData {
nf := c.cfg.MaxFramesPerTx()
txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs}
for i := 0; i < nf && c.channelBuilder.HasFrame(); i++ {
for i := 0; i < nf && c.channelBuilder.HasPendingFrame(); i++ {
frame := c.channelBuilder.NextFrame()
txdata.frames = append(txdata.frames, frame)
}
Expand All @@ -151,7 +148,7 @@ func (c *channel) NextTxData() txData {
func (c *channel) HasTxData() bool {
if c.IsFull() || // If the channel is full, we should start to submit it
!c.cfg.UseBlobs { // If using calldata, we only send one frame per tx
return c.channelBuilder.HasFrame()
return c.channelBuilder.HasPendingFrame()
}
// Collect enough frames if channel is not full yet
return c.channelBuilder.PendingFrames() >= int(c.cfg.MaxFramesPerTx())
Expand Down
61 changes: 34 additions & 27 deletions op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/queue"
"github.com/ethereum/go-ethereum/core/types"
)

Expand Down Expand Up @@ -65,7 +66,7 @@ type ChannelBuilder struct {
// current channel
co derive.ChannelOut
// list of blocks in the channel. Saved in case the channel must be rebuilt
blocks []*types.Block
blocks queue.Queue[*types.Block]
// latestL1Origin is the latest L1 origin of all the L2 blocks that have been added to the channel
latestL1Origin eth.BlockID
// oldestL1Origin is the oldest L1 origin of all the L2 blocks that have been added to the channel
Expand All @@ -75,7 +76,12 @@ type ChannelBuilder struct {
// oldestL2 is the oldest L2 block of all the L2 blocks that have been added to the channel
oldestL2 eth.BlockID
// frames data queue, to be sent as txs
frames []frameData
frames queue.Queue[frameData]
// frameCursor tracks which frames in the queue were submitted
// frames[frameCursor] is the next unsubmitted (pending) frame
// frameCursor = len(frames) is reserved for when
// there are no pending (next unsubmitted) frames
frameCursor int
// total frames counter
numFrames int
// total amount of output data of all frames created yet
Expand Down Expand Up @@ -190,7 +196,7 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro
return l1info, fmt.Errorf("adding block to channel out: %w", err)
}

c.blocks = append(c.blocks, block)
c.blocks.Enqueue(block)
c.updateSwTimeout(l1info.Number)

if l1info.Number > c.latestL1Origin.Number {
Expand Down Expand Up @@ -312,11 +318,11 @@ func (c *ChannelBuilder) setFullErr(err error) {
}

// OutputFrames creates new frames with the channel out. It should be called
// after AddBlock and before iterating over available frames with HasFrame and
// after AddBlock and before iterating over pending frames with HasPendingFrame and
// NextFrame.
//
// If the channel isn't full yet, it will conservatively only
// pull readily available frames from the compression output.
// pull pending frames from the compression output.
// If it is full, the channel is closed and all remaining
// frames will be created, possibly with a small leftover frame.
func (c *ChannelBuilder) OutputFrames() error {
Expand Down Expand Up @@ -387,7 +393,7 @@ func (c *ChannelBuilder) outputFrame() error {
id: frameID{chID: c.co.ID(), frameNumber: fn},
data: buf.Bytes(),
}
c.frames = append(c.frames, frame)
c.frames.Enqueue(frame)
c.numFrames++
c.outputBytes += len(frame.data)
return err // possibly io.EOF (last frame)
Expand All @@ -402,46 +408,47 @@ func (c *ChannelBuilder) Close() {
}

// TotalFrames returns the total number of frames that were created in this channel so far.
// It does not decrease when the frames queue is being emptied.
func (c *ChannelBuilder) TotalFrames() int {
return c.numFrames
}

// HasFrame returns whether there's any available frame. If true, it can be
// popped using NextFrame().
// HasPendingFrame returns whether there's any pending frame. If true, it can be
// dequeued using NextFrame().
//
// Call OutputFrames before to create new frames from the channel out
// compression pipeline.
func (c *ChannelBuilder) HasFrame() bool {
return len(c.frames) > 0
func (c *ChannelBuilder) HasPendingFrame() bool {
return c.frameCursor < c.frames.Len()
}

// PendingFrames returns the number of pending frames in the frames queue.
// It is larger zero iff HasFrame() returns true.
// It is larger than zero iff HasPendingFrame() returns true.
func (c *ChannelBuilder) PendingFrames() int {
return len(c.frames)
return c.frames.Len() - c.frameCursor
}

// NextFrame dequeues the next available frame.
// HasFrame must be called prior to check if there's a next frame available.
// NextFrame returns the next pending frame and increments the frameCursor.
// HasPendingFrame must be called beforehand to check whether a next pending frame exists.
// Panics if called when there's no next frame.
func (c *ChannelBuilder) NextFrame() frameData {
if len(c.frames) == 0 {
if len(c.frames) <= c.frameCursor {
panic("no next frame")
}

f := c.frames[0]
c.frames = c.frames[1:]
f := c.frames[c.frameCursor]
c.frameCursor++
return f
}

// PushFrames adds the frames back to the internal frames queue. Panics if not of
// the same channel.
func (c *ChannelBuilder) PushFrames(frames ...frameData) {
for _, f := range frames {
if f.id.chID != c.ID() {
panic("wrong channel")
}
c.frames = append(c.frames, f)
// RewindFrameCursor moves the frameCursor back to point at the supplied frame,
// but only if the cursor is currently ahead of that frame (it never moves forward).
// Panics if the frame is not in this channel.
func (c *ChannelBuilder) RewindFrameCursor(frame frameData) {
if c.frames.Len() <= int(frame.id.frameNumber) ||
len(c.frames[frame.id.frameNumber].data) != len(frame.data) ||
c.frames[frame.id.frameNumber].id.chID != frame.id.chID {
panic("cannot rewind to unknown frame")
}
if c.frameCursor > int(frame.id.frameNumber) {
c.frameCursor = int(frame.id.frameNumber)
}
}
21 changes: 11 additions & 10 deletions op-batcher/batcher/channel_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func TestChannelBuilderBatchType(t *testing.T) {
{"ChannelBuilder_PendingFrames_TotalFrames", ChannelBuilder_PendingFrames_TotalFrames},
{"ChannelBuilder_InputBytes", ChannelBuilder_InputBytes},
{"ChannelBuilder_OutputBytes", ChannelBuilder_OutputBytes},
{"ChannelBuilder_OutputWrongFramePanic", ChannelBuilder_OutputWrongFramePanic},
}
for _, test := range tests {
test := test
Expand Down Expand Up @@ -340,7 +341,7 @@ func TestChannelBuilder_NextFrame(t *testing.T) {
},
data: expectedBytes,
}
cb.PushFrames(frameData)
cb.frames = append(cb.frames, frameData)

// There should only be 1 frame in the channel builder
require.Equal(t, 1, cb.PendingFrames())
Expand All @@ -355,7 +356,7 @@ func TestChannelBuilder_NextFrame(t *testing.T) {
require.PanicsWithValue(t, "no next frame", func() { cb.NextFrame() })
}

// TestChannelBuilder_OutputWrongFramePanic tests that a panic is thrown when a frame is pushed with an invalid frame id
// ChannelBuilder_OutputWrongFramePanic tests that a panic is thrown when we try to rewind the cursor with an invalid frame id
func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) {
channelConfig := defaultTestChannelConfig()
channelConfig.BatchType = batchType
Expand All @@ -377,15 +378,15 @@ func ChannelBuilder_OutputWrongFramePanic(t *testing.T, batchType uint) {

// The cursor rewind should panic since we constructed a new channel out
// so the channel out id won't match
require.PanicsWithValue(t, "wrong channel", func() {
require.PanicsWithValue(t, "cannot rewind to unknown frame", func() {
frame := frameData{
id: frameID{
chID: co.ID(),
frameNumber: fn,
},
data: buf.Bytes(),
}
cb.PushFrames(frame)
cb.RewindFrameCursor(frame)
})
}

Expand Down Expand Up @@ -625,11 +626,11 @@ func TestChannelBuilder_FullShadowCompressor(t *testing.T) {

require.NoError(cb.OutputFrames())

require.True(cb.HasFrame())
require.True(cb.HasPendingFrame())
f := cb.NextFrame()
require.Less(len(f.data), int(cfg.MaxFrameSize)) // would fail without fix, full frame

require.False(cb.HasFrame(), "no leftover frame expected") // would fail without fix
require.False(cb.HasPendingFrame(), "no leftover frame expected") // would fail without fix
}

func ChannelBuilder_AddBlock(t *testing.T, batchType uint) {
Expand All @@ -656,8 +657,8 @@ func ChannelBuilder_AddBlock(t *testing.T, batchType uint) {
expectedInputBytes = 47
}
require.Equal(t, expectedInputBytes, cb.co.InputBytes())
require.Equal(t, 1, len(cb.blocks))
require.Equal(t, 0, len(cb.frames))
require.Equal(t, 1, cb.blocks.Len())
require.Equal(t, 0, cb.frames.Len())
require.True(t, cb.IsFull())

// Since the channel output is full, the next call to AddBlock
Expand Down Expand Up @@ -858,7 +859,7 @@ func ChannelBuilder_PendingFrames_TotalFrames(t *testing.T, batchType uint) {

// empty queue
for pf := nf - 1; pf >= 0; pf-- {
require.True(cb.HasFrame())
require.True(cb.HasPendingFrame())
_ = cb.NextFrame()
require.Equal(cb.PendingFrames(), pf)
require.Equal(cb.TotalFrames(), nf)
Expand Down Expand Up @@ -932,7 +933,7 @@ func ChannelBuilder_OutputBytes(t *testing.T, batchType uint) {
require.Greater(cb.PendingFrames(), 1)

var flen int
for cb.HasFrame() {
for cb.HasPendingFrame() {
f := cb.NextFrame()
flen += len(f.data)
}
Expand Down
Loading

0 comments on commit c91fe2f

Please sign in to comment.