Skip to content

Commit

Permalink
Merge pull request #412 from NilFoundation/consensus-future-messages
Browse files Browse the repository at this point in the history
[consensus] handle messages from future heights
  • Loading branch information
olegrok authored Feb 26, 2025
2 parents 78a4f32 + 66e1b88 commit 4000d87
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 21 deletions.
2 changes: 1 addition & 1 deletion nil/internal/collate/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func fetchShardSnap(ctx context.Context, nm *network.Manager, peerId network.Pee
}
defer stream.Close()

logger.Info().Msgf("Start to fetch data snapshot from %s", peerId)
// TODO: Here we need to check signatures of fetched blocks, this would require checking every block in the stream, which can be slow.
if err := db.Fetch(ctx, stream); err != nil {
logger.Error().Err(err).Msgf("Failed to fetch snapshot from %s", peerId)
Expand All @@ -84,6 +85,5 @@ func fetchSnapshot(ctx context.Context, nm *network.Manager, peerAddr *network.A
logger.Error().Err(err).Msgf("Failed to connect to %s to fetch snapshot", peerAddr)
return err
}
logger.Info().Msgf("Start to fetch data snapshot from %s", peerAddr)
return fetchShardSnap(ctx, nm, peerId, db, logger)
}
30 changes: 21 additions & 9 deletions nil/internal/collate/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,31 @@ func (s *Scheduler) doCollate(ctx context.Context) error {
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()

height := block.Id.Uint64() + 1
consCh := make(chan error, 1)
go func() {
consCh <- s.consensus.RunSequence(ctx, block.Id.Uint64()+1)
consCh <- s.consensus.RunSequence(ctx, height)
}()

select {
case <-syncCh:
cancelFn()
err := <-consCh
s.logger.Debug().Err(err).Msg("Consensus interrupted by syncer")
return nil
case err := <-consCh:
return err
for {
select {
case newBlockId := <-syncCh:
if newBlockId < types.BlockNumber(height) {
continue
}

// We receive new block via syncer.
// We need to interrupt current sequence and start new one.
cancelFn()
err := <-consCh
s.logger.Debug().
Uint64(logging.FieldBlockNumber, height).
Err(err).
Msg("Consensus interrupted by syncer")
return nil
case err := <-consCh:
return err
}
}
}
}
18 changes: 9 additions & 9 deletions nil/internal/collate/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Syncer struct {

subsMutex sync.Mutex
subsId uint64
subs map[uint64]chan struct{}
subs map[uint64]chan types.BlockNumber
validator *Validator
}

Expand All @@ -64,7 +64,7 @@ func NewSyncer(cfg *SyncerConfig, validator *Validator, db db.DB, networkManager
Stringer(logging.FieldShardId, cfg.ShardId).
Logger(),
waitForSync: &waitForSync,
subs: make(map[uint64]chan struct{}),
subs: make(map[uint64]chan types.BlockNumber),
validator: validator,
}, nil
}
Expand All @@ -81,11 +81,11 @@ func (s *Syncer) WaitComplete() {
s.waitForSync.Wait()
}

func (s *Syncer) Subscribe() (uint64, <-chan struct{}) {
func (s *Syncer) Subscribe() (uint64, <-chan types.BlockNumber) {
s.subsMutex.Lock()
defer s.subsMutex.Unlock()

ch := make(chan struct{}, 1)
ch := make(chan types.BlockNumber, 1)
id := s.subsId
s.subs[id] = ch
s.subsId++
Expand All @@ -100,12 +100,12 @@ func (s *Syncer) Unsubscribe(id uint64) {
delete(s.subs, id)
}

func (s *Syncer) notify() {
func (s *Syncer) notify(blockId types.BlockNumber) {
s.subsMutex.Lock()
defer s.subsMutex.Unlock()

for _, ch := range s.subs {
ch <- struct{}{}
ch <- blockId
}
}

Expand Down Expand Up @@ -141,7 +141,7 @@ func (s *Syncer) Run(ctx context.Context) error {
s.logger.Debug().
Stringer(logging.FieldBlockHash, hash).
Uint64(logging.FieldBlockNumber, uint64(block.Id)).
Msg("Initialized sync proposer at starting block")
Msg("Initialized syncer at starting block")

s.logger.Info().Msg("Starting sync")

Expand All @@ -162,7 +162,7 @@ func (s *Syncer) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
s.logger.Debug().Msg("Sync proposer is terminated")
s.logger.Debug().Msg("Syncer is terminated")
return nil
case data := <-ch:
saved, err := s.processTopicTransaction(ctx, data)
Expand Down Expand Up @@ -284,7 +284,7 @@ func (s *Syncer) saveBlock(ctx context.Context, block *types.BlockWithExtractedD
if err := s.validator.ReplayBlock(ctx, block); err != nil {
return err
}
s.notify()
s.notify(block.Id)

s.logger.Trace().
Stringer(logging.FieldBlockNumber, block.Block.Id).
Expand Down
24 changes: 22 additions & 2 deletions nil/internal/consensus/ibft/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,36 @@ func (i *backendIBFT) IsValidValidator(msg *protoIBFT.IbftMessage) bool {
return false
}

// Here (and below) we use transportCtx because this method could be called from the transport goroutine
// or i.ctx can be changed in case we start new sequence for the next height.
lastBlock, _, err := i.validator.GetLastBlock(i.transportCtx)
if err != nil {
i.logger.Error().
Err(err).
Msg("Failed to get last block")
return false
}

var height uint64
loggerCtx := i.logger.With().Hex(logging.FieldPublicKey, msg.From)
if view := msg.GetView(); view != nil {
loggerCtx = loggerCtx.
Uint64(logging.FieldHeight, view.Height).
Uint64(logging.FieldRound, view.Round)
height = view.Height
}
logger := loggerCtx.Logger()

// Here we use transportCtx because this method could be called from the transport goroutine
validators, err := i.validatorsCache.getValidators(i.transportCtx, msg.View.Height)
// Current message is from future.
// Some validator could commit block and start new sequence before we committed that block.
// Use last known config since validators list is static for now.
// TODO: consider some options to fix it.
if expectedHeight := uint64(lastBlock.Id + 1); height > expectedHeight {
logger.Warn().Msgf("Got message with height=%d while expected=%d", height, expectedHeight)
height = expectedHeight
}

validators, err := i.validatorsCache.getValidators(i.transportCtx, height)
if err != nil {
logger.Error().
Err(err).
Expand Down

0 comments on commit 4000d87

Please sign in to comment.