From fda7f881cc2afaee54969c29c999cde902d79187 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Thu, 2 Nov 2023 17:31:16 +0100 Subject: [PATCH] Properly handle sequenced empty batches with GER Update (#2742) * handle GER update on empty forced batch * fix update current GER * full refactor * full refactor --- sequencer/dbmanager.go | 13 ++++++++----- sequencer/finalizer.go | 31 +++++++++++++++++++++++++++++++ sequencer/finalizer_test.go | 2 +- sequencer/sequencer.go | 7 ++++++- 4 files changed, 46 insertions(+), 7 deletions(-) diff --git a/sequencer/dbmanager.go b/sequencer/dbmanager.go index 39ff813e98..1784f3608f 100644 --- a/sequencer/dbmanager.go +++ b/sequencer/dbmanager.go @@ -216,7 +216,8 @@ func (d *dbManager) sendDataToStreamer() { _, err = d.streamServer.AddStreamEntry(state.EntryTypeL2BlockEnd, blockEnd.Encode()) if err != nil { - log.Fatal(err) + log.Errorf("failed to add stream entry for l2block %v: %v", l2Block.L2BlockNumber, err) + continue } err = d.streamServer.CommitAtomicOp() @@ -307,10 +308,12 @@ func (d *dbManager) StoreProcessedTxAndDeleteFromPool(ctx context.Context, tx tr return err } - // Change Tx status to selected - err = d.txPool.UpdateTxStatus(ctx, tx.response.TxHash, pool.TxStatusSelected, false, nil) - if err != nil { - return err + if !tx.isForcedBatch { + // Change Tx status to selected + err = d.txPool.UpdateTxStatus(ctx, tx.response.TxHash, pool.TxStatusSelected, false, nil) + if err != nil { + return err + } } log.Infof("StoreProcessedTxAndDeleteFromPool: successfully stored tx: %v for batch: %v", tx.response.TxHash.String(), tx.batchNumber) diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index 6137185c34..c6af5360b6 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" "github.com/0xPolygonHermez/zkevm-node/event" "github.com/0xPolygonHermez/zkevm-node/hex" "github.com/0xPolygonHermez/zkevm-node/log" @@ -70,6 +71,7 @@ type finalizer struct { proverID string lastPendingFlushID uint64 pendingFlushIDCond *sync.Cond + streamServer *datastreamer.StreamServer } type transactionToStore struct { @@ -116,6 +118,7 @@ func newFinalizer( closingSignalCh ClosingSignalCh, batchConstraints batchConstraints, eventLog *event.EventLog, + streamServer *datastreamer.StreamServer, ) *finalizer { f := finalizer{ cfg: cfg, @@ -150,6 +153,7 @@ func newFinalizer( proverID: "", lastPendingFlushID: 0, pendingFlushIDCond: sync.NewCond(&sync.Mutex{}), + streamServer: streamServer, } f.reprocessFullBatchError.Store(false) @@ -1020,7 +1024,34 @@ func (f *finalizer) processForcedBatch(ctx context.Context, lastBatchNumberInSta } f.handleForcedTxsProcessResp(ctx, request, response, stateRoot) + } else { + if f.lastGERHash != forcedBatch.GlobalExitRoot && f.streamServer != nil { + updateGer := state.DSUpdateGER{ + BatchNumber: request.BatchNumber, + Timestamp: request.Timestamp.Unix(), + GlobalExitRoot: request.GlobalExitRoot, + Coinbase: f.sequencerAddress, + ForkID: uint16(f.dbManager.GetForkIDByBatchNumber(request.BatchNumber)), + StateRoot: response.NewStateRoot, + } + + err = f.streamServer.StartAtomicOp() + if err != nil { + log.Errorf("failed to start atomic op for forced batch %v: %v", forcedBatch.ForcedBatchNumber, err) + } + + _, err = f.streamServer.AddStreamEntry(state.EntryTypeUpdateGER, updateGer.Encode()) + if err != nil { + log.Errorf("failed to add stream entry for forced batch %v: %v", forcedBatch.ForcedBatchNumber, err) + } + + err = f.streamServer.CommitAtomicOp() + if err != nil { + log.Errorf("failed to commit atomic op for forced batch %v: %v", forcedBatch.ForcedBatchNumber, err) + } + } } + f.nextGERMux.Lock() f.lastGERHash = forcedBatch.GlobalExitRoot f.nextGERMux.Unlock() diff --git a/sequencer/finalizer_test.go b/sequencer/finalizer_test.go index f7a6c1ca81..576271f3f8 100644 --- a/sequencer/finalizer_test.go +++ b/sequencer/finalizer_test.go @@ -124,7 +124,7 @@ func TestNewFinalizer(t *testing.T) { dbManagerMock.On("GetLastSentFlushID", context.Background()).Return(uint64(0), nil) // arrange and act - f = newFinalizer(cfg, effectiveGasPriceCfg, workerMock, dbManagerMock, executorMock, seqAddr, isSynced, closingSignalCh, bc, eventLog) + f = newFinalizer(cfg, effectiveGasPriceCfg, workerMock, dbManagerMock, executorMock, seqAddr, isSynced, closingSignalCh, bc, eventLog, nil) // assert assert.NotNil(t, f) diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go index 7e56cdc2f6..4a5bf02918 100644 --- a/sequencer/sequencer.go +++ b/sequencer/sequencer.go @@ -129,7 +129,12 @@ func (s *Sequencer) Start(ctx context.Context) { go dbManager.Start() - finalizer := newFinalizer(s.cfg.Finalizer, s.cfg.EffectiveGasPrice, worker, dbManager, s.state, s.address, s.isSynced, closingSignalCh, batchConstraints, s.eventLog) + var streamServer *datastreamer.StreamServer = nil + if s.cfg.StreamServer.Enabled { + streamServer = dbManager.streamServer + } + + finalizer := newFinalizer(s.cfg.Finalizer, s.cfg.EffectiveGasPrice, worker, dbManager, s.state, s.address, s.isSynced, closingSignalCh, batchConstraints, s.eventLog, streamServer) currBatch, processingReq := s.bootstrap(ctx, dbManager, finalizer) go finalizer.Start(ctx, currBatch, processingReq)