Skip to content

Commit

Permalink
Fix isSynced function (#3188)
Browse files Browse the repository at this point in the history
* fix isSynced

* fix linter

* fix logs

* halt on sanity check error
  • Loading branch information
agnusmor committed Feb 2, 2024
1 parent 9f6ba6a commit 00cdc35
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 34 deletions.
2 changes: 2 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
EventID_SynchronizerRestart EventID = "SYNCHRONIZER RESTART"
// EventID_SynchronizerHalt is triggered when the synchronizer halts
EventID_SynchronizerHalt EventID = "SYNCHRONIZER HALT"
// EventID_SequenceSenderHalt is triggered when the SequenceSender halts
EventID_SequenceSenderHalt EventID = "SEQUENCESENDER HALT"
// Source_Node is the source of the event
Source_Node Source = "node"

Expand Down
1 change: 0 additions & 1 deletion sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {

metrics.WorkerProcessingTime(time.Since(start))
if tx != nil {
log.Debugf("processing tx %s", tx.HashStr)
showNotFoundTxLog = true

firstTxProcess := true
Expand Down
1 change: 1 addition & 0 deletions sequencesender/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type stateInterface interface {
GetForcedBatch(ctx context.Context, forcedBatchNumber uint64, dbTx pgx.Tx) (*state.ForcedBatch, error)
GetTimeForLatestBatchVirtualization(ctx context.Context, dbTx pgx.Tx) (time.Time, error)
GetLastBatchNumber(ctx context.Context, dbTx pgx.Tx) (uint64, error)
GetLastClosedBatch(ctx context.Context, dbTx pgx.Tx) (*state.Batch, error)
GetL2BlocksByBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) ([]state.L2Block, error)
GetBlockByNumber(ctx context.Context, blockNumber uint64, dbTx pgx.Tx) (*state.Block, error)
}
Expand Down
82 changes: 50 additions & 32 deletions sequencesender/sequencesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ func New(cfg Config, state stateInterface, etherman etherman, manager ethTxManag

// Start starts the sequence sender
func (s *SequenceSender) Start(ctx context.Context) {
ticker := time.NewTicker(s.cfg.WaitPeriodSendSequence.Duration)
for {
s.tryToSendSequence(ctx, ticker)
s.tryToSendSequence(ctx)
}
}

Expand Down Expand Up @@ -83,7 +82,7 @@ func (s *SequenceSender) marginTimeElapsed(ctx context.Context, l2BlockTimestamp
}
}

func (s *SequenceSender) tryToSendSequence(ctx context.Context, ticker *time.Ticker) {
func (s *SequenceSender) tryToSendSequence(ctx context.Context) {
retry := false
// process monitored sequences before starting a next cycle
s.ethTxManager.ProcessPendingMonitoredTxs(ctx, ethTxManagerOwner, func(result ethtxmanager.MonitoredTxResult, dbTx pgx.Tx) {
Expand All @@ -100,8 +99,8 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context, ticker *time.Tic

// Check if synchronizer is up to date
if !s.isSynced(ctx) {
log.Info("wait for synchronizer to sync last batch")
waitTick(ctx, ticker)
log.Info("wait virtual state to be synced...")
time.Sleep(5 * time.Second) // nolint:gomnd
return
}

Expand All @@ -114,7 +113,7 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context, ticker *time.Tic
} else {
log.Info("waiting for sequences to be worth sending to L1")
}
waitTick(ctx, ticker)
time.Sleep(s.cfg.WaitPeriodSendSequence.Duration)
return
}

Expand Down Expand Up @@ -258,14 +257,11 @@ func (s *SequenceSender) getSequencesToSend(ctx context.Context) ([]types.Sequen
}

seq := types.Sequence{
GlobalExitRoot: batch.GlobalExitRoot, //TODO: set empty for regular batches
Timestamp: batch.Timestamp.Unix(), //TODO: set empty for regular batches
BatchL2Data: batch.BatchL2Data,
BatchNumber: batch.BatchNumber,
BatchL2Data: batch.BatchL2Data,
BatchNumber: batch.BatchNumber,
}

if batch.ForcedBatchNum != nil {
//TODO: Assign GER, timestamp(forcedAt) and l1block.parentHash to seq
forcedBatch, err := s.state.GetForcedBatch(ctx, *batch.ForcedBatchNum, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -407,38 +403,60 @@ func isDataForEthTxTooBig(err error) bool {
errors.Is(err, ethman.ErrContentLengthTooLarge)
}

func waitTick(ctx context.Context, ticker *time.Ticker) {
select {
case <-ticker.C:
// nothing
case <-ctx.Done():
return
}
}

func (s *SequenceSender) isSynced(ctx context.Context) bool {
lastSyncedBatchNum, err := s.state.GetLastVirtualBatchNum(ctx, nil)
lastVirtualBatchNum, err := s.state.GetLastVirtualBatchNum(ctx, nil)
if err != nil && err != state.ErrNotFound {
log.Errorf("failed to get last isSynced batch, err: %v", err)
log.Warnf("failed to get last virtual batch number, err: %v", err)
return false
}
lastBatchNum, err := s.state.GetLastBatchNumber(ctx, nil)

lastTrustedBatchClosed, err := s.state.GetLastClosedBatch(ctx, nil)
if err != nil && err != state.ErrNotFound {
log.Errorf("failed to get last batch num, err: %v", err)
log.Warnf("failed to get last trusted batch closed, err: %v", err)
return false
}
if lastBatchNum > lastSyncedBatchNum {
return true
}
lastEthBatchNum, err := s.etherman.GetLatestBatchNumber()

lastSCBatchNum, err := s.etherman.GetLatestBatchNumber()
if err != nil {
log.Errorf("failed to get last eth batch, err: %v", err)
log.Warnf("failed to get from the SC last sequenced batch number, err: %v", err)
return false
}

if lastVirtualBatchNum < lastSCBatchNum {
log.Infof("waiting for the state to be synced, last virtual batch: %d, last SC sequenced batch: %d", lastVirtualBatchNum, lastSCBatchNum)
return false
} else if lastVirtualBatchNum > lastSCBatchNum { // Sanity check: virtual batch number cannot be greater than last batch sequenced in the SC
s.halt(ctx, fmt.Errorf("last virtual batch %d is greater than last SC sequenced batch %d", lastVirtualBatchNum, lastSCBatchNum))
return false
}
if lastSyncedBatchNum < lastEthBatchNum {
log.Infof("waiting for the state to be isSynced, lastSyncedBatchNum: %d, lastEthBatchNum: %d", lastSyncedBatchNum, lastEthBatchNum)

// At this point lastVirtualBatchNum = lastEthBatchNum. Check trusted batches
if lastTrustedBatchClosed.BatchNumber >= lastVirtualBatchNum {
return true
} else { // Sanity check: virtual batch number cannot be greater than last trusted batch closed
s.halt(ctx, fmt.Errorf("last virtual batch %d is greater than last trusted batch closed %d", lastVirtualBatchNum, lastTrustedBatchClosed.BatchNumber))
return false
}
}

return true
// halt halts the SequenceSender
func (s *SequenceSender) halt(ctx context.Context, err error) {
event := &event.Event{
ReceivedAt: time.Now(),
Source: event.Source_Node,
Component: event.Component_Sequence_Sender,
Level: event.Level_Critical,
EventID: event.EventID_FinalizerHalt,
Description: fmt.Sprintf("SequenceSender halted due to error, error: %s", err),
}

eventErr := s.eventLog.LogEvent(ctx, event)
if eventErr != nil {
log.Errorf("error storing SequenceSender halt event, error: %v", eventErr)
}

log.Errorf("halting SequenceSender, fatal error: %v", err)
for {
time.Sleep(300 * time.Second) //nolint:gomnd
}
}
2 changes: 1 addition & 1 deletion state/pgstatestorage/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ func (p *PostgresStorage) GetSequences(ctx context.Context, lastVerifiedBatchNum
// GetLastClosedBatch returns the latest closed batch
func (p *PostgresStorage) GetLastClosedBatch(ctx context.Context, dbTx pgx.Tx) (*state.Batch, error) {
const getLastClosedBatchSQL = `
SELECT bt.batch_num, bt.global_exit_root, bt.local_exit_root, bt.acc_input_hash, bt.state_root, bt.timestamp, bt.coinbase, bt.raw_txs_data, bt.batch_resources, bt.wip
SELECT bt.batch_num, bt.global_exit_root, bt.local_exit_root, bt.acc_input_hash, bt.state_root, bt.timestamp, bt.coinbase, bt.raw_txs_data, bt.forced_batch_num, bt.batch_resources, bt.wip
FROM state.batch bt
WHERE wip = FALSE
ORDER BY bt.batch_num DESC
Expand Down

0 comments on commit 00cdc35

Please sign in to comment.