Skip to content

Commit

Permalink
Properly handle sequenced empty batches with GER Update (#2742)
Browse files Browse the repository at this point in the history
* handle GER update on empty forced batch

* fix update current GER

* full refactor

* full refactor
  • Loading branch information
ToniRamirezM authored Nov 2, 2023
1 parent f28cc36 commit fda7f88
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 7 deletions.
13 changes: 8 additions & 5 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ func (d *dbManager) sendDataToStreamer() {

_, err = d.streamServer.AddStreamEntry(state.EntryTypeL2BlockEnd, blockEnd.Encode())
if err != nil {
log.Fatal(err)
log.Errorf("failed to add stream entry for l2block %v: %v", l2Block.L2BlockNumber, err)
continue
}

err = d.streamServer.CommitAtomicOp()
Expand Down Expand Up @@ -307,10 +308,12 @@ func (d *dbManager) StoreProcessedTxAndDeleteFromPool(ctx context.Context, tx tr
return err
}

// Change Tx status to selected
err = d.txPool.UpdateTxStatus(ctx, tx.response.TxHash, pool.TxStatusSelected, false, nil)
if err != nil {
return err
if !tx.isForcedBatch {
// Change Tx status to selected
err = d.txPool.UpdateTxStatus(ctx, tx.response.TxHash, pool.TxStatusSelected, false, nil)
if err != nil {
return err
}
}

log.Infof("StoreProcessedTxAndDeleteFromPool: successfully stored tx: %v for batch: %v", tx.response.TxHash.String(), tx.batchNumber)
Expand Down
31 changes: 31 additions & 0 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-node/event"
"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/0xPolygonHermez/zkevm-node/log"
Expand Down Expand Up @@ -70,6 +71,7 @@ type finalizer struct {
proverID string
lastPendingFlushID uint64
pendingFlushIDCond *sync.Cond
streamServer *datastreamer.StreamServer
}

type transactionToStore struct {
Expand Down Expand Up @@ -116,6 +118,7 @@ func newFinalizer(
closingSignalCh ClosingSignalCh,
batchConstraints batchConstraints,
eventLog *event.EventLog,
streamServer *datastreamer.StreamServer,
) *finalizer {
f := finalizer{
cfg: cfg,
Expand Down Expand Up @@ -150,6 +153,7 @@ func newFinalizer(
proverID: "",
lastPendingFlushID: 0,
pendingFlushIDCond: sync.NewCond(&sync.Mutex{}),
streamServer: streamServer,
}

f.reprocessFullBatchError.Store(false)
Expand Down Expand Up @@ -1020,7 +1024,34 @@ func (f *finalizer) processForcedBatch(ctx context.Context, lastBatchNumberInSta
}

f.handleForcedTxsProcessResp(ctx, request, response, stateRoot)
} else {
if f.lastGERHash != forcedBatch.GlobalExitRoot && f.streamServer != nil {
updateGer := state.DSUpdateGER{
BatchNumber: request.BatchNumber,
Timestamp: request.Timestamp.Unix(),
GlobalExitRoot: request.GlobalExitRoot,
Coinbase: f.sequencerAddress,
ForkID: uint16(f.dbManager.GetForkIDByBatchNumber(request.BatchNumber)),
StateRoot: response.NewStateRoot,
}

err = f.streamServer.StartAtomicOp()
if err != nil {
log.Errorf("failed to start atomic op for forced batch %v: %v", forcedBatch.ForcedBatchNumber, err)
}

_, err = f.streamServer.AddStreamEntry(state.EntryTypeUpdateGER, updateGer.Encode())
if err != nil {
log.Errorf("failed to add stream entry for forced batch %v: %v", forcedBatch.ForcedBatchNumber, err)
}

err = f.streamServer.CommitAtomicOp()
if err != nil {
log.Errorf("failed to commit atomic op for forced batch %v: %v", forcedBatch.ForcedBatchNumber, err)
}
}
}

f.nextGERMux.Lock()
f.lastGERHash = forcedBatch.GlobalExitRoot
f.nextGERMux.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestNewFinalizer(t *testing.T) {
dbManagerMock.On("GetLastSentFlushID", context.Background()).Return(uint64(0), nil)

// arrange and act
f = newFinalizer(cfg, effectiveGasPriceCfg, workerMock, dbManagerMock, executorMock, seqAddr, isSynced, closingSignalCh, bc, eventLog)
f = newFinalizer(cfg, effectiveGasPriceCfg, workerMock, dbManagerMock, executorMock, seqAddr, isSynced, closingSignalCh, bc, eventLog, nil)

// assert
assert.NotNil(t, f)
Expand Down
7 changes: 6 additions & 1 deletion sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ func (s *Sequencer) Start(ctx context.Context) {

go dbManager.Start()

finalizer := newFinalizer(s.cfg.Finalizer, s.cfg.EffectiveGasPrice, worker, dbManager, s.state, s.address, s.isSynced, closingSignalCh, batchConstraints, s.eventLog)
var streamServer *datastreamer.StreamServer = nil
if s.cfg.StreamServer.Enabled {
streamServer = dbManager.streamServer
}

finalizer := newFinalizer(s.cfg.Finalizer, s.cfg.EffectiveGasPrice, worker, dbManager, s.state, s.address, s.isSynced, closingSignalCh, batchConstraints, s.eventLog, streamServer)
currBatch, processingReq := s.bootstrap(ctx, dbManager, finalizer)
go finalizer.Start(ctx, currBatch, processingReq)

Expand Down

0 comments on commit fda7f88

Please sign in to comment.