op-batcher: Move decision about data availability type to channel submission time (#12002)

* tidy up godoc

* move data availability config decision to channel submission time

instead of channel creation time

Also, cache the ChannelConfig whenever we switch DA type so it is used by default for new channels (a sketch of this decision flow follows the commit message below)

* fix test

* formatting changes

* respond to PR comments

* add unit test for Requeue method

* reduce number of txs in test block

* improve test (more blocks in queue)

* hoist pending tx management up

* wip

* tidy up test

* wip

* fix

* refactor to do requeue before calling nextTxData

* introduce ErrInsufficientData

do not return nextTxData from channel which was discarded by requeue

* run test until nonzero data is returned by TxData

* break up and improve error logic

* fix test to anticipate ErrInsufficientData

* after requeuing, call nextTxData again

* remove unnecessary checks

* move err declaration to top of file

* add some comments and whitespace

* hoist lock back up to TxData

* rename variable to blocksToRequeue

* remove panic

* add comment

* use deterministic RNG and NoneCompressor in test

* test: increase block size to fill channel more quickly

* remove ErrInsufficientData

replace with io.EOF as before

* tidy up

* typo
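
As a quick orientation before the diff, here is a minimal, hypothetical Go sketch of the submission-time decision flow this commit introduces. The type and field names are simplified stand-ins, not the real channelManager API; the actual logic lives in channel_manager.go below.

package main

import "fmt"

// config and manager are simplified stand-ins for ChannelConfig and channelManager.
type config struct{ UseBlobs bool }

type manager struct {
    defaultCfg config
    provider   func() config // stands in for ChannelConfigProvider.ChannelConfig
}

// decide reassesses the DA type right before a fresh channel starts submitting
// and caches the new config as the default for subsequent channels.
func (m *manager) decide(channelAlreadySubmitting bool) bool {
    if channelAlreadySubmitting {
        return false // never switch a channel that is already partly on L1
    }
    newCfg := m.provider()
    if newCfg.UseBlobs == m.defaultCfg.UseBlobs {
        return false // the cheaper DA type has not changed
    }
    m.defaultCfg = newCfg // cache: new channels pick this up by default
    return true           // the caller would now requeue blocks into a new channel
}

func main() {
    m := &manager{
        defaultCfg: config{UseBlobs: true},
        provider:   func() config { return config{UseBlobs: false} },
    }
    fmt.Println(m.decide(true))        // false: channel already submitting
    fmt.Println(m.decide(false))       // true: calldata became cheaper, switch
    fmt.Println(m.defaultCfg.UseBlobs) // false: cached for future channels
}
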
geoknee authored Sep 25, 2024
1 parent 4f1e8a7 commit 106993f
Showing 9 changed files with 283 additions and 29 deletions.
11 changes: 6 additions & 5 deletions op-batcher/batcher/channel.go
@@ -155,9 +155,9 @@ func (s *channel) ID() derive.ChannelID {
return s.channelBuilder.ID()
}

// NextTxData returns the next tx data packet.
// If cfg.MultiFrameTxs is false, it returns txData with a single frame.
// If cfg.MultiFrameTxs is true, it will read frames from its channel builder
// NextTxData dequeues the next frames from the channel and returns them encoded in a tx data packet.
// If cfg.UseBlobs is false, it returns txData with a single frame.
// If cfg.UseBlobs is true, it will read frames from its channel builder
// until it either doesn't have more frames or the target number of frames is reached.
//
// NextTxData should only be called after HasTxData returned true.
@@ -177,10 +177,11 @@ func (s *channel) NextTxData() txData {
}

func (s *channel) HasTxData() bool {
if s.IsFull() || !s.cfg.UseBlobs {
if s.IsFull() || // If the channel is full, we should start to submit it
!s.cfg.UseBlobs { // If using calldata, we only send one frame per tx
return s.channelBuilder.HasFrame()
}
// collect enough frames if channel is not full yet
// Collect enough frames if channel is not full yet
return s.channelBuilder.PendingFrames() >= int(s.cfg.MaxFramesPerTx())
}

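For orientation, here is a minimal sketch of the frame-batching rule that HasTxData now documents. The parameters are simplified stand-ins rather than the real channel and ChannelConfig types.

package main

import "fmt"

// hasTxData mirrors the rule above: full channels and calldata channels submit
// frame by frame, blob channels wait for a full batch of frames.
func hasTxData(isFull, useBlobs bool, pendingFrames, maxFramesPerTx int) bool {
    if isFull || !useBlobs {
        return pendingFrames > 0
    }
    return pendingFrames >= maxFramesPerTx
}

func main() {
    fmt.Println(hasTxData(false, true, 3, 6))  // false: keep filling the channel
    fmt.Println(hasTxData(true, true, 3, 6))   // true: channel is full, start submitting
    fmt.Println(hasTxData(false, false, 1, 6)) // true: calldata sends one frame per tx
}
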
4 changes: 2 additions & 2 deletions op-batcher/batcher/channel_builder.go
@@ -417,12 +417,12 @@ func (c *ChannelBuilder) HasFrame() bool {
}

// PendingFrames returns the number of pending frames in the frames queue.
// It is larger zero iff HasFrames() returns true.
// It is larger zero iff HasFrame() returns true.
func (c *ChannelBuilder) PendingFrames() int {
return len(c.frames)
}

// NextFrame returns the next available frame.
// NextFrame dequeues the next available frame.
// HasFrame must be called prior to check if there's a next frame available.
// Panics if called when there's no next frame.
func (c *ChannelBuilder) NextFrame() frameData {
4 changes: 2 additions & 2 deletions op-batcher/batcher/channel_config.go
@@ -51,8 +51,8 @@ type ChannelConfig struct {
UseBlobs bool
}

// ChannelConfig returns a copy of itself. This makes a ChannelConfig a static
// ChannelConfigProvider of itself.
// ChannelConfig returns a copy of the receiver.
// This allows the receiver to be a static ChannelConfigProvider of itself.
func (cc ChannelConfig) ChannelConfig() ChannelConfig {
return cc
}
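The reworded godoc describes a small Go pattern worth spelling out: a value type can act as a static provider of itself by returning a copy from the provider method. A minimal, hypothetical sketch, where Config and Provider are simplified stand-ins for ChannelConfig and ChannelConfigProvider:

package main

import "fmt"

type Config struct{ UseBlobs bool }

// Provider stands in for ChannelConfigProvider.
type Provider interface {
    ChannelConfig() Config
}

// ChannelConfig returns a copy of the receiver, so every Config value is a
// static Provider of itself.
func (c Config) ChannelConfig() Config { return c }

func main() {
    var p Provider = Config{UseBlobs: true} // a plain config used where a provider is expected
    fmt.Println(p.ChannelConfig().UseBlobs) // true
}
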
4 changes: 4 additions & 0 deletions op-batcher/batcher/channel_config_provider.go
@@ -48,6 +48,10 @@ func NewDynamicEthChannelConfig(lgr log.Logger,
return dec
}

// ChannelConfig will perform an estimate of the cost per byte for
// calldata and for blobs, given current market conditions: it will return
// the appropriate ChannelConfig depending on which is cheaper. It makes
// assumptions about the typical makeup of channel data.
func (dec *DynamicEthChannelConfig) ChannelConfig() ChannelConfig {
ctx, cancel := context.WithTimeout(context.Background(), dec.timeout)
defer cancel()
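As a rough illustration of the calldata-versus-blobs estimate this godoc describes, here is a back-of-the-envelope sketch. The constants and helper name are assumptions (roughly 16 gas per calldata byte, roughly 1 blob gas per blob byte); the real ChannelConfig method performs a fuller gas estimate under its own assumptions about channel data.

package main

import "fmt"

// cheaperToUseBlobs compares an approximate cost per byte posted as calldata
// against a byte posted in a blob. Fees are in wei per unit of gas.
func cheaperToUseBlobs(baseFee, blobBaseFee uint64) bool {
    calldataCostPerByte := 16 * baseFee // ~16 gas per non-zero calldata byte
    blobCostPerByte := blobBaseFee      // ~1 blob gas per blob byte
    return blobCostPerByte < calldataCostPerByte
}

func main() {
    fmt.Println(cheaperToUseBlobs(30_000_000_000, 1))              // true: blobs are far cheaper here
    fmt.Println(cheaperToUseBlobs(1_000_000_000, 200_000_000_000)) // false: blob fee spike, calldata wins
}
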
103 changes: 89 additions & 14 deletions op-batcher/batcher/channel_manager.go
@@ -35,6 +35,8 @@ type channelManager struct {
blocks []*types.Block
// The latest L1 block from all the L2 blocks in the most recently closed channel
l1OriginLastClosedChannel eth.BlockID
// The default ChannelConfig to use for the next channel
defaultCfg ChannelConfig
// last block hash - for reorg detection
tip common.Hash

@@ -54,6 +56,7 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider Channe
log: log,
metr: metr,
cfgProvider: cfgProvider,
defaultCfg: cfgProvider.ChannelConfig(),
rollupCfg: rollupCfg,
txChannels: make(map[string]*channel),
}
@@ -133,7 +136,8 @@ func (s *channelManager) removePendingChannel(channel *channel) {
s.channelQueue = append(s.channelQueue[:index], s.channelQueue[index+1:]...)
}

// nextTxData pops off s.datas & handles updating the internal state
// nextTxData dequeues frames from the channel and returns them encoded in a transaction.
// It also updates the internal tx -> channels mapping
func (s *channelManager) nextTxData(channel *channel) (txData, error) {
if channel == nil || !channel.HasTxData() {
s.log.Trace("no next tx data")
@@ -146,12 +150,51 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {

// TxData returns the next tx data that should be submitted to L1.
//
// If the pending channel is
// If the current channel is
// full, it only returns the remaining frames of this channel until it got
// successfully fully sent to L1. It returns io.EOF if there's no pending tx data.
//
// It will decide whether to switch DA type automatically.
// When switching DA type, the channelManager state will be rebuilt
// with a new ChannelConfig.
func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
s.mu.Lock()
defer s.mu.Unlock()
channel, err := s.getReadyChannel(l1Head)
if err != nil {
return emptyTxData, err
}
// If the channel has already started being submitted,
// return now and ensure no requeueing happens
if !channel.NoneSubmitted() {
return s.nextTxData(channel)
}

// Call provider method to reassess optimal DA type
newCfg := s.cfgProvider.ChannelConfig()

// No change:
if newCfg.UseBlobs == s.defaultCfg.UseBlobs {
s.log.Debug("Recomputing optimal ChannelConfig: no need to switch DA type",
"useBlobs", s.defaultCfg.UseBlobs)
return s.nextTxData(channel)
}

// Change:
s.log.Info("Recomputing optimal ChannelConfig: changing DA type and requeing blocks...",
"useBlobsBefore", s.defaultCfg.UseBlobs,
"useBlobsAfter", newCfg.UseBlobs)
s.Requeue(newCfg)
channel, err = s.getReadyChannel(l1Head)
if err != nil {
return emptyTxData, err
}
return s.nextTxData(channel)
}

// getReadyChannel returns the next channel ready to submit data, or an error.
// It adds blocks from the block queue to the current channel and generates frames for it.
func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) {
var firstWithTxData *channel
for _, ch := range s.channelQueue {
if ch.HasTxData() {
@@ -160,27 +203,31 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
}
}

dataPending := firstWithTxData != nil && firstWithTxData.HasTxData()
dataPending := firstWithTxData != nil
s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks))

// Short circuit if there is pending tx data or the channel manager is closed.
if dataPending || s.closed {
return s.nextTxData(firstWithTxData)
// Short circuit if there is pending tx data or the channel manager is closed
if dataPending {
return firstWithTxData, nil
}

if s.closed {
return nil, io.EOF
}

// No pending tx data, so we have to add new blocks to the channel

// If we have no saved blocks, we will not be able to create valid frames
if len(s.blocks) == 0 {
return txData{}, io.EOF
return nil, io.EOF
}

if err := s.ensureChannelWithSpace(l1Head); err != nil {
return txData{}, err
return nil, err
}

if err := s.processBlocks(); err != nil {
return txData{}, err
return nil, err
}

// Register current L1 head only after all pending blocks have been
@@ -189,10 +236,10 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
s.registerL1Block(l1Head)

if err := s.outputFrames(); err != nil {
return txData{}, err
return nil, err
}

return s.nextTxData(s.currentChannel)
return s.currentChannel, nil
}

// ensureChannelWithSpace ensures currentChannel is populated with a channel that has
Expand All @@ -203,7 +250,10 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil
}

cfg := s.cfgProvider.ChannelConfig()
// We reuse the ChannelConfig from the last channel.
// This will be reassessed at channel submission-time,
// but this is our best guess at the appropriate values for now.
cfg := s.defaultCfg
pc, err := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
if err != nil {
return fmt.Errorf("creating new channel: %w", err)
@@ -228,7 +278,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil
}

// registerL1Block registers the given block at the pending channel.
// registerL1Block registers the given block at the current channel.
func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
s.currentChannel.CheckTimeout(l1Head.Number)
s.log.Debug("new L1-block registered at channel builder",
Expand All @@ -238,7 +288,7 @@ func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
)
}

// processBlocks adds blocks from the blocks queue to the pending channel until
// processBlocks adds blocks from the blocks queue to the current channel until
// either the queue got exhausted or the channel is full.
func (s *channelManager) processBlocks() error {
var (
@@ -288,6 +338,7 @@ func (s *channelManager) processBlocks() error {
return nil
}

// outputFrames generates frames for the current channel, and computes and logs the compression ratio
func (s *channelManager) outputFrames() error {
if err := s.currentChannel.OutputFrames(); err != nil {
return fmt.Errorf("creating frames with channel builder: %w", err)
@@ -339,6 +390,7 @@ func (s *channelManager) outputFrames() error {
func (s *channelManager) AddL2Block(block *types.Block) error {
s.mu.Lock()
defer s.mu.Unlock()

if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
return ErrReorg
}
@@ -414,3 +466,26 @@ func (s *channelManager) Close() error {
}
return nil
}

// Requeue rebuilds the channel manager state by
// rewinding blocks back from the channel queue, and setting the defaultCfg.
func (s *channelManager) Requeue(newCfg ChannelConfig) {
newChannelQueue := []*channel{}
blocksToRequeue := []*types.Block{}
for _, channel := range s.channelQueue {
if !channel.NoneSubmitted() {
newChannelQueue = append(newChannelQueue, channel)
continue
}
blocksToRequeue = append(blocksToRequeue, channel.channelBuilder.Blocks()...)
}

// We put the blocks back at the front of the queue:
s.blocks = append(blocksToRequeue, s.blocks...)
// Channels which were already being submitted are put back
s.channelQueue = newChannelQueue
s.currentChannel = nil
// Setting the defaultCfg will cause new channels
// to pick up the new ChannelConfig
s.defaultCfg = newCfg
}
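
To summarize the new Requeue method in isolation, here is a small, self-contained sketch of the same rewind pattern using hypothetical stand-in types (fakeChannel is not the real channel type).

package main

import "fmt"

// fakeChannel stands in for the batcher's channel: it remembers the blocks it
// was built from and whether any of its frames have been submitted.
type fakeChannel struct {
    blocks    []int // block numbers, standing in for *types.Block
    submitted bool
}

func (c *fakeChannel) NoneSubmitted() bool { return !c.submitted }

// requeue keeps channels that already started submitting and pushes the blocks
// of untouched channels back onto the front of the block queue, mirroring
// channelManager.Requeue.
func requeue(queue []*fakeChannel, blocks []int) ([]*fakeChannel, []int) {
    newQueue := []*fakeChannel{}
    blocksToRequeue := []int{}
    for _, ch := range queue {
        if !ch.NoneSubmitted() {
            newQueue = append(newQueue, ch) // in-flight channel: keep it
            continue
        }
        blocksToRequeue = append(blocksToRequeue, ch.blocks...)
    }
    return newQueue, append(blocksToRequeue, blocks...)
}

func main() {
    queue := []*fakeChannel{
        {blocks: []int{1, 2}, submitted: true},  // kept: frames already submitted
        {blocks: []int{3, 4}, submitted: false}, // rewound into the block queue
    }
    newQueue, newBlocks := requeue(queue, []int{5, 6})
    fmt.Println(len(newQueue), newBlocks) // 1 [3 4 5 6]
}

Keeping the in-flight channels and prepending the rewound blocks preserves block order, which is what lets the real Requeue rebuild channels under the new ChannelConfig without reordering anything.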