Skip to content

Commit

Permalink
Add Channel interface and factory
Browse files — browse the repository at this point in the history
  • Loading branch information
mdehoog committed Oct 5, 2024
1 parent ca604d5 commit c262d85
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 98 deletions.
98 changes: 65 additions & 33 deletions op-batcher/batcher/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,35 @@ import (
"github.com/ethereum/go-ethereum/log"
)

// Channel is the interface satisfied by the concrete channel type: a
// lightweight wrapper around a ChannelBuilder that tracks pending and
// confirmed transactions for a single channel. It exists so alternative
// implementations can be injected into the channelManager via ChannelFactory.
type Channel interface {
	// ID returns the channel's derivation ID.
	ID() derive.ChannelID
	// Timeout returns the channel timeout value.
	Timeout() uint64
	// CheckTimeout checks the timeout against the given L1 block number.
	CheckTimeout(l1BlockNum uint64)
	// AddBlock adds an L2 block to the channel.
	AddBlock(block *types.Block) (*derive.L1BlockInfo, error)
	// Blocks returns the L2 blocks added to the channel so far.
	Blocks() []*types.Block
	// HasTxData reports whether there is frame data ready to submit.
	HasTxData() bool
	// NextTxData dequeues the next transaction data to submit.
	NextTxData() txData
	// IsFull reports whether the channel is full; FullErr returns the reason.
	IsFull() bool
	FullErr() error
	// Byte accounting for metrics/estimation.
	InputBytes() int
	ReadyBytes() int
	OutputBytes() int
	// Frame accounting.
	TotalFrames() int
	PendingFrames() int
	// OutputFrames generates frames from the channel's buffered data.
	OutputFrames() error
	// NoneSubmitted reports whether nothing from this channel has been submitted yet.
	NoneSubmitted() bool
	// IsFullySubmitted reports whether the channel is full and has no
	// pending transactions or frames left to submit.
	IsFullySubmitted() bool
	// L1 origin / L2 block range covered by the channel's blocks.
	LatestL1Origin() eth.BlockID
	OldestL1Origin() eth.BlockID
	LatestL2() eth.BlockID
	OldestL2() eth.BlockID
	// PendingLen and ConfirmedLen report the number of pending and
	// confirmed transactions, respectively.
	PendingLen() int
	ConfirmedLen() int
	// TxConfirmed records a confirmed tx; TxFailed records a failed one.
	TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*types.Block)
	TxFailed(id string)
	// Close closes the channel, preventing further frame production.
	Close()
}

// channel is a lightweight wrapper around a ChannelBuilder which keeps track of pending
// and confirmed transactions for a single channel.
type channel struct {
Expand All @@ -26,14 +55,16 @@ type channel struct {
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[string]eth.BlockID

// True if confirmed TX list is updated. Set to false after updated min/max inclusion blocks.
confirmedTxUpdated bool
// Inclusion block number of first confirmed TX
minInclusionBlock uint64
// Inclusion block number of last confirmed TX
maxInclusionBlock uint64
}

// NewChannel creates a new Channel. It is a thin exported wrapper around the
// unexported newChannel constructor, returning the concrete *channel as the
// Channel interface; its signature matches ChannelFactory.
func NewChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64) (Channel, error) {
	return newChannel(log, metr, cfg, rollupCfg, latestL1OriginBlockNum)
}

func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64) (*channel, error) {
cb, err := NewChannelBuilder(cfg, rollupCfg, latestL1OriginBlockNum)
if err != nil {
Expand Down Expand Up @@ -82,9 +113,11 @@ func (s *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*t
}
delete(s.pendingTransactions, id)
s.confirmedTransactions[id] = inclusionBlock
s.confirmedTxUpdated = true
s.channelBuilder.FramePublished(inclusionBlock.Number)

// Update min/max inclusion blocks for timeout check
s.updateInclusionBlocks()

// If this channel timed out, put the pending blocks back into the local saved blocks
// and then reset this state so it can try to build a new channel.
if s.isTimedOut() {
Expand All @@ -93,7 +126,7 @@ func (s *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) (bool, []*t
return true, s.channelBuilder.Blocks()
}
// If we are done with this channel, record that.
if s.isFullySubmitted() {
if s.IsFullySubmitted() {
s.metr.RecordChannelFullySubmitted(s.ID())
s.log.Info("Channel is fully submitted", "id", s.ID(), "min_inclusion_block", s.minInclusionBlock, "max_inclusion_block", s.maxInclusionBlock)
return true, nil
Expand All @@ -109,41 +142,26 @@ func (s *channel) Timeout() uint64 {

// updateInclusionBlocks recomputes the min & max inclusion block numbers over
// all confirmed transactions and stores them on the channel. The span contained
// both the pre- and post-commit versions of this function from the rendered
// diff; this keeps the post-commit version, which drops the confirmedTxUpdated
// dirty flag and recomputes unconditionally using the min/max builtins (Go 1.21+).
func (s *channel) updateInclusionBlocks() {
	// Start from the extreme values so the first confirmed tx sets both bounds.
	s.minInclusionBlock = uint64(math.MaxUint64)
	s.maxInclusionBlock = uint64(0)
	for _, inclusionBlock := range s.confirmedTransactions {
		s.minInclusionBlock = min(s.minInclusionBlock, inclusionBlock.Number)
		s.maxInclusionBlock = max(s.maxInclusionBlock, inclusionBlock.Number)
	}
}

// isTimedOut returns true if the submitted channel has timed out.
// A channel has timed out if the difference in L1 inclusion blocks between
// the first & last included block is greater than or equal to the channel timeout.
// (The old comment referenced the former name pendingChannelIsTimedOut.)
//
// NOTE(review): min/maxInclusionBlock are assumed up to date here — they are
// refreshed by updateInclusionBlocks on each tx confirmation (see TxConfirmed).
func (s *channel) isTimedOut() bool {
	// Prior to the granite hard fork activating, the use of the shorter ChannelTimeout here may cause the batcher
	// to believe the channel timed out when it was valid. It would then resubmit the blocks needlessly.
	// This wastes batcher funds but doesn't cause any problems for the chain progressing safe head.
	return s.maxInclusionBlock-s.minInclusionBlock >= s.cfg.ChannelTimeout
}

// pendingChannelIsFullySubmitted returns true if the channel has been fully submitted.
func (s *channel) isFullySubmitted() bool {
// Update min/max inclusion blocks for timeout check
s.updateInclusionBlocks()
// IsFullySubmitted returns true if the channel has been fully submitted.
func (s *channel) IsFullySubmitted() bool {
return s.IsFull() && len(s.pendingTransactions)+s.PendingFrames() == 0
}

Expand Down Expand Up @@ -201,6 +219,10 @@ func (s *channel) AddBlock(block *types.Block) (*derive.L1BlockInfo, error) {
return s.channelBuilder.AddBlock(block)
}

// Blocks returns the L2 blocks held by this channel's builder.
func (s *channel) Blocks() []*types.Block {
	return s.channelBuilder.Blocks()
}

// InputBytes reports the channel builder's input byte count.
func (s *channel) InputBytes() int {
	return s.channelBuilder.InputBytes()
}
Expand All @@ -226,23 +248,33 @@ func (s *channel) OutputFrames() error {
}

// LatestL1Origin returns the latest L1 block origin from all the L2 blocks that have been added to the channel
func (c *channel) LatestL1Origin() eth.BlockID {
return c.channelBuilder.LatestL1Origin()
func (s *channel) LatestL1Origin() eth.BlockID {
return s.channelBuilder.LatestL1Origin()
}

// OldestL1Origin returns the oldest L1 block origin from all the L2 blocks that have been added to the channel
func (c *channel) OldestL1Origin() eth.BlockID {
return c.channelBuilder.OldestL1Origin()
func (s *channel) OldestL1Origin() eth.BlockID {
return s.channelBuilder.OldestL1Origin()
}

// LatestL2 returns the latest L2 block from all the L2 blocks that have been added to the channel
func (c *channel) LatestL2() eth.BlockID {
return c.channelBuilder.LatestL2()
func (s *channel) LatestL2() eth.BlockID {
return s.channelBuilder.LatestL2()
}

// OldestL2 returns the oldest L2 block from all the L2 blocks that have been added to the channel
func (c *channel) OldestL2() eth.BlockID {
return c.channelBuilder.OldestL2()
func (s *channel) OldestL2() eth.BlockID {
return s.channelBuilder.OldestL2()
}

// PendingLen returns the number of pending transactions in the channel,
// i.e. the size of the pendingTransactions set.
func (s *channel) PendingLen() int {
	return len(s.pendingTransactions)
}

// ConfirmedLen returns the number of confirmed transactions in the channel,
// i.e. the size of the confirmedTransactions set.
func (s *channel) ConfirmedLen() int {
	return len(s.confirmedTransactions)
}

func (s *channel) Close() {
Expand Down
35 changes: 21 additions & 14 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

var ErrReorg = errors.New("block does not extend existing chain")

// ChannelFactory constructs a Channel. It matches the signature of NewChannel
// and lets alternative Channel implementations be injected into the
// channelManager (a nil factory defaults to NewChannel in NewChannelManager).
type ChannelFactory func(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config, latestL1OriginBlockNum uint64) (Channel, error)

// channelManager stores a contiguous set of blocks & turns them into channels.
// Upon receiving tx confirmation (or a tx failure), it does channel error handling.
//
Expand All @@ -31,6 +33,7 @@ type channelManager struct {
metr metrics.Metricer
cfgProvider ChannelConfigProvider
rollupCfg *rollup.Config
chFactory ChannelFactory

// All blocks since the last request for new tx data.
blocks queue.Queue[*types.Block]
Expand All @@ -42,24 +45,28 @@ type channelManager struct {
tip common.Hash

// channel to write new block data to
currentChannel *channel
currentChannel Channel
// channels to read frame data from, for writing batches onchain
channelQueue []*channel
channelQueue []Channel
// used to lookup channels by tx ID upon tx success / failure
txChannels map[string]*channel
txChannels map[string]Channel

// if set to true, prevents production of any new channel frames
closed bool
}

func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider ChannelConfigProvider, rollupCfg *rollup.Config) *channelManager {
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider ChannelConfigProvider, rollupCfg *rollup.Config, chFactory ChannelFactory) *channelManager {
if chFactory == nil {
chFactory = NewChannel
}
return &channelManager{
log: log,
metr: metr,
cfgProvider: cfgProvider,
defaultCfg: cfgProvider.ChannelConfig(),
rollupCfg: rollupCfg,
txChannels: make(map[string]*channel),
chFactory: chFactory,
txChannels: make(map[string]Channel),
}
}

Expand All @@ -75,7 +82,7 @@ func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
s.closed = false
s.currentChannel = nil
s.channelQueue = nil
s.txChannels = make(map[string]*channel)
s.txChannels = make(map[string]Channel)
}

// TxFailed records a transaction as failed. It will attempt to resubmit the data
Expand Down Expand Up @@ -121,7 +128,7 @@ func (s *channelManager) TxConfirmed(_id txID, inclusionBlock eth.BlockID) {
}

// removePendingChannel removes the given completed channel from the manager's state.
func (s *channelManager) removePendingChannel(channel *channel) {
func (s *channelManager) removePendingChannel(channel Channel) {
if s.currentChannel == channel {
s.currentChannel = nil
}
Expand All @@ -141,7 +148,7 @@ func (s *channelManager) removePendingChannel(channel *channel) {

// nextTxData dequeues frames from the channel and returns them encoded in a transaction.
// It also updates the internal tx -> channels mapping
func (s *channelManager) nextTxData(channel *channel) (txData, error) {
func (s *channelManager) nextTxData(channel Channel) (txData, error) {
if channel == nil || !channel.HasTxData() {
s.log.Trace("no next tx data")
return txData{}, io.EOF // TODO: not enough data error instead
Expand Down Expand Up @@ -201,8 +208,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
// to the current channel and generates frames for it.
// Always returns nil and the io.EOF sentinel error when
// there is no channel with txData
func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) {
var firstWithTxData *channel
func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (Channel, error) {
var firstWithTxData Channel
for _, ch := range s.channelQueue {
if ch.HasTxData() {
firstWithTxData = ch
Expand Down Expand Up @@ -265,7 +272,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
// This will be reassessed at channel submission-time,
// but this is our best guess at the appropriate values for now.
cfg := s.defaultCfg
pc, err := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
pc, err := s.chFactory(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
if err != nil {
return fmt.Errorf("creating new channel: %w", err)
}
Expand Down Expand Up @@ -449,7 +456,7 @@ func (s *channelManager) Close() error {
s.log.Info("Channel has no past or pending submission - dropping", "id", ch.ID())
s.removePendingChannel(ch)
} else {
s.log.Info("Channel is in-flight and will need to be submitted after close", "id", ch.ID(), "confirmed", len(ch.confirmedTransactions), "pending", len(ch.pendingTransactions))
s.log.Info("Channel is in-flight and will need to be submitted after close", "id", ch.ID(), "confirmed", ch.ConfirmedLen(), "pending", ch.PendingLen())
}
}
s.log.Info("Reviewed all pending channels on close", "remaining", len(s.channelQueue))
Expand Down Expand Up @@ -481,14 +488,14 @@ func (s *channelManager) Close() error {
// Requeue rebuilds the channel manager state by
// rewinding blocks back from the channel queue, and setting the defaultCfg.
func (s *channelManager) Requeue(newCfg ChannelConfig) {
newChannelQueue := []*channel{}
newChannelQueue := []Channel{}
blocksToRequeue := []*types.Block{}
for _, channel := range s.channelQueue {
if !channel.NoneSubmitted() {
newChannelQueue = append(newChannelQueue, channel)
continue
}
blocksToRequeue = append(blocksToRequeue, channel.channelBuilder.Blocks()...)
blocksToRequeue = append(blocksToRequeue, channel.Blocks()...)
}

// We put the blocks back at the front of the queue:
Expand Down
Loading

0 comments on commit c262d85

Please sign in to comment.