diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 82d692b0ca4f..aa136207284d 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -6,6 +6,7 @@ import ( "io" "sync" + "github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" @@ -226,6 +227,22 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { return nil } +// SwitchDAType switches related configs to the target DA type +func (s *channelManager) SwitchDAType(targetDAType flags.DataAvailabilityType) { + s.mu.Lock() + defer s.mu.Unlock() + switch targetDAType { + case flags.BlobsType: + s.cfg.MaxFrameSize = eth.MaxBlobDataSize - 1 + s.cfg.MultiFrameTxs = true + case flags.CalldataType: + s.cfg.MaxFrameSize = CallDataTxMaxSize - 1 + s.cfg.MultiFrameTxs = false + default: + s.log.Crit("channel manager switch to a invalid DA type", "targetDAType", targetDAType.String()) + } +} + // registerL1Block registers the given block at the pending channel. 
func (s *channelManager) registerL1Block(l1Head eth.BlockID) { s.currentChannel.CheckTimeout(l1Head.Number) diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go index 5180d8434b2f..9f6e39897dc7 100644 --- a/op-batcher/batcher/config.go +++ b/op-batcher/batcher/config.go @@ -137,7 +137,7 @@ func (c *CLIConfig) Check() error { if c.CheckRecentTxsDepth > 128 { return fmt.Errorf("CheckRecentTxsDepth cannot be set higher than 128: %v", c.CheckRecentTxsDepth) } - if c.DataAvailabilityType == flags.BlobsType && c.TargetNumFrames > 6 { + if (c.DataAvailabilityType == flags.BlobsType || c.DataAvailabilityType == flags.AutoType) && c.TargetNumFrames > 6 { return errors.New("too many frames for blob transactions, max 6") } if !flags.ValidDataAvailabilityType(c.DataAvailabilityType) { diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 8784b20119d9..ee6774e03edc 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" @@ -19,12 +20,27 @@ import ( "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" ) +const LimitLoadBlocksOneTime uint64 = 30 + +// Auto-switch DA params +// DATypeSwitchThrehold is the threshold to trigger blob<->calldata switch +const DATypeSwitchThrehold int = 5 + +// CallDataTxMaxSize is 120KB +const CallDataTxMaxSize uint64 = 120000 + +// 
ApproximateGasPerCallDataTx is the average gas used per 120KB calldata tx +const ApproximateGasPerCallDataTx int64 = 1934892 +const MaxBlobsNumberPerTx int64 = 6 + var ( ErrBatcherNotRunning = errors.New("batcher is not running") emptyTxData = txData{ @@ -44,6 +60,7 @@ type txRef struct { type L1Client interface { HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) + SuggestGasTipCap(ctx context.Context) (*big.Int, error) } type L2Client interface { @@ -65,6 +82,7 @@ type DriverSetup struct { EndpointProvider dial.L2EndpointProvider ChannelConfig ChannelConfig PlasmaDA *plasma.DAClient + AutoSwitchDA bool } // BatchSubmitter encapsulates a service responsible for submitting L2 tx @@ -173,10 +191,15 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error { } else if start.Number >= end.Number { return errors.New("start number is >= end number") } + // Limit the max loaded blocks one time. If batcher is lagged and catching up, small loading block step helps choose economic DA type in time. 
+ endNumber := end.Number + if endNumber-start.Number > LimitLoadBlocksOneTime { + endNumber = start.Number + LimitLoadBlocksOneTime + } var latestBlock *types.Block // Add all blocks to "state" - for i := start.Number + 1; i < end.Number+1; i++ { + for i := start.Number + 1; i < endNumber+1; i++ { block, err := l.loadBlockIntoState(ctx, i) if errors.Is(err, ErrReorg) { l.Log.Warn("Found L2 reorg", "block_number", i) @@ -325,6 +348,57 @@ func (l *BatchSubmitter) loop() { } }() + economicDATypeCh := make(chan flags.DataAvailabilityType) + waitSwitchDACh := make(chan struct{}) + if l.AutoSwitchDA { + // start auto choose economic DA type processing loop + economicDALoopDone := make(chan struct{}) + defer close(economicDALoopDone) // shut down auto DA loop + go func() { + economicDAType := flags.BlobsType + l.Metr.RecordAutoChoosedDAType(economicDAType) + switchCount := 0 + economicDATicker := time.NewTicker(12 * time.Second) + defer economicDATicker.Stop() + addressReservedErrorTicker := time.NewTicker(time.Second) + defer addressReservedErrorTicker.Stop() + for { + select { + case <-economicDATicker.C: + if txpoolState.Load() != TxpoolGood { + switchCount = 0 + continue + } + newEconomicDAType, err := l.getEconomicDAType(l.shutdownCtx) + if err != nil { + l.Log.Error("getEconomicDAType failed: %w", err) + continue + } + if newEconomicDAType != economicDAType { + switchCount++ + } else { + switchCount = 0 + } + if switchCount >= DATypeSwitchThrehold { + l.Log.Info("start economic switch", "from type", economicDAType.String(), "to type", newEconomicDAType.String()) + start := time.Now() + economicDAType = newEconomicDAType + switchCount = 0 + economicDATypeCh <- economicDAType + <-waitSwitchDACh + l.Log.Info("finish economic switch", "duration", time.Since(start)) + l.Metr.RecordAutoChoosedDAType(economicDAType) + l.Metr.RecordEconomicAutoSwitchCount() + l.Metr.RecordAutoSwitchTimeDuration(time.Since(start)) + } + case <-economicDALoopDone: + l.Log.Info("auto DA 
processing loop done") + return + } + } + }() + } + ticker := time.NewTicker(l.Config.PollInterval) defer ticker.Stop() @@ -365,6 +439,26 @@ func (l *BatchSubmitter) loop() { continue } l.publishStateToL1(queue, receiptsCh) + case targetDAType := <-economicDATypeCh: + l.lastStoredBlock = eth.BlockID{} + // close current state to prepare for switch + err := l.state.Close() + if err != nil { + if errors.Is(err, ErrPendingAfterClose) { + l.Log.Warn("Closed channel manager to handle DA type switch with pending channel(s) remaining - submitting") + } else { + l.Log.Error("Error closing the channel manager to handle a DA type switch", "err", err) + } + } + // on DA type switch we want to publish all pending state then wait until each result clears before resetting + // the state. + publishAndWait() + l.clearState(l.shutdownCtx) + // switch action after clear state + l.switchDAType(targetDAType) + time.Sleep(time.Minute) // wait op-node derivation published DA data to reduce the chance to submit duplicated blocks + waitSwitchDACh <- struct{}{} + continue case <-l.shutdownCtx.Done(): if l.Txmgr.IsClosed() { l.Log.Info("Txmgr is closed, remaining channel data won't be sent") @@ -387,6 +481,57 @@ func (l *BatchSubmitter) loop() { } } +func (l *BatchSubmitter) getEconomicDAType(ctx context.Context) (flags.DataAvailabilityType, error) { + tCtx, tCancel := context.WithTimeout(ctx, l.Config.NetworkTimeout) + defer tCancel() + gasTipCap, err := l.L1Client.SuggestGasTipCap(tCtx) + if err != nil { + return "", fmt.Errorf("getEconomicDAType: failed to fetch the suggested gas tip cap: %w", err) + } + bCtx, bCancel := context.WithTimeout(ctx, l.Config.NetworkTimeout) + defer bCancel() + head, err := l.L1Client.HeaderByNumber(bCtx, nil) + if err != nil { + return "", fmt.Errorf("getEconomicDAType: failed to fetch the header: %w", err) + } else if head.BaseFee == nil { + return "", fmt.Errorf("getEconomicDAType: does not support pre-london blocks that do not have a base fee") + } + 
gasPrice := new(big.Int).Add(gasTipCap, new(big.Int).Mul(head.BaseFee, big.NewInt(2))) + calldataCost := big.NewInt(0).Mul(big.NewInt(MaxBlobsNumberPerTx*ApproximateGasPerCallDataTx), gasPrice) + + if head.ExcessBlobGas == nil { + return "", fmt.Errorf("getEconomicDAType fetched header with nil ExcessBlobGas: %v", head) + } + blobGasPrice := eip4844.CalcBlobFee(*head.ExcessBlobGas) + blobCost := big.NewInt(0).Add(big.NewInt(0).Mul(big.NewInt(int64(params.TxGas)), gasPrice), big.NewInt(0).Mul(big.NewInt(params.MaxBlobGasPerBlock), blobGasPrice)) + + l.Metr.RecordEstimatedCalldataTypeFee(calldataCost) + l.Metr.RecordEstimatedBlobTypeFee(blobCost) + if calldataCost.Cmp(blobCost) < 0 { + l.Log.Info("Economic DA type is calldata", "gas price", gasPrice, "calldata cost", calldataCost, "blob gas price", blobGasPrice, "blob cost", blobCost) + return flags.CalldataType, nil + } + l.Log.Info("Economic DA type is blobs", "gas price", gasPrice, "calldata cost", calldataCost, "blob gas price", blobGasPrice, "blob cost", blobCost) + return flags.BlobsType, nil +} + +func (l *BatchSubmitter) switchDAType(targetDAType flags.DataAvailabilityType) { + switch targetDAType { + case flags.BlobsType: + l.Config.UseBlobs = true + l.ChannelConfig.MaxFrameSize = eth.MaxBlobDataSize - 1 + l.ChannelConfig.MultiFrameTxs = true + l.state.SwitchDAType(targetDAType) + case flags.CalldataType: + l.Config.UseBlobs = false + l.ChannelConfig.MaxFrameSize = CallDataTxMaxSize - 1 + l.ChannelConfig.MultiFrameTxs = false + l.state.SwitchDAType(targetDAType) + default: + l.Log.Crit("batch submitter switch to a invalid DA type", "targetDAType", targetDAType.String()) + } +} + // waitNodeSync Check to see if there was a batcher tx sent recently that // still needs more block confirmations before being considered finalized func (l *BatchSubmitter) waitNodeSync() error { diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index 71dd7fa4b6d1..509eb071a6d4 100644 --- 
a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -123,7 +123,7 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, if err := bs.initPlasmaDA(cfg); err != nil { return fmt.Errorf("failed to init plasma DA: %w", err) } - bs.initDriver() + bs.initDriver(cfg) if err := bs.initRPCServer(cfg); err != nil { return fmt.Errorf("failed to start RPC server: %w", err) } @@ -201,7 +201,8 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { } switch cfg.DataAvailabilityType { - case flags.BlobsType: + // flags.AutoType will choose BlobsType to start + case flags.BlobsType, flags.AutoType: if !cfg.TestUseMaxTxSizeForBlobs { // account for version byte prefix cc.MaxFrameSize = eth.MaxBlobDataSize - 1 @@ -236,6 +237,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error { return fmt.Errorf("invalid channel configuration: %w", err) } bs.Log.Info("Initialized channel-config", + "da_type", cfg.DataAvailabilityType.String(), "use_blobs", bs.UseBlobs, "use_plasma", bs.UsePlasma, "max_frame_size", cc.MaxFrameSize, @@ -298,7 +300,7 @@ func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error { return nil } -func (bs *BatcherService) initDriver() { +func (bs *BatcherService) initDriver(cfg *CLIConfig) { bs.driver = NewBatchSubmitter(DriverSetup{ Log: bs.Log, Metr: bs.Metrics, @@ -309,6 +311,7 @@ func (bs *BatcherService) initDriver() { EndpointProvider: bs.EndpointProvider, ChannelConfig: bs.ChannelConfig, PlasmaDA: bs.PlasmaDA, + AutoSwitchDA: cfg.DataAvailabilityType == flags.AutoType, }) } diff --git a/op-batcher/flags/types.go b/op-batcher/flags/types.go index 0db97cdad219..775c916d0270 100644 --- a/op-batcher/flags/types.go +++ b/op-batcher/flags/types.go @@ -8,11 +8,13 @@ const ( // data availability types CalldataType DataAvailabilityType = "calldata" BlobsType DataAvailabilityType = "blobs" + AutoType DataAvailabilityType = "auto" ) var DataAvailabilityTypes = []DataAvailabilityType{ 
CalldataType, BlobsType, + AutoType, } func (kind DataAvailabilityType) String() string { diff --git a/op-batcher/metrics/metrics.go b/op-batcher/metrics/metrics.go index 417f927ee6f6..fe8f7b324e7a 100644 --- a/op-batcher/metrics/metrics.go +++ b/op-batcher/metrics/metrics.go @@ -2,6 +2,8 @@ package metrics import ( "io" + "math/big" + "time" "github.com/prometheus/client_golang/prometheus" @@ -9,7 +11,9 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/eth" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" @@ -48,6 +52,13 @@ type Metricer interface { RecordBlobUsedBytes(num int) + RecordAutoChoosedDAType(daType flags.DataAvailabilityType) + RecordEconomicAutoSwitchCount() + RecordReservedErrorSwitchCount() + RecordAutoSwitchTimeDuration(duration time.Duration) + RecordEstimatedCalldataTypeFee(fee *big.Int) + RecordEstimatedBlobTypeFee(fee *big.Int) + Document() []opmetrics.DocumentedMetric } @@ -83,6 +94,13 @@ type Metrics struct { batcherTxEvs opmetrics.EventVec blobUsedBytes prometheus.Histogram + + autoChoosedDAType prometheus.Gauge + economicAutoSwitchCount prometheus.Counter + reservedErrorSwitchCount prometheus.Counter + autoSwitchTimeDuration prometheus.Gauge + estimatedCalldataTypeFee prometheus.Gauge + estimatedBlobTypeFee prometheus.Gauge } var _ Metricer = (*Metrics)(nil) @@ -191,6 +209,36 @@ func NewMetrics(procName string) *Metrics { Help: "Blob size in bytes (of last blob only for multi-blob txs).", Buckets: prometheus.LinearBuckets(0.0, eth.MaxBlobDataSize/13, 14), }), + autoChoosedDAType: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "auto_choosed_da_type", + Help: "Current DA type choosed by auto switch", + }), + 
economicAutoSwitchCount: factory.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "economic_auto_switch_count", + Help: "Total number of switch action caused by economic calculation", + }), + reservedErrorSwitchCount: factory.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "reserved_error_switch_count", + Help: "Total number of switch action caused by txpool addressReservedError", + }), + autoSwitchTimeDuration: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "auto_switch_time_duration", + Help: "Time duration in milliseconds of auto switch action", + }), + estimatedCalldataTypeFee: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "estimated_calldata_type_fee", + Help: "Current estimated fee in gwei of calldata type(6 txes) by auto switch routine", + }), + estimatedBlobTypeFee: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "estimated_blob_type_fee", + Help: "Current estimated fee in gwei of blob type(1 tx with 6 blobs) by auto switch routine", + }), batcherTxEvs: opmetrics.NewEventVec(factory, ns, "", "batcher_tx", "BatcherTx", []string{"stage"}), } @@ -318,6 +366,34 @@ func (m *Metrics) RecordBlobUsedBytes(num int) { m.blobUsedBytes.Observe(float64(num)) } +func (m *Metrics) RecordAutoChoosedDAType(daType flags.DataAvailabilityType) { + if daType == flags.CalldataType { + m.autoChoosedDAType.Set(0) + } else if daType == flags.BlobsType { + m.autoChoosedDAType.Set(1) + } +} + +func (m *Metrics) RecordEconomicAutoSwitchCount() { + m.economicAutoSwitchCount.Inc() +} + +func (m *Metrics) RecordReservedErrorSwitchCount() { + m.reservedErrorSwitchCount.Inc() +} + +func (m *Metrics) RecordAutoSwitchTimeDuration(duration time.Duration) { + m.autoSwitchTimeDuration.Set(float64(duration.Milliseconds())) +} + +func (m *Metrics) RecordEstimatedCalldataTypeFee(fee *big.Int) { + m.estimatedCalldataTypeFee.Set(float64(fee.Uint64()) / params.GWei) +} + +func (m *Metrics) RecordEstimatedBlobTypeFee(fee 
*big.Int) { + m.estimatedBlobTypeFee.Set(float64(fee.Uint64()) / params.GWei) +} + // estimateBatchSize estimates the size of the batch func estimateBatchSize(block *types.Block) uint64 { size := uint64(70) // estimated overhead of batch metadata diff --git a/op-batcher/metrics/noop.go b/op-batcher/metrics/noop.go index 36594efe47c7..18e736f4a58c 100644 --- a/op-batcher/metrics/noop.go +++ b/op-batcher/metrics/noop.go @@ -2,12 +2,15 @@ package metrics import ( "io" + "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/eth" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" @@ -46,3 +49,10 @@ func (*noopMetrics) RecordBlobUsedBytes(int) {} func (*noopMetrics) StartBalanceMetrics(log.Logger, *ethclient.Client, common.Address) io.Closer { return nil } + +func (*noopMetrics) RecordAutoChoosedDAType(daType flags.DataAvailabilityType) {} +func (*noopMetrics) RecordEconomicAutoSwitchCount() {} +func (*noopMetrics) RecordReservedErrorSwitchCount() {} +func (*noopMetrics) RecordAutoSwitchTimeDuration(duration time.Duration) {} +func (*noopMetrics) RecordEstimatedCalldataTypeFee(fee *big.Int) {} +func (*noopMetrics) RecordEstimatedBlobTypeFee(fee *big.Int) {}