Skip to content

Commit

Permalink
op-batcher: Implement dynamic blob/calldata selection
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianst committed Jul 23, 2024
1 parent b7f8188 commit d045fa1
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 32 deletions.
5 changes: 3 additions & 2 deletions op-batcher/batcher/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,15 @@ func (s *channel) ID() derive.ChannelID {
// NextTxData should only be called after HasTxData returned true.
func (s *channel) NextTxData() txData {
nf := s.cfg.MaxFramesPerTx()
txdata := txData{frames: make([]frameData, 0, nf)}
// TODO: consider changing MultiFrameTxs to UseBlobs, as we use it synonymously now
txdata := txData{frames: make([]frameData, 0, nf), asBlob: s.cfg.MultiFrameTxs}
for i := 0; i < nf && s.channelBuilder.HasFrame(); i++ {
frame := s.channelBuilder.NextFrame()
txdata.frames = append(txdata.frames, frame)
}

id := txdata.ID().String()
s.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames))
s.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "as_blob", txdata.asBlob)
s.pendingTransactions[id] = txdata

return txdata
Expand Down
14 changes: 14 additions & 0 deletions op-batcher/batcher/channel_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ type ChannelConfig struct {
MultiFrameTxs bool
}

// ChannelConfig returns a copy of itself. This makes a ChannelConfig a static
// ChannelConfigProvider of itself.
func (cc ChannelConfig) ChannelConfig() ChannelConfig {
return cc
}

// InitCompressorConfig (re)initializes the channel configuration's compressor
// configuration using the given values. The TargetOutputSize will be set to a
// value consistent with cc.TargetNumFrames and cc.MaxFrameSize.
Expand Down Expand Up @@ -75,6 +81,14 @@ func (cc *ChannelConfig) InitNoneCompressor() {
cc.InitCompressorConfig(0, compressor.NoneKind, derive.Zlib)
}

func (cc *ChannelConfig) ReinitCompressorConfig() {
cc.InitCompressorConfig(
cc.CompressorConfig.ApproxComprRatio,
cc.CompressorConfig.Kind,
cc.CompressorConfig.CompressionAlgo,
)
}

func (cc *ChannelConfig) MaxFramesPerTx() int {
if !cc.MultiFrameTxs {
return 1
Expand Down
100 changes: 100 additions & 0 deletions op-batcher/batcher/channel_config_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package batcher

import (
"context"
"math/big"
"time"

"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)

const randomByteCalldataGas = params.TxDataNonZeroGasEIP2028

type (
ChannelConfigProvider interface {
ChannelConfig() ChannelConfig
}

GasPricer interface {
SuggestGasPriceCaps(ctx context.Context) (tipCap *big.Int, baseFee *big.Int, blobBaseFee *big.Int, err error)
}

DynamicEthChannelConfig struct {
log log.Logger
ctx context.Context // parent lifecycle context
timeout time.Duration // query timeout
gasPricer GasPricer

blobConfig ChannelConfig
calldataConfig ChannelConfig
latestConfig *ChannelConfig
}
)

func NewDynamicEthChannelConfig(lgr log.Logger,
lifeCtx context.Context, reqTimeout time.Duration, gasPricer GasPricer,
blobConfig ChannelConfig, calldataConfig ChannelConfig,
) *DynamicEthChannelConfig {
// Copy blobConfig and statically configure fallback calldata config.
// In the future, we might want to make the calldata config configurable.
// cdCfg := blobConfig
// cdCfg.TargetNumFrames = 1
// cdCfg.MaxFrameSize = 120_000
// cdCfg.MultiFrameTxs = false

dec := &DynamicEthChannelConfig{
log: lgr,
ctx: lifeCtx,
timeout: reqTimeout,
blobConfig: blobConfig,
calldataConfig: calldataConfig,
}
// start with blob config
dec.latestConfig = &dec.blobConfig
return dec
}

func (dec *DynamicEthChannelConfig) ChannelConfig() ChannelConfig {
ctx, cancel := context.WithTimeout(dec.ctx, dec.timeout)
defer cancel()
tipCap, baseFee, blobBaseFee, err := dec.gasPricer.SuggestGasPriceCaps(ctx)
if err != nil {
dec.log.Warn("Error querying gas prices, returning latest config", "err", err)
return *dec.latestConfig
}

// We estimate the gas costs of a calldata and blob tx under the assumption that we'd fill
// a frame fully and compressed random channel data has few zeros, so they can be
// ignored in the calldata gas price estimation.
// It is also assumed that a calldata tx would contain exactly one full frame
// and a blob tx would contain target-num-frames many blobs.

// It would be nicer to use core.IntrinsicGas, but we don't have the actual data at hand
calldataBytes := dec.calldataConfig.MaxFrameSize + 1 // + 1 version byte
calldataGas := big.NewInt(int64(calldataBytes*randomByteCalldataGas + params.TxGas))
calldataPrice := new(big.Int).Add(baseFee, tipCap)
calldataCost := new(big.Int).Mul(calldataGas, calldataPrice)

blobGas := big.NewInt(eth.BlobSize * int64(dec.blobConfig.TargetNumFrames))
blobCost := new(big.Int).Mul(blobGas, blobBaseFee)
// blobs still have intrinsic calldata costs
blobCalldataCost := new(big.Int).Mul(big.NewInt(int64(params.TxGas)), calldataPrice)
blobCost = blobCost.Add(blobCost, blobCalldataCost)

blobDataBytes := big.NewInt(eth.MaxBlobDataSize * int64(dec.blobConfig.TargetNumFrames))
lgr := dec.log.New("base_fee", baseFee, "blob_base_fee", blobBaseFee, "tip_cap", tipCap,
"calldata_bytes", calldataBytes, "calldata_cost", calldataCost,
"blob_data_bytes", blobDataBytes, "blob_cost", blobCost)

// Now we compare the prices normalized to the number of bytes that can be
// submitted for that price.
if new(big.Int).Mul(blobCost, big.NewInt(int64(calldataBytes))).
Cmp(new(big.Int).Mul(calldataCost, blobDataBytes)) == 1 {
lgr.Info("Using calldata channel config")
return dec.calldataConfig
}
lgr.Info("Using blob channel config")
return dec.blobConfig
}
34 changes: 18 additions & 16 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ var ErrReorg = errors.New("block does not extend existing chain")
// channel.
// Public functions on channelManager are safe for concurrent access.
type channelManager struct {
mu sync.Mutex
log log.Logger
metr metrics.Metricer
cfg ChannelConfig
rollupCfg *rollup.Config
mu sync.Mutex
log log.Logger
metr metrics.Metricer
cfgProvider ChannelConfigProvider
rollupCfg *rollup.Config

// All blocks since the last request for new tx data.
blocks []*types.Block
Expand All @@ -49,13 +49,13 @@ type channelManager struct {
closed bool
}

func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config) *channelManager {
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider ChannelConfigProvider, rollupCfg *rollup.Config) *channelManager {
return &channelManager{
log: log,
metr: metr,
cfg: cfg,
rollupCfg: rollupCfg,
txChannels: make(map[string]*channel),
log: log,
metr: metr,
cfgProvider: cfgProvider,
rollupCfg: rollupCfg,
txChannels: make(map[string]*channel),
}
}

Expand Down Expand Up @@ -203,7 +203,8 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil
}

pc, err := newChannel(s.log, s.metr, s.cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
cfg := s.cfgProvider.ChannelConfig()
pc, err := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
if err != nil {
return fmt.Errorf("creating new channel: %w", err)
}
Expand All @@ -216,10 +217,11 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
"l1Head", l1Head,
"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
"blocks_pending", len(s.blocks),
"batch_type", s.cfg.BatchType,
"compression_algo", s.cfg.CompressorConfig.CompressionAlgo,
"target_num_frames", s.cfg.TargetNumFrames,
"max_frame_size", s.cfg.MaxFrameSize,
"batch_type", cfg.BatchType,
"compression_algo", cfg.CompressorConfig.CompressionAlgo,
"target_num_frames", cfg.TargetNumFrames,
"max_frame_size", cfg.MaxFrameSize,
"use_blobs", cfg.MultiFrameTxs,
)
s.metr.RecordChannelOpened(pc.ID(), len(s.blocks))

Expand Down
21 changes: 13 additions & 8 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
ErrBatcherNotRunning = errors.New("batcher is not running")
emptyTxData = txData{
frames: []frameData{
frameData{
{
data: []byte{},
},
},
Expand All @@ -39,6 +39,7 @@ var (
type txRef struct {
id txID
isCancel bool
isBlob bool
}

type L1Client interface {
Expand Down Expand Up @@ -303,13 +304,17 @@ func (l *BatchSubmitter) loop() {
receiptLoopDone := make(chan struct{})
defer close(receiptLoopDone) // shut down receipt loop

var txpoolState atomic.Int32
var (
txpoolState atomic.Int32
txpoolBlockedBlob bool
)
txpoolState.Store(TxpoolGood)
go func() {
for {
select {
case r := <-receiptsCh:
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && txpoolState.CompareAndSwap(TxpoolGood, TxpoolBlocked) {
txpoolBlockedBlob = r.ID.isBlob
l.Log.Info("incompatible tx in txpool")
} else if r.ID.isCancel && txpoolState.CompareAndSwap(TxpoolCancelPending, TxpoolGood) {
// Set state to TxpoolGood even if the cancellation transaction ended in error
Expand Down Expand Up @@ -344,7 +349,7 @@ func (l *BatchSubmitter) loop() {
// txpoolState is set to Blocked only if Send() is returning
// ErrAlreadyReserved. In this case, the TxMgr nonce should be reset to nil,
// allowing us to send a cancellation transaction.
l.cancelBlockingTx(queue, receiptsCh)
l.cancelBlockingTx(queue, receiptsCh, txpoolBlockedBlob)
}
if txpoolState.Load() != TxpoolGood {
continue
Expand Down Expand Up @@ -531,15 +536,15 @@ func (l *BatchSubmitter) safeL1Origin(ctx context.Context) (eth.BlockID, error)
// cancelBlockingTx creates an empty transaction of appropriate type to cancel out the incompatible
// transaction stuck in the txpool. In the future we might send an actual batch transaction instead
// of an empty one to avoid wasting the tx fee.
func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], isBlockedBlob bool) {
var candidate *txmgr.TxCandidate
var err error
if l.Config.UseBlobs {
if isBlockedBlob {
candidate = l.calldataTxCandidate([]byte{})
} else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil {
panic(err) // this error should not happen
}
l.Log.Warn("sending a cancellation transaction to unblock txpool")
l.Log.Warn("sending a cancellation transaction to unblock txpool", "blocked_blob", isBlockedBlob)
l.queueTx(txData{}, true, candidate, queue, receiptsCh)
}

Expand All @@ -550,7 +555,7 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.

var candidate *txmgr.TxCandidate
if l.Config.UseBlobs {
if txdata.asBlob {
if candidate, err = l.blobTxCandidate(txdata); err != nil {
// We could potentially fall through and try a calldata tx instead, but this would
// likely result in the chain spending more in gas fees than it is tuned for, so best
Expand Down Expand Up @@ -593,7 +598,7 @@ func (l *BatchSubmitter) queueTx(txdata txData, isCancel bool, candidate *txmgr.
candidate.GasLimit = intrinsicGas
}

queue.Send(txRef{txdata.ID(), isCancel}, *candidate, receiptsCh)
queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.asBlob}, *candidate, receiptsCh)
}

func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) {
Expand Down
1 change: 1 addition & 0 deletions op-batcher/batcher/tx_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
// different channels.
type txData struct {
frames []frameData
asBlob bool // indicates whether this should be sent as blob
}

func singleFrameTxData(frame frameData) txData {
Expand Down
2 changes: 1 addition & 1 deletion op-service/txmgr/test_txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (m *TestTxManager) WaitOnJammingTx(ctx context.Context) error {
}

func (m *TestTxManager) makeStuckTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
gasTipCap, _, blobBaseFee, err := m.suggestGasPriceCaps(ctx)
gasTipCap, _, blobBaseFee, err := m.SuggestGasPriceCaps(ctx)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions op-service/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*typ
// NOTE: Otherwise, the [SimpleTxManager] will query the specified backend for an estimate.
func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
m.l.Debug("crafting Transaction", "blobs", len(candidate.Blobs), "calldata_size", len(candidate.TxData))
gasTipCap, baseFee, blobBaseFee, err := m.suggestGasPriceCaps(ctx)
gasTipCap, baseFee, blobBaseFee, err := m.SuggestGasPriceCaps(ctx)
if err != nil {
m.metr.RPCError()
return nil, fmt.Errorf("failed to get gas price info: %w", err)
Expand Down Expand Up @@ -635,7 +635,7 @@ func (m *SimpleTxManager) queryReceipt(ctx context.Context, txHash common.Hash,
// multiple of the suggested values.
func (m *SimpleTxManager) increaseGasPrice(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) {
m.txLogger(tx, true).Info("bumping gas price for transaction")
tip, baseFee, blobBaseFee, err := m.suggestGasPriceCaps(ctx)
tip, baseFee, blobBaseFee, err := m.SuggestGasPriceCaps(ctx)
if err != nil {
m.txLogger(tx, false).Warn("failed to get suggested gas tip and base fee", "err", err)
return nil, err
Expand Down Expand Up @@ -718,9 +718,9 @@ func (m *SimpleTxManager) increaseGasPrice(ctx context.Context, tx *types.Transa
return signedTx, nil
}

// suggestGasPriceCaps suggests what the new tip, base fee, and blob base fee should be based on
// SuggestGasPriceCaps suggests what the new tip, base fee, and blob base fee should be based on
// the current L1 conditions. blobfee will be nil if 4844 is not yet active.
func (m *SimpleTxManager) suggestGasPriceCaps(ctx context.Context) (*big.Int, *big.Int, *big.Int, error) {
func (m *SimpleTxManager) SuggestGasPriceCaps(ctx context.Context) (*big.Int, *big.Int, *big.Int, error) {
cCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
tip, err := m.backend.SuggestGasTipCap(cCtx)
Expand Down
2 changes: 1 addition & 1 deletion op-service/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,7 +1322,7 @@ func TestMinFees(t *testing.T) {
conf.MinTipCap = tt.minTipCap
h := newTestHarnessWithConfig(t, conf)

tip, baseFee, _, err := h.mgr.suggestGasPriceCaps(context.TODO())
tip, baseFee, _, err := h.mgr.SuggestGasPriceCaps(context.TODO())
require.NoError(err)

if tt.expectMinBaseFee {
Expand Down

0 comments on commit d045fa1

Please sign in to comment.