Skip to content

Commit

Permalink
[batcher] derive.ChannelOut factory (ethereum-optimism#12344)
Browse files Browse the repository at this point in the history
* Add support for a derive.ChannelOut factory

* Add DriverSetupOption for injecting custom options into the DriverSetup

* Remove factory from NewChannelManager and NewChannelBuilder

* Add ChannelOut factory test

* Add comment about why we use a wrapper
  • Loading branch information
mdehoog authored and samlaf committed Nov 10, 2024
1 parent 8cd254e commit dd0c711
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 34 deletions.
11 changes: 3 additions & 8 deletions op-batcher/batcher/channel.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package batcher

import (
"fmt"
"math"

"github.com/ethereum-optimism/optimism/op-batcher/metrics"
Expand Down Expand Up @@ -34,20 +33,16 @@ type channel struct {
maxInclusionBlock uint64
}

func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64) (*channel, error) {
cb, err := NewChannelBuilder(cfg, rollupCfg, latestL1OriginBlockNum)
if err != nil {
return nil, fmt.Errorf("creating new channel: %w", err)
}

func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64, channelOut derive.ChannelOut) *channel {
cb := NewChannelBuilderWithChannelOut(cfg, rollupCfg, latestL1OriginBlockNum, channelOut)
return &channel{
log: log,
metr: metr,
cfg: cfg,
channelBuilder: cb,
pendingTransactions: make(map[string]txData),
confirmedTransactions: make(map[string]eth.BlockID),
}, nil
}
}

// TxFailed records a transaction as failed. It will attempt to resubmit the data
Expand Down
14 changes: 9 additions & 5 deletions op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,28 @@ type ChannelBuilder struct {
// channel out could not be created.
// it acts as a factory for either a span or singular channel out
func NewChannelBuilder(cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64) (*ChannelBuilder, error) {
co, err := newChannelOut(cfg, rollupCfg)
co, err := NewChannelOut(cfg, rollupCfg)
if err != nil {
return nil, fmt.Errorf("creating channel out: %w", err)
}

return NewChannelBuilderWithChannelOut(cfg, rollupCfg, latestL1OriginBlockNum, co), nil
}

func NewChannelBuilderWithChannelOut(cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64, channelOut derive.ChannelOut) *ChannelBuilder {
cb := &ChannelBuilder{
cfg: cfg,
rollupCfg: rollupCfg,
co: co,
co: channelOut,
}

cb.updateDurationTimeout(latestL1OriginBlockNum)

return cb, nil
return cb
}

// newChannelOut creates a new channel out based on the given configuration.
func newChannelOut(cfg ChannelConfig, rollupCfg *rollup.Config) (derive.ChannelOut, error) {
// NewChannelOut creates a new channel out based on the given configuration.
func NewChannelOut(cfg ChannelConfig, rollupCfg *rollup.Config) (derive.ChannelOut, error) {
spec := rollup.NewChainSpec(rollupCfg)
if cfg.BatchType == derive.SpanBatchType {
return derive.NewSpanChannelOut(
Expand Down
16 changes: 14 additions & 2 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

var ErrReorg = errors.New("block does not extend existing chain")

type ChannelOutFactory func(cfg ChannelConfig, rollupCfg *rollup.Config) (derive.ChannelOut, error)

// channelManager stores a contiguous set of blocks & turns them into channels.
// Upon receiving tx confirmation (or a tx failure), it does channel error handling.
//
Expand All @@ -32,6 +34,8 @@ type channelManager struct {
cfgProvider ChannelConfigProvider
rollupCfg *rollup.Config

outFactory ChannelOutFactory

// All blocks since the last request for new tx data.
blocks queue.Queue[*types.Block]
// The latest L1 block from all the L2 blocks in the most recently closed channel
Expand Down Expand Up @@ -59,10 +63,15 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider Channe
cfgProvider: cfgProvider,
defaultCfg: cfgProvider.ChannelConfig(),
rollupCfg: rollupCfg,
outFactory: NewChannelOut,
txChannels: make(map[string]*channel),
}
}

func (s *channelManager) SetChannelOutFactory(outFactory ChannelOutFactory) {
s.outFactory = outFactory
}

// Clear clears the entire state of the channel manager.
// It is intended to be used before launching op-batcher and after an L2 reorg.
func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
Expand Down Expand Up @@ -265,11 +274,14 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
// This will be reassessed at channel submission-time,
// but this is our best guess at the appropriate values for now.
cfg := s.defaultCfg
pc, err := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)

channelOut, err := s.outFactory(cfg, s.rollupCfg)
if err != nil {
return fmt.Errorf("creating new channel: %w", err)
return fmt.Errorf("creating channel out: %w", err)
}

pc := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number, channelOut)

s.currentChannel = pc
s.channelQueue = append(s.channelQueue, pc)

Expand Down
22 changes: 22 additions & 0 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,3 +668,25 @@ func TestChannelManager_Requeue(t *testing.T) {

require.NotContains(t, m.blocks, blockA)
}
func TestChannelManager_ChannelOutFactory(t *testing.T) {
type ChannelOutWrapper struct {
derive.ChannelOut
}

l := testlog.Logger(t, log.LevelCrit)
cfg := channelManagerTestConfig(100, derive.SingularBatchType)
m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
m.SetChannelOutFactory(func(cfg ChannelConfig, rollupCfg *rollup.Config) (derive.ChannelOut, error) {
co, err := NewChannelOut(cfg, rollupCfg)
if err != nil {
return nil, err
}
// return a wrapper type, to validate that the factory was correctly used by checking the type below
return &ChannelOutWrapper{
ChannelOut: co,
}, nil
})
require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))

require.IsType(t, &ChannelOutWrapper{}, m.currentChannel.channelBuilder.co)
}
13 changes: 11 additions & 2 deletions op-batcher/batcher/channel_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package batcher

import (
"fmt"
"io"
"testing"

Expand All @@ -23,6 +24,14 @@ func zeroFrameTxID(fn uint16) txID {
return txID{frameID{frameNumber: fn}}
}

func newChannelWithChannelOut(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64) (*channel, error) {
channelOut, err := NewChannelOut(cfg, rollupCfg)
if err != nil {
return nil, fmt.Errorf("creating channel out: %w", err)
}
return newChannel(log, metr, cfg, rollupCfg, latestL1OriginBlockNum, channelOut), nil
}

// TestChannelTimeout tests that the channel manager
// correctly identifies when a pending channel is timed out.
func TestChannelTimeout(t *testing.T) {
Expand Down Expand Up @@ -121,7 +130,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
require := require.New(t)
const n = 6
lgr := testlog.Logger(t, log.LevelWarn)
ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{
UseBlobs: false,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
Expand Down Expand Up @@ -162,7 +171,7 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) {
require := require.New(t)
const n = eth.MaxBlobsPerBlobTx
lgr := testlog.Logger(t, log.LevelWarn)
ch, err := newChannel(lgr, metrics.NoopMetrics, ChannelConfig{
ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{
UseBlobs: true,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
Expand Down
25 changes: 15 additions & 10 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,16 @@ type RollupClient interface {

// DriverSetup is the collection of input/output interfaces and configuration that the driver operates on.
type DriverSetup struct {
Log log.Logger
Metr metrics.Metricer
RollupConfig *rollup.Config
Config BatcherConfig
Txmgr txmgr.TxManager
L1Client L1Client
EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfigProvider
AltDA *altda.DAClient
Log log.Logger
Metr metrics.Metricer
RollupConfig *rollup.Config
Config BatcherConfig
Txmgr txmgr.TxManager
L1Client L1Client
EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfigProvider
AltDA *altda.DAClient
ChannelOutFactory ChannelOutFactory
}

// BatchSubmitter encapsulates a service responsible for submitting L2 tx
Expand Down Expand Up @@ -115,9 +116,13 @@ type BatchSubmitter struct {

// NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup
func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
state := NewChannelManager(setup.Log, setup.Metr, setup.ChannelConfig, setup.RollupConfig)
if setup.ChannelOutFactory != nil {
state.SetChannelOutFactory(setup.ChannelOutFactory)
}
return &BatchSubmitter{
DriverSetup: setup,
state: NewChannelManager(setup.Log, setup.Metr, setup.ChannelConfig, setup.RollupConfig),
state: state,
}
}

Expand Down
20 changes: 13 additions & 7 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,20 @@ type BatcherService struct {
NotSubmittingOnStart bool
}

type DriverSetupOption func(setup *DriverSetup)

// BatcherServiceFromCLIConfig creates a new BatcherService from a CLIConfig.
// The service components are fully started, except for the driver,
// which will not be submitting batches (if it was configured to) until the Start part of the lifecycle.
func BatcherServiceFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger) (*BatcherService, error) {
func BatcherServiceFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) (*BatcherService, error) {
var bs BatcherService
if err := bs.initFromCLIConfig(ctx, version, cfg, log); err != nil {
if err := bs.initFromCLIConfig(ctx, version, cfg, log, opts...); err != nil {
return nil, errors.Join(err, bs.Stop(ctx)) // try to clean up our failed initialization attempt
}
return &bs, nil
}

func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger) error {
func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger, opts ...DriverSetupOption) error {
bs.Version = version
bs.Log = log
bs.NotSubmittingOnStart = cfg.Stopped
Expand Down Expand Up @@ -122,7 +124,7 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
if err := bs.initPProf(cfg); err != nil {
return fmt.Errorf("failed to init profiling: %w", err)
}
bs.initDriver()
bs.initDriver(opts...)
if err := bs.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
}
Expand Down Expand Up @@ -315,8 +317,8 @@ func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error {
return nil
}

func (bs *BatcherService) initDriver() {
bs.driver = NewBatchSubmitter(DriverSetup{
func (bs *BatcherService) initDriver(opts ...DriverSetupOption) {
ds := DriverSetup{
Log: bs.Log,
Metr: bs.Metrics,
RollupConfig: bs.RollupConfig,
Expand All @@ -326,7 +328,11 @@ func (bs *BatcherService) initDriver() {
EndpointProvider: bs.EndpointProvider,
ChannelConfig: bs.ChannelConfig,
AltDA: bs.AltDA,
})
}
for _, opt := range opts {
opt(&ds)
}
bs.driver = NewBatchSubmitter(ds)
}

func (bs *BatcherService) initRPCServer(cfg *CLIConfig) error {
Expand Down

0 comments on commit dd0c711

Please sign in to comment.