Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: single-node quorum type 111 #1008

Open
wants to merge 5 commits into
base: v1.5-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(config): add initial sync timeout setting
  • Loading branch information
lklimek committed Dec 19, 2024
commit 61187185e4603fbf1559149be45de0561796f7ec
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ type BaseConfig struct { //nolint: maligned
// Default: 0
DeadlockDetection time.Duration `mapstructure:"deadlock-detection"`

// SyncTimeout is the timeout for the initial sync process, before switching to consensus.
// If zero or empty, the default value is used.
//
// Default: 60s
SyncTimeout time.Duration `mapstructure:"sync-timeout"`

// Other options should be empty
Other map[string]interface{} `mapstructure:",remain"`
}

Expand All @@ -250,6 +257,7 @@ func DefaultBaseConfig() BaseConfig {
DBBackend: "goleveldb",
DBPath: "data",
DeadlockDetection: 0,
SyncTimeout: 60 * time.Second,
}
}

Expand Down
2 changes: 2 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func TestDefaultConfig(t *testing.T) {

assert.Equal(t, "/foo/bar", cfg.GenesisFile())
assert.Equal(t, "/opt/data", cfg.DBDir())

assert.Equal(t, 60*time.Second, cfg.BaseConfig.SyncTimeout)
}

func TestConfigValidateBasic(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ filter-peers = {{ .BaseConfig.FilterPeers }}
# Default: 0
deadlock-detection = "{{ .BaseConfig.DeadlockDetection }}"

# Timeout for the initial sync process, before switching to consensus.
# If zero or empty, the default value is used.
#
# Default: 60s
sync-timeout = "{{ .BaseConfig.SyncTimeout }}"

#######################################################
### ABCI App Connection Options ###
#######################################################
Expand Down
16 changes: 14 additions & 2 deletions internal/blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
switchToConsensusIntervalSeconds = 1

// switch to consensus after this duration of inactivity
syncTimeout = 60 * time.Second
defaultSyncTimeout = 60 * time.Second
)

type consensusReactor interface {
Expand Down Expand Up @@ -62,6 +62,8 @@ type Reactor struct {
eventBus *eventbus.EventBus

syncStartTime time.Time
// syncTimeout defines how much time we will try to start sync before switching to consensus
syncTimeout time.Duration

nodeProTxHash types.ProTxHash

Expand Down Expand Up @@ -94,6 +96,7 @@ func NewReactor(
metrics: metrics,
eventBus: eventBus,
nodeProTxHash: nodeProTxHash,
syncTimeout: defaultSyncTimeout,
executor: newBlockApplier(
blockExec,
store,
Expand All @@ -106,6 +109,12 @@ func NewReactor(
return r
}

func (r *Reactor) WithSyncTimeout(timeout time.Duration) *Reactor {
r.syncTimeout = timeout

return r
}

// OnStart starts separate go routines for each p2p Channel and listens for
// envelopes on each. In addition, it also listens for peer updates and handles
// messages on that p2p channel accordingly. The caller must be sure to execute
Expand All @@ -130,7 +139,10 @@ func (r *Reactor) OnStart(ctx context.Context) error {
startHeight = state.InitialHeight
}

r.synchronizer = NewSynchronizer(startHeight, r.p2pClient, r.executor, WithLogger(r.logger))
r.synchronizer = NewSynchronizer(startHeight, r.p2pClient, r.executor,
WithLogger(r.logger),
WithSyncTimeout(r.syncTimeout),
)
if r.blockSyncFlag.Load() {
if err := r.synchronizer.Start(ctx); err != nil {
return err
Expand Down
13 changes: 11 additions & 2 deletions internal/blocksync/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type (
logger log.Logger

lastAdvance time.Time
// syncTimeout defines how much time we will try to sync; defaults to 60 seconds
syncTimeout time.Duration

mtx sync.RWMutex

Expand Down Expand Up @@ -112,6 +114,12 @@ func WithClock(clock clockwork.Clock) OptionFunc {
}
}

func WithSyncTimeout(timeout time.Duration) OptionFunc {
return func(v *Synchronizer) {
v.syncTimeout = timeout
}
}

// NewSynchronizer returns a new Synchronizer with the height equal to start
func NewSynchronizer(start int64, client client.BlockClient, blockExec *blockApplier, opts ...OptionFunc) *Synchronizer {
peerStore := NewInMemPeerStore()
Expand All @@ -127,6 +135,7 @@ func NewSynchronizer(start int64, client client.BlockClient, blockExec *blockApp
height: start,
workerPool: workerpool.New(poolWorkerSize, workerpool.WithLogger(logger)),
pendingToApply: map[int64]BlockResponse{},
syncTimeout: defaultSyncTimeout,
}
for _, opt := range opts {
opt(bp)
Expand Down Expand Up @@ -239,14 +248,14 @@ func (s *Synchronizer) WaitForSync(ctx context.Context) {
lastAdvance = s.LastAdvance()
isCaughtUp = s.IsCaughtUp()
)
if isCaughtUp || time.Since(lastAdvance) > syncTimeout {
if isCaughtUp || time.Since(lastAdvance) > s.syncTimeout {
return
}
s.logger.Info(
"not caught up yet",
"height", height,
"max_peer_height", s.MaxPeerHeight(),
"timeout_in", syncTimeout-time.Since(lastAdvance),
"timeout_in", s.syncTimeout-time.Since(lastAdvance),
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func makeNode(
blockSync && !stateSync,
nodeMetrics.consensus,
eventBus,
)
).WithSyncTimeout(cfg.SyncTimeout)
node.services = append(node.services, bcReactor)
node.rpcEnv.BlockSyncReactor = bcReactor

Expand Down
Loading