Skip to content

Commit

Permalink
feat(statesync): add state sync retries limit
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Jan 27, 2025
1 parent 34aa00b commit c22edb6
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 8 deletions.
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,10 @@ type StateSyncConfig struct {
// Time to spend discovering snapshots before initiating a restore.
DiscoveryTime time.Duration `mapstructure:"discovery-time"`

// Number of times to retry state sync. When retries are exhausted, the node will
// fall back to the regular block sync. Set to 0 to disable retries. Default is 1.
Retries int `mapstructure:"retries"`

// Temporary directory for state sync snapshot chunks, defaults to os.TempDir().
// The synchronizer will create a new, randomly named directory within this directory
// and remove it when the sync is complete.
Expand All @@ -1022,6 +1026,7 @@ func DefaultStateSyncConfig() *StateSyncConfig {
DiscoveryTime: 15 * time.Second,
ChunkRequestTimeout: 15 * time.Second,
Fetchers: 4,
Retries: 1,
}
}

Expand Down Expand Up @@ -1054,6 +1059,10 @@ func (cfg *StateSyncConfig) ValidateBasic() error {
return errors.New("discovery time must be 0s or greater than five seconds")
}

if cfg.Retries < 0 {
return errors.New("retries must be greater than or equal to zero")
}

if cfg.ChunkRequestTimeout < 5*time.Second {
return errors.New("chunk-request-timeout must be at least 5 seconds")
}
Expand Down
4 changes: 4 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,10 @@ rpc-servers = "{{ StringsJoin .StateSync.RPCServers "," }}"
# Time to spend discovering snapshots before initiating a restore.
discovery-time = "{{ .StateSync.DiscoveryTime }}"
# Number of times to retry state sync. When retries are exhausted, the node will
# fall back to the regular block sync. Set to 0 to disable retries. Default is 1.
retries = {{ .StateSync.Retries }}
# Temporary directory for state sync snapshot chunks, defaults to os.TempDir().
# The synchronizer will create a new, randomly named directory within this directory
# and remove it when the sync is complete.
Expand Down
2 changes: 1 addition & 1 deletion internal/statesync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
}
r.getSyncer().SetStateProvider(r.stateProvider)

state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, r.requestSnaphot)
state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, r.cfg.Retries, r.requestSnaphot)
if err != nil {
return sm.State{}, fmt.Errorf("sync any: %w", err)
}
Expand Down
5 changes: 5 additions & 0 deletions internal/statesync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (s *syncer) RemovePeer(peerID types.NodeID) {
func (s *syncer) SyncAny(
ctx context.Context,
discoveryTime time.Duration,
retries int,
requestSnapshots func() error,
) (sm.State, *types.Commit, error) {
if discoveryTime != 0 && discoveryTime < minimumDiscoveryTime {
Expand All @@ -170,6 +171,10 @@ func (s *syncer) SyncAny(

for {
iters++

if retries > 0 && iters > retries {
return sm.State{}, nil, errNoSnapshots
}
// If not nil, we're going to retry restoration of the same snapshot.
if snapshot == nil {
snapshot = s.snapshots.Best()
Expand Down
14 changes: 7 additions & 7 deletions internal/statesync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (suite *SyncerTestSuite) TestSyncAny() {
LastBlockAppHash: []byte("app_hash"),
}, nil)

newState, lastCommit, err := suite.syncer.SyncAny(ctx, 0, func() error { return nil })
newState, lastCommit, err := suite.syncer.SyncAny(ctx, 0, 0, func() error { return nil })
suite.Require().NoError(err)

suite.Require().Equal([]int{0: 2, 1: 1, 2: 1, 3: 1}, chunkRequests)
Expand All @@ -294,7 +294,7 @@ func (suite *SyncerTestSuite) TestSyncAnyNoSnapshots() {
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()

_, _, err := suite.syncer.SyncAny(ctx, 0, func() error { return nil })
_, _, err := suite.syncer.SyncAny(ctx, 0, 0, func() error { return nil })
suite.Require().Equal(errNoSnapshots, err)
}

Expand All @@ -320,7 +320,7 @@ func (suite *SyncerTestSuite) TestSyncAnyAbort() {
Once().
Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)

_, _, err = suite.syncer.SyncAny(ctx, 0, func() error { return nil })
_, _, err = suite.syncer.SyncAny(ctx, 0, 0, func() error { return nil })
suite.Require().Equal(errAbort, err)
}

Expand Down Expand Up @@ -370,7 +370,7 @@ func (suite *SyncerTestSuite) TestSyncAnyReject() {
Once().
Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)

_, _, err = suite.syncer.SyncAny(ctx, 0, func() error { return nil })
_, _, err = suite.syncer.SyncAny(ctx, 0, 0, func() error { return nil })
suite.Require().Equal(errNoSnapshots, err)
}

Expand Down Expand Up @@ -413,7 +413,7 @@ func (suite *SyncerTestSuite) TestSyncAnyRejectFormat() {
Once().
Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)

_, _, err = suite.syncer.SyncAny(ctx, 0, func() error { return nil })
_, _, err = suite.syncer.SyncAny(ctx, 0, 0, func() error { return nil })
suite.Require().Equal(errAbort, err)
}

Expand Down Expand Up @@ -467,7 +467,7 @@ func (suite *SyncerTestSuite) TestSyncAnyRejectSender() {
Once().
Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)

_, _, err := suite.syncer.SyncAny(ctx, 0, func() error { return nil })
_, _, err := suite.syncer.SyncAny(ctx, 0, 0, func() error { return nil })
suite.Require().Equal(errNoSnapshots, err)
}

Expand Down Expand Up @@ -495,7 +495,7 @@ func (suite *SyncerTestSuite) TestSyncAnyAbciError() {
Once().
Return(nil, errBoom)

_, _, err = suite.syncer.SyncAny(ctx, 0, func() error { return nil })
_, _, err = suite.syncer.SyncAny(ctx, 0, 0, func() error { return nil })
suite.Require().True(errors.Is(err, errBoom))
}

Expand Down

0 comments on commit c22edb6

Please sign in to comment.