diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 6d8b6eb9da1f..6e237b176d8d 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math" "math/big" _ "net/http/pprof" "sync" @@ -430,7 +431,7 @@ func (l *BatchSubmitter) loop() { defer ticker.Stop() publishAndWait := func() { - l.publishStateToL1(queue, receiptsCh, daGroup) + l.publishStateToL1(queue, receiptsCh, daGroup, time.Duration(math.MaxInt64)) if !l.Txmgr.IsClosed() { if l.Config.UseAltDA { l.Log.Info("Waiting for altDA writes to complete...") @@ -470,7 +471,7 @@ func (l *BatchSubmitter) loop() { l.clearState(l.shutdownCtx) continue } - l.publishStateToL1(queue, receiptsCh, daGroup) + l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval) case <-l.shutdownCtx.Done(): if l.Txmgr.IsClosed() { l.Log.Info("Txmgr is closed, remaining channel data won't be sent") @@ -614,9 +615,11 @@ func (l *BatchSubmitter) waitNodeSync() error { return dial.WaitRollupSync(l.shutdownCtx, l.Log, rollupClient, l1TargetBlock, time.Second*12) } -// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is -// no more data to queue for publishing or if there was an error queing the data. -func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) { +// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is no more data to +// queue for publishing or if there was an error queing the data. maxDuration tells this function to return from state +// publishing after this amount of time has been exceeded even if there is more data remaining. +func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group, maxDuration time.Duration) { + start := time.Now() for { // if the txmgr is closed, we stop the transaction sending if l.Txmgr.IsClosed() { @@ -634,6 +637,10 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh } return } + if time.Since(start) > maxDuration { + l.Log.Warn("Aborting state publishing, max duration exceeded") + return + } } }