Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-batcher: support auto switch to cheaper DA type #11059

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"sync"

"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
Expand Down Expand Up @@ -226,6 +227,22 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil
}

// SwitchDAType switch related configs to target DA type
func (s *channelManager) SwitchDAType(targetDAType flags.DataAvailabilityType) {
s.mu.Lock()
defer s.mu.Unlock()
switch targetDAType {
case flags.BlobsType:
s.cfg.MaxFrameSize = eth.MaxBlobDataSize - 1
s.cfg.MultiFrameTxs = true
case flags.CalldataType:
s.cfg.MaxFrameSize = CallDataTxMaxSize - 1
s.cfg.MultiFrameTxs = false
default:
s.log.Crit("channel manager switch to a invalid DA type", "targetDAType", targetDAType.String())
}
}

// registerL1Block registers the given block at the pending channel.
func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
s.currentChannel.CheckTimeout(l1Head.Number)
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (c *CLIConfig) Check() error {
if c.CheckRecentTxsDepth > 128 {
return fmt.Errorf("CheckRecentTxsDepth cannot be set higher than 128: %v", c.CheckRecentTxsDepth)
}
if c.DataAvailabilityType == flags.BlobsType && c.TargetNumFrames > 6 {
if (c.DataAvailabilityType == flags.BlobsType || c.DataAvailabilityType == flags.AutoType) && c.TargetNumFrames > 6 {
return errors.New("too many frames for blob transactions, max 6")
}
if !flags.ValidDataAvailabilityType(c.DataAvailabilityType) {
Expand Down
147 changes: 146 additions & 1 deletion op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync/atomic"
"time"

"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
Expand All @@ -19,12 +20,27 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)

const LimitLoadBlocksOneTime uint64 = 30

// Auto-switch DA params
// DATypeSwitchThrehold is the threhold to trigger blob<->calldata switch
const DATypeSwitchThrehold int = 5

// CallDataTxMaxSize is 120KB
const CallDataTxMaxSize uint64 = 120000

// ApproximateGasPerCallDataTx is the average gas used per 120KB calldata tx
const ApproximateGasPerCallDataTx int64 = 1934892
const MaxBlobsNumberPerTx int64 = 6

var (
ErrBatcherNotRunning = errors.New("batcher is not running")
emptyTxData = txData{
Expand All @@ -44,6 +60,7 @@ type txRef struct {
type L1Client interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
SuggestGasTipCap(ctx context.Context) (*big.Int, error)
}

type L2Client interface {
Expand All @@ -65,6 +82,7 @@ type DriverSetup struct {
EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfig
PlasmaDA *plasma.DAClient
AutoSwitchDA bool
}

// BatchSubmitter encapsulates a service responsible for submitting L2 tx
Expand Down Expand Up @@ -173,10 +191,15 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
} else if start.Number >= end.Number {
return errors.New("start number is >= end number")
}
// Limit the max loaded blocks one time. If batcher is lagged and catching up, small loading block step helps choose economic DA type in time.
endNumber := end.Number
if endNumber-start.Number > LimitLoadBlocksOneTime {
endNumber = start.Number + LimitLoadBlocksOneTime
}

var latestBlock *types.Block
// Add all blocks to "state"
for i := start.Number + 1; i < end.Number+1; i++ {
for i := start.Number + 1; i < endNumber+1; i++ {
block, err := l.loadBlockIntoState(ctx, i)
if errors.Is(err, ErrReorg) {
l.Log.Warn("Found L2 reorg", "block_number", i)
Expand Down Expand Up @@ -325,6 +348,57 @@ func (l *BatchSubmitter) loop() {
}
}()

economicDATypeCh := make(chan flags.DataAvailabilityType)
waitSwitchDACh := make(chan struct{})
if l.AutoSwitchDA {
// start auto choose economic DA type processing loop
economicDALoopDone := make(chan struct{})
defer close(economicDALoopDone) // shut down auto DA loop
go func() {
economicDAType := flags.BlobsType
l.Metr.RecordAutoChoosedDAType(economicDAType)
switchCount := 0
economicDATicker := time.NewTicker(12 * time.Second)
defer economicDATicker.Stop()
addressReservedErrorTicker := time.NewTicker(time.Second)
defer addressReservedErrorTicker.Stop()
for {
select {
case <-economicDATicker.C:
if txpoolState.Load() != TxpoolGood {
switchCount = 0
continue
}
newEconomicDAType, err := l.getEconomicDAType(l.shutdownCtx)
if err != nil {
l.Log.Error("getEconomicDAType failed: %w", err)
continue
}
if newEconomicDAType != economicDAType {
switchCount++
} else {
switchCount = 0
}
if switchCount >= DATypeSwitchThrehold {
l.Log.Info("start economic switch", "from type", economicDAType.String(), "to type", newEconomicDAType.String())
start := time.Now()
economicDAType = newEconomicDAType
switchCount = 0
economicDATypeCh <- economicDAType
<-waitSwitchDACh
l.Log.Info("finish economic switch", "duration", time.Since(start))
l.Metr.RecordAutoChoosedDAType(economicDAType)
l.Metr.RecordEconomicAutoSwitchCount()
l.Metr.RecordAutoSwitchTimeDuration(time.Since(start))
}
case <-economicDALoopDone:
l.Log.Info("auto DA processing loop done")
return
}
}
}()
}

ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()

Expand Down Expand Up @@ -365,6 +439,26 @@ func (l *BatchSubmitter) loop() {
continue
}
l.publishStateToL1(queue, receiptsCh)
case targetDAType := <-economicDATypeCh:
l.lastStoredBlock = eth.BlockID{}
// close current state to prepare for switch
err := l.state.Close()
if err != nil {
if errors.Is(err, ErrPendingAfterClose) {
l.Log.Warn("Closed channel manager to handle DA type switch with pending channel(s) remaining - submitting")
} else {
l.Log.Error("Error closing the channel manager to handle a DA type switch", "err", err)
}
}
// on DA type switch we want to publish all pending state then wait until each result clears before resetting
// the state.
publishAndWait()
l.clearState(l.shutdownCtx)
// switch action after clear state
l.switchDAType(targetDAType)
time.Sleep(time.Minute) // wait op-node derivation published DA data to reduce the chance to submit duplicated blocks
waitSwitchDACh <- struct{}{}
continue
case <-l.shutdownCtx.Done():
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
Expand All @@ -387,6 +481,57 @@ func (l *BatchSubmitter) loop() {
}
}

func (l *BatchSubmitter) getEconomicDAType(ctx context.Context) (flags.DataAvailabilityType, error) {
tCtx, tCancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
defer tCancel()
gasTipCap, err := l.L1Client.SuggestGasTipCap(tCtx)
if err != nil {
return "", fmt.Errorf("getEconomicDAType: failed to fetch the suggested gas tip cap: %w", err)
}
bCtx, bCancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
defer bCancel()
head, err := l.L1Client.HeaderByNumber(bCtx, nil)
if err != nil {
return "", fmt.Errorf("getEconomicDAType: failed to fetch the header: %w", err)
} else if head.BaseFee == nil {
return "", fmt.Errorf("getEconomicDAType: does not support pre-london blocks that do not have a base fee")
}
gasPrice := new(big.Int).Add(gasTipCap, new(big.Int).Mul(head.BaseFee, big.NewInt(2)))
calldataCost := big.NewInt(0).Mul(big.NewInt(MaxBlobsNumberPerTx*ApproximateGasPerCallDataTx), gasPrice)

if head.ExcessBlobGas == nil {
return "", fmt.Errorf("getEconomicDAType fetched header with nil ExcessBlobGas: %v", head)
}
blobGasPrice := eip4844.CalcBlobFee(*head.ExcessBlobGas)
blobCost := big.NewInt(0).Add(big.NewInt(0).Mul(big.NewInt(int64(params.TxGas)), gasPrice), big.NewInt(0).Mul(big.NewInt(params.MaxBlobGasPerBlock), blobGasPrice))

l.Metr.RecordEstimatedCalldataTypeFee(calldataCost)
l.Metr.RecordEstimatedBlobTypeFee(blobCost)
if calldataCost.Cmp(blobCost) < 0 {
l.Log.Info("Economic DA type is calldata", "gas price", gasPrice, "calldata cost", calldataCost, "blob gas price", blobGasPrice, "blob cost", blobCost)
return flags.CalldataType, nil
}
l.Log.Info("Economic DA type is blobs", "gas price", gasPrice, "calldata cost", calldataCost, "blob gas price", blobGasPrice, "blob cost", blobCost)
return flags.BlobsType, nil
}

func (l *BatchSubmitter) switchDAType(targetDAType flags.DataAvailabilityType) {
switch targetDAType {
case flags.BlobsType:
l.Config.UseBlobs = true
l.ChannelConfig.MaxFrameSize = eth.MaxBlobDataSize - 1
l.ChannelConfig.MultiFrameTxs = true
l.state.SwitchDAType(targetDAType)
case flags.CalldataType:
l.Config.UseBlobs = false
l.ChannelConfig.MaxFrameSize = CallDataTxMaxSize - 1
l.ChannelConfig.MultiFrameTxs = false
l.state.SwitchDAType(targetDAType)
default:
l.Log.Crit("batch submitter switch to a invalid DA type", "targetDAType", targetDAType.String())
}
}

// waitNodeSync Check to see if there was a batcher tx sent recently that
// still needs more block confirmations before being considered finalized
func (l *BatchSubmitter) waitNodeSync() error {
Expand Down
9 changes: 6 additions & 3 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
if err := bs.initPlasmaDA(cfg); err != nil {
return fmt.Errorf("failed to init plasma DA: %w", err)
}
bs.initDriver()
bs.initDriver(cfg)
if err := bs.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
}
Expand Down Expand Up @@ -201,7 +201,8 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
}

switch cfg.DataAvailabilityType {
case flags.BlobsType:
// flags.AutoType will choose BlobsType to start
case flags.BlobsType, flags.AutoType:
if !cfg.TestUseMaxTxSizeForBlobs {
// account for version byte prefix
cc.MaxFrameSize = eth.MaxBlobDataSize - 1
Expand Down Expand Up @@ -236,6 +237,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
return fmt.Errorf("invalid channel configuration: %w", err)
}
bs.Log.Info("Initialized channel-config",
"da_type", cfg.DataAvailabilityType.String(),
"use_blobs", bs.UseBlobs,
"use_plasma", bs.UsePlasma,
"max_frame_size", cc.MaxFrameSize,
Expand Down Expand Up @@ -298,7 +300,7 @@ func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error {
return nil
}

func (bs *BatcherService) initDriver() {
func (bs *BatcherService) initDriver(cfg *CLIConfig) {
bs.driver = NewBatchSubmitter(DriverSetup{
Log: bs.Log,
Metr: bs.Metrics,
Expand All @@ -309,6 +311,7 @@ func (bs *BatcherService) initDriver() {
EndpointProvider: bs.EndpointProvider,
ChannelConfig: bs.ChannelConfig,
PlasmaDA: bs.PlasmaDA,
AutoSwitchDA: cfg.DataAvailabilityType == flags.AutoType,
})
}

Expand Down
2 changes: 2 additions & 0 deletions op-batcher/flags/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ const (
// data availability types
CalldataType DataAvailabilityType = "calldata"
BlobsType DataAvailabilityType = "blobs"
AutoType DataAvailabilityType = "auto"
)

var DataAvailabilityTypes = []DataAvailabilityType{
CalldataType,
BlobsType,
AutoType,
}

func (kind DataAvailabilityType) String() string {
Expand Down
Loading