diff --git a/op-batcher/batcher/channel.go b/op-batcher/batcher/channel.go
index e35124d8525a..de68fa588a0a 100644
--- a/op-batcher/batcher/channel.go
+++ b/op-batcher/batcher/channel.go
@@ -155,9 +155,9 @@ func (s *channel) ID() derive.ChannelID {
 	return s.channelBuilder.ID()
 }
 
-// NextTxData returns the next tx data packet.
-// If cfg.MultiFrameTxs is false, it returns txData with a single frame.
-// If cfg.MultiFrameTxs is true, it will read frames from its channel builder
+// NextTxData dequeues the next frames from the channel and returns them encoded in a tx data packet.
+// If cfg.UseBlobs is false, it returns txData with a single frame.
+// If cfg.UseBlobs is true, it will read frames from its channel builder
 // until it either doesn't have more frames or the target number of frames is reached.
 //
 // NextTxData should only be called after HasTxData returned true.
@@ -177,10 +177,11 @@ func (s *channel) NextTxData() txData {
 }
 
 func (s *channel) HasTxData() bool {
-	if s.IsFull() || !s.cfg.UseBlobs {
+	if s.IsFull() || // If the channel is full, we should start to submit it
+		!s.cfg.UseBlobs { // If using calldata, we only send one frame per tx
 		return s.channelBuilder.HasFrame()
 	}
-	// collect enough frames if channel is not full yet
+	// Collect enough frames if channel is not full yet
 	return s.channelBuilder.PendingFrames() >= int(s.cfg.MaxFramesPerTx())
 }
 
diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go
index cb4345e419d4..0c16f3156d9b 100644
--- a/op-batcher/batcher/channel_builder.go
+++ b/op-batcher/batcher/channel_builder.go
@@ -417,12 +417,12 @@ func (c *ChannelBuilder) HasFrame() bool {
 }
 
 // PendingFrames returns the number of pending frames in the frames queue.
-// It is larger zero iff HasFrames() returns true.
+// It is larger than zero iff HasFrame() returns true.
 func (c *ChannelBuilder) PendingFrames() int {
 	return len(c.frames)
 }
 
-// NextFrame returns the next available frame.
+// NextFrame dequeues the next available frame.
 // HasFrame must be called prior to check if there's a next frame available.
 // Panics if called when there's no next frame.
 func (c *ChannelBuilder) NextFrame() frameData {
diff --git a/op-batcher/batcher/channel_config.go b/op-batcher/batcher/channel_config.go
index 63e0d5d5deef..45dc1d4dcfa4 100644
--- a/op-batcher/batcher/channel_config.go
+++ b/op-batcher/batcher/channel_config.go
@@ -51,8 +51,8 @@ type ChannelConfig struct {
 	UseBlobs bool
 }
 
-// ChannelConfig returns a copy of itself. This makes a ChannelConfig a static
-// ChannelConfigProvider of itself.
+// ChannelConfig returns a copy of the receiver.
+// This allows the receiver to be a static ChannelConfigProvider of itself.
 func (cc ChannelConfig) ChannelConfig() ChannelConfig {
 	return cc
 }
diff --git a/op-batcher/batcher/channel_config_provider.go b/op-batcher/batcher/channel_config_provider.go
index c65e83b8289f..6cf5b0db6863 100644
--- a/op-batcher/batcher/channel_config_provider.go
+++ b/op-batcher/batcher/channel_config_provider.go
@@ -48,6 +48,10 @@ func NewDynamicEthChannelConfig(lgr log.Logger,
 	return dec
 }
 
+// ChannelConfig estimates the cost per byte of calldata and of blobs,
+// given current market conditions, and returns the appropriate
+// ChannelConfig depending on which is cheaper. It makes assumptions
+// about the typical makeup of channel data.
 func (dec *DynamicEthChannelConfig) ChannelConfig() ChannelConfig {
 	ctx, cancel := context.WithTimeout(context.Background(), dec.timeout)
 	defer cancel()
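Context for this hunk: both the static ChannelConfig above and DynamicEthChannelConfig satisfy the same provider abstraction. The sketch below shows the interface shape (inferred from how cfgProvider.ChannelConfig() is called in channel_manager.go and from the test fake further down, which is described as "a ChannelConfigProvider") plus a simplified stand-in for the elided cost comparison; chooseCheaper and its cost parameters are illustrative, not the real implementation.

```go
import "math/big"

// ChannelConfigProvider as assumed by this diff: anything that can hand
// out a ChannelConfig on demand.
type ChannelConfigProvider interface {
	ChannelConfig() ChannelConfig
}

// chooseCheaper is a hypothetical distillation of the decision the doc
// comment above describes: given estimated total costs for posting the
// same data as calldata vs. as blobs, return the cheaper preconfigured
// ChannelConfig. The blobConfig/calldataConfig fields exist on
// DynamicEthChannelConfig (the test fake below reads them directly).
func chooseCheaper(dec *DynamicEthChannelConfig, calldataCost, blobCost *big.Int) ChannelConfig {
	if blobCost.Cmp(calldataCost) < 0 {
		return dec.blobConfig
	}
	return dec.calldataConfig
}
```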
diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go
index 1f22565c94c5..3bfff303db4b 100644
--- a/op-batcher/batcher/channel_manager.go
+++ b/op-batcher/batcher/channel_manager.go
@@ -35,6 +35,8 @@ type channelManager struct {
 	blocks []*types.Block
 	// The latest L1 block from all the L2 blocks in the most recently closed channel
 	l1OriginLastClosedChannel eth.BlockID
+	// The default ChannelConfig to use for the next channel
+	defaultCfg ChannelConfig
 	// last block hash - for reorg detection
 	tip common.Hash
@@ -54,6 +56,7 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider Channe
 		log:         log,
 		metr:        metr,
 		cfgProvider: cfgProvider,
+		defaultCfg:  cfgProvider.ChannelConfig(),
 		rollupCfg:   rollupCfg,
 		txChannels:  make(map[string]*channel),
 	}
@@ -133,7 +136,8 @@ func (s *channelManager) removePendingChannel(channel *channel) {
 	s.channelQueue = append(s.channelQueue[:index], s.channelQueue[index+1:]...)
 }
 
-// nextTxData pops off s.datas & handles updating the internal state
+// nextTxData dequeues frames from the channel and returns them encoded in a transaction.
+// It also updates the internal tx -> channels mapping.
 func (s *channelManager) nextTxData(channel *channel) (txData, error) {
 	if channel == nil || !channel.HasTxData() {
 		s.log.Trace("no next tx data")
@@ -146,12 +150,51 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
 }
 
 // TxData returns the next tx data that should be submitted to L1.
 //
-// If the pending channel is
+// If the current channel is
 // full, it only returns the remaining frames of this channel until it got
 // successfully fully sent to L1. It returns io.EOF if there's no pending tx data.
+//
+// It will decide whether to switch DA type automatically.
+// When switching DA type, the channelManager state will be rebuilt
+// with a new ChannelConfig.
 func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+	channel, err := s.getReadyChannel(l1Head)
+	if err != nil {
+		return emptyTxData, err
+	}
+	// If the channel has already started being submitted,
+	// return now and ensure no requeueing happens
+	if !channel.NoneSubmitted() {
+		return s.nextTxData(channel)
+	}
+
+	// Call provider method to reassess optimal DA type
+	newCfg := s.cfgProvider.ChannelConfig()
+
+	// No change:
+	if newCfg.UseBlobs == s.defaultCfg.UseBlobs {
+		s.log.Debug("Recomputing optimal ChannelConfig: no need to switch DA type",
+			"useBlobs", s.defaultCfg.UseBlobs)
+		return s.nextTxData(channel)
+	}
+
+	// Change:
+	s.log.Info("Recomputing optimal ChannelConfig: changing DA type and requeueing blocks...",
+		"useBlobsBefore", s.defaultCfg.UseBlobs,
+		"useBlobsAfter", newCfg.UseBlobs)
+	s.Requeue(newCfg)
+	channel, err = s.getReadyChannel(l1Head)
+	if err != nil {
+		return emptyTxData, err
+	}
+	return s.nextTxData(channel)
+}
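From a caller's perspective the contract of TxData is unchanged: poll until io.EOF, submitting whatever comes back. A minimal sketch of that driving loop, modelled on the loop in TestChannelManager_TxData below (drainTxData and publish are hypothetical names, not part of this diff; imports errors and io assumed):

```go
// drainTxData drives the channel manager until it has nothing ready to
// submit. publish is a hypothetical stand-in for the driver's real
// tx-submission path.
func drainTxData(m *channelManager, l1Head eth.BlockID, publish func(txData)) error {
	for {
		data, err := m.TxData(l1Head)
		if errors.Is(err, io.EOF) {
			return nil // nothing ready; wait for more L2 blocks
		}
		if err != nil {
			return err
		}
		// data.asBlob reflects the DA type chosen at submission time.
		publish(data)
	}
}
```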
+
+// getReadyChannel returns the next channel ready to submit data, or an error.
+// It adds blocks from the block queue to the current channel and generates frames for it.
+func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) {
 	var firstWithTxData *channel
 	for _, ch := range s.channelQueue {
 		if ch.HasTxData() {
@@ -160,27 +203,31 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 		}
 	}
 
-	dataPending := firstWithTxData != nil && firstWithTxData.HasTxData()
+	dataPending := firstWithTxData != nil
 	s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks))
 
-	// Short circuit if there is pending tx data or the channel manager is closed.
-	if dataPending || s.closed {
-		return s.nextTxData(firstWithTxData)
+	// Short circuit if there is pending tx data or the channel manager is closed
+	if dataPending {
+		return firstWithTxData, nil
+	}
+
+	if s.closed {
+		return nil, io.EOF
 	}
 
 	// No pending tx data, so we have to add new blocks to the channel
 
 	// If we have no saved blocks, we will not be able to create valid frames
 	if len(s.blocks) == 0 {
-		return txData{}, io.EOF
+		return nil, io.EOF
 	}
 
 	if err := s.ensureChannelWithSpace(l1Head); err != nil {
-		return txData{}, err
+		return nil, err
 	}
 
 	if err := s.processBlocks(); err != nil {
-		return txData{}, err
+		return nil, err
 	}
 
 	// Register current L1 head only after all pending blocks have been
@@ -189,10 +236,10 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
 	s.registerL1Block(l1Head)
 
 	if err := s.outputFrames(); err != nil {
-		return txData{}, err
+		return nil, err
 	}
 
-	return s.nextTxData(s.currentChannel)
+	return s.currentChannel, nil
 }
 
 // ensureChannelWithSpace ensures currentChannel is populated with a channel that has
@@ -203,7 +250,10 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
 		return nil
 	}
 
-	cfg := s.cfgProvider.ChannelConfig()
+	// We reuse the ChannelConfig from the last channel.
+	// This will be reassessed at channel submission time,
+	// but this is our best guess at the appropriate values for now.
+	cfg := s.defaultCfg
 	pc, err := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
 	if err != nil {
 		return fmt.Errorf("creating new channel: %w", err)
@@ -228,7 +278,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
 	return nil
 }
 
-// registerL1Block registers the given block at the pending channel.
+// registerL1Block registers the given block at the current channel.
 func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
 	s.currentChannel.CheckTimeout(l1Head.Number)
 	s.log.Debug("new L1-block registered at channel builder",
@@ -238,7 +288,7 @@ func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
 	)
 }
 
-// processBlocks adds blocks from the blocks queue to the pending channel until
+// processBlocks adds blocks from the blocks queue to the current channel until
 // either the queue got exhausted or the channel is full.
 func (s *channelManager) processBlocks() error {
 	var (
@@ -288,6 +338,7 @@ func (s *channelManager) processBlocks() error {
 	return nil
 }
 
+// outputFrames generates frames for the current channel, and computes and logs the compression ratio.
 func (s *channelManager) outputFrames() error {
 	if err := s.currentChannel.OutputFrames(); err != nil {
 		return fmt.Errorf("creating frames with channel builder: %w", err)
@@ -339,6 +390,7 @@ func (s *channelManager) outputFrames() error {
 func (s *channelManager) AddL2Block(block *types.Block) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+
 	if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
 		return ErrReorg
 	}
@@ -414,3 +466,26 @@ func (s *channelManager) Close() error {
 	}
 	return nil
 }
+
+// Requeue rebuilds the channel manager state by moving blocks from unsubmitted
+// channels back into the block queue, and sets the defaultCfg.
+func (s *channelManager) Requeue(newCfg ChannelConfig) {
+	newChannelQueue := []*channel{}
+	blocksToRequeue := []*types.Block{}
+	for _, channel := range s.channelQueue {
+		if !channel.NoneSubmitted() {
+			newChannelQueue = append(newChannelQueue, channel)
+			continue
+		}
+		blocksToRequeue = append(blocksToRequeue, channel.channelBuilder.Blocks()...)
+	}
+
+	// We put the blocks back at the front of the queue:
+	s.blocks = append(blocksToRequeue, s.blocks...)
+	// Channels which were already being submitted are kept in the queue
+	s.channelQueue = newChannelQueue
+	s.currentChannel = nil
+	// Setting the defaultCfg will cause new channels
+	// to pick up the new ChannelConfig.
+	s.defaultCfg = newCfg
+}
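To make the Requeue semantics concrete, here is a worked example (illustrative channel and block names, not taken from the code): only channels that have not started submitting are unwound, and their blocks rejoin the front of the block queue in their original order.

```go
// Before the DA type flips from calldata to blobs:
//
//	channelQueue = [ch0, ch1]  // ch0 has a pending tx; ch1 is unsubmitted,
//	                           // built from blocks [b3, b4]
//	blocks       = [b5]
//
// After s.Requeue(blobCfg):
//
//	channelQueue   = [ch0]          // ch0 keeps the config it was built with
//	blocks         = [b3, b4, b5]   // ch1's blocks rejoin the queue, in order
//	currentChannel = nil            // forces a fresh channel on the next pass
//	defaultCfg     = blobCfg        // future channels will target blobs
```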
diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go
index a6271df9a535..5df5feacf4bf 100644
--- a/op-batcher/batcher/channel_manager_test.go
+++ b/op-batcher/batcher/channel_manager_test.go
@@ -1,6 +1,7 @@
 package batcher
 
 import (
+	"errors"
 	"io"
 	"math/big"
 	"math/rand"
@@ -483,3 +484,174 @@ func TestChannelManager_ChannelCreation(t *testing.T) {
 		})
 	}
 }
+
+// FakeDynamicEthChannelConfig is a ChannelConfigProvider which always returns
+// either a blob- or calldata-based config depending on its internal chooseBlobs
+// switch.
+type FakeDynamicEthChannelConfig struct {
+	DynamicEthChannelConfig
+	chooseBlobs bool
+}
+
+func (f *FakeDynamicEthChannelConfig) ChannelConfig() ChannelConfig {
+	if f.chooseBlobs {
+		return f.blobConfig
+	}
+	return f.calldataConfig
+}
+
+func newFakeDynamicEthChannelConfig(lgr log.Logger,
+	reqTimeout time.Duration) *FakeDynamicEthChannelConfig {
+
+	calldataCfg := ChannelConfig{
+		MaxFrameSize:    120_000 - 1,
+		TargetNumFrames: 1,
+	}
+	blobCfg := ChannelConfig{
+		MaxFrameSize:    eth.MaxBlobDataSize - 1,
+		TargetNumFrames: 3, // gets closest to amortized fixed tx costs
+		UseBlobs:        true,
+	}
+	calldataCfg.InitNoneCompressor()
+	blobCfg.InitNoneCompressor()
+
+	return &FakeDynamicEthChannelConfig{
+		chooseBlobs: false,
+		DynamicEthChannelConfig: *NewDynamicEthChannelConfig(
+			lgr,
+			reqTimeout,
+			&mockGasPricer{},
+			blobCfg,
+			calldataCfg),
+	}
+}
+
+// TestChannelManager_TxData seeds the channel manager with blocks and triggers the
+// blocks->channels pipeline multiple times. Values are chosen such that a channel
+// is created under one set of market conditions, and then submitted under a different
+// set of market conditions. The test asserts that the DA type is changed at channel
+// submission time.
+func TestChannelManager_TxData(t *testing.T) {
+	type TestCase struct {
+		name                            string
+		chooseBlobsWhenChannelCreated   bool
+		chooseBlobsWhenChannelSubmitted bool
+	}
+
+	tt := []TestCase{
+		{"blobs->blobs", true, true},
+		{"calldata->calldata", false, false},
+		{"blobs->calldata", true, false},
+		{"calldata->blobs", false, true},
+	}
+
+	for _, tc := range tt {
+		t.Run(tc.name, func(t *testing.T) {
+			l := testlog.Logger(t, log.LevelCrit)
+
+			cfg := newFakeDynamicEthChannelConfig(l, 1000)
+
+			cfg.chooseBlobs = tc.chooseBlobsWhenChannelCreated
+			m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
+			require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.UseBlobs)
+
+			// Seed channel manager with a block
+			rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+			blockA := derivetest.RandomL2BlockWithChainId(rng, 200, defaultTestRollupConfig.L2ChainID)
+			m.blocks = []*types.Block{blockA}
+
+			// Call TxData once to trigger the blocks->channels pipeline
+			_, err := m.TxData(eth.BlockID{})
+			require.ErrorIs(t, err, io.EOF)
+
+			// The test requires us to have something in the channel queue
+			// at this point, but not yet ready to send and not full
+			require.NotEmpty(t, m.channelQueue)
+			require.False(t, m.channelQueue[0].IsFull())
+
+			// Simulate updated market conditions
+			// by possibly flipping the state of the
+			// fake channel provider
+			l.Info("updating market conditions", "chooseBlobs", tc.chooseBlobsWhenChannelSubmitted)
+			cfg.chooseBlobs = tc.chooseBlobsWhenChannelSubmitted
+
+			// Add a block and call TxData until
+			// we get some data to submit
+			var data txData
+			for {
+				m.blocks = []*types.Block{blockA}
+				data, err = m.TxData(eth.BlockID{})
+				if err == nil && data.Len() > 0 {
+					break
+				}
+				if !errors.Is(err, io.EOF) {
+					require.NoError(t, err)
+				}
+			}
+
+			require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.asBlob)
+			require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.UseBlobs)
+		})
+	}
+}
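One remark on the fake above: it relies on Go struct embedding, so FakeDynamicEthChannelConfig inherits all of DynamicEthChannelConfig and shadows only the ChannelConfig method, giving the channelManager under test a deterministic provider. A compile-time assertion (a sketch, not part of this diff; interface name as used elsewhere in the package) would make that intent explicit:

```go
// Compile-time check that the fake satisfies the provider interface.
var _ ChannelConfigProvider = (*FakeDynamicEthChannelConfig)(nil)
```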
+
+// TestChannelManager_Requeue seeds the channel manager with blocks,
+// takes a state snapshot, triggers the blocks->channels pipeline,
+// and then calls Requeue. Finally, it asserts the channel manager's
+// state is equal to the snapshot. It repeats this for a channel
+// which has a pending transaction and verifies that Requeue is then
+// a noop.
+func TestChannelManager_Requeue(t *testing.T) {
+	l := testlog.Logger(t, log.LevelCrit)
+	cfg := channelManagerTestConfig(100, derive.SingularBatchType)
+	m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
+
+	// Seed channel manager with blocks
+	rng := rand.New(rand.NewSource(99))
+	blockA := derivetest.RandomL2BlockWithChainId(rng, 10, defaultTestRollupConfig.L2ChainID)
+	blockB := derivetest.RandomL2BlockWithChainId(rng, 10, defaultTestRollupConfig.L2ChainID)
+
+	// This is the snapshot of channel manager state we want to reinstate
+	// when we requeue
+	stateSnapshot := []*types.Block{blockA, blockB}
+	m.blocks = stateSnapshot
+	require.Empty(t, m.channelQueue)
+
+	// Trigger the blocks -> channelQueue data pipelining
+	require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
+	require.NotEmpty(t, m.channelQueue)
+	require.NoError(t, m.processBlocks())
+
+	// Assert that at least one block was processed into the channel
+	require.NotContains(t, m.blocks, blockA)
+
+	// Call the function we are testing
+	m.Requeue(m.defaultCfg)
+
+	// Ensure we got back to the state above
+	require.Equal(t, m.blocks, stateSnapshot)
+	require.Empty(t, m.channelQueue)
+
+	// Trigger the blocks -> channelQueue data pipelining again
+	require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
+	require.NotEmpty(t, m.channelQueue)
+	require.NoError(t, m.processBlocks())
+
+	// Assert that at least one block was processed into the channel
+	require.NotContains(t, m.blocks, blockA)
+
+	// Now mark the 0th channel in the queue as already
+	// starting to send on chain
+	channel0 := m.channelQueue[0]
+	channel0.pendingTransactions["foo"] = txData{}
+	require.False(t, channel0.NoneSubmitted())
+
+	// Call the function we are testing
+	m.Requeue(m.defaultCfg)
+
+	// The requeue shouldn't affect the pending channel
+	require.Contains(t, m.channelQueue, channel0)
+	require.NotContains(t, m.blocks, blockA)
+}
diff --git a/op-batcher/batcher/channel_test.go b/op-batcher/batcher/channel_test.go
index 7fa8030e771e..8dec9d9e108b 100644
--- a/op-batcher/batcher/channel_test.go
+++ b/op-batcher/batcher/channel_test.go
@@ -86,8 +86,8 @@ func TestChannelManager_NextTxData(t *testing.T) {
 	require.Equal(t, txData{}, returnedTxData)
 
 	// Set the pending channel
-	// The nextTxData function should still return EOF
-	// since the pending channel has no frames
+	// The nextTxData function should still return io.EOF
+	// since the current channel has no frames
 	require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
 	channel := m.currentChannel
 	require.NotNil(t, channel)
diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go
index 0b7d36d960dd..968e6de3e71a 100644
--- a/op-batcher/batcher/driver.go
+++ b/op-batcher/batcher/driver.go
@@ -190,10 +190,11 @@ func (l *BatchSubmitter) StopBatchSubmitting(ctx context.Context) error {
 
 // loadBlocksIntoState loads all blocks since the previous stored block
 // It does the following:
-// 1. Fetch the sync status of the sequencer
-// 2. Check if the sync status is valid or if we are all the way up to date
-// 3. Check if it needs to initialize state OR it is lagging (todo: lagging just means race condition?)
-// 4. Load all new blocks into the local state.
+//  1. Fetch the sync status of the sequencer
+//  2. Check if the sync status is valid or if we are all the way up to date
+//  3. Check if it needs to initialize state OR it is lagging (todo: lagging just means race condition?)
+//  4. Load all new blocks into the local state.
+//
 // If there is a reorg, it will reset the last stored block but not clear the internal state so
 // the state can be flushed to L1.
 func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
diff --git a/op-batcher/batcher/driver_test.go b/op-batcher/batcher/driver_test.go
index 1c5ace753771..5ce0983bfe1a 100644
--- a/op-batcher/batcher/driver_test.go
+++ b/op-batcher/batcher/driver_test.go
@@ -50,6 +50,7 @@ func setup(t *testing.T) (*BatchSubmitter, *mockL2EndpointProvider) {
 			Log:              testlog.Logger(t, log.LevelDebug),
 			Metr:             metrics.NoopMetrics,
 			RollupConfig:     cfg,
+			ChannelConfig:    defaultTestChannelConfig(),
 			EndpointProvider: ep,
 		}), ep
 }
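On the driver_test.go change: NewChannelManager now calls cfgProvider.ChannelConfig() eagerly in its constructor to seed defaultCfg, so the test setup has to supply a real provider. Assuming the DriverSetup.ChannelConfig field is typed as the ChannelConfigProvider interface (only the field name is visible in this diff), leaving it unset would fail at construction time:

```go
// Sketch of the failure mode the setup change avoids (field type assumed):
var provider ChannelConfigProvider // nil interface value
_ = provider.ChannelConfig()       // panics: method call on nil interface
```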