Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.2: kvserver: increase Migrate application timeout to 1 minute #73061

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 26 additions & 21 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ import (
"go.etcd.io/etcd/raft/v3/tracker"
)

// mergeApplicationTimeout is the timeout when waiting for a merge command to be
// applied on all range replicas. There doesn't appear to be any strong reason
// why this value was chosen in particular, but it seems to work.
const mergeApplicationTimeout = 5 * time.Second

// AdminSplit divides the range into into two ranges using args.SplitKey.
func (r *Replica) AdminSplit(
ctx context.Context, args roachpb.AdminSplitRequest, reason string,
Expand Down Expand Up @@ -727,9 +732,11 @@ func (r *Replica) AdminMerge(
}
rhsSnapshotRes := br.(*roachpb.SubsumeResponse)

err = waitForApplication(
ctx, r.store.cfg.NodeDialer, rightDesc.RangeID, mergeReplicas,
rhsSnapshotRes.LeaseAppliedIndex)
err = contextutil.RunWithTimeout(ctx, "waiting for merge application", mergeApplicationTimeout,
func(ctx context.Context) error {
return waitForApplication(ctx, r.store.cfg.NodeDialer, rightDesc.RangeID, mergeReplicas,
rhsSnapshotRes.LeaseAppliedIndex)
})
if err != nil {
return errors.Wrap(err, "waiting for all right-hand replicas to catch up")
}
Expand Down Expand Up @@ -794,25 +801,23 @@ func waitForApplication(
replicas []roachpb.ReplicaDescriptor,
leaseIndex uint64,
) error {
return contextutil.RunWithTimeout(ctx, "wait for application", 5*time.Second, func(ctx context.Context) error {
g := ctxgroup.WithContext(ctx)
for _, repl := range replicas {
repl := repl // copy for goroutine
g.GoCtx(func(ctx context.Context) error {
conn, err := dialer.Dial(ctx, repl.NodeID, rpc.DefaultClass)
if err != nil {
return errors.Wrapf(err, "could not dial n%d", repl.NodeID)
}
_, err = NewPerReplicaClient(conn).WaitForApplication(ctx, &WaitForApplicationRequest{
StoreRequestHeader: StoreRequestHeader{NodeID: repl.NodeID, StoreID: repl.StoreID},
RangeID: rangeID,
LeaseIndex: leaseIndex,
})
return err
g := ctxgroup.WithContext(ctx)
for _, repl := range replicas {
repl := repl // copy for goroutine
g.GoCtx(func(ctx context.Context) error {
conn, err := dialer.Dial(ctx, repl.NodeID, rpc.DefaultClass)
if err != nil {
return errors.Wrapf(err, "could not dial n%d", repl.NodeID)
}
_, err = NewPerReplicaClient(conn).WaitForApplication(ctx, &WaitForApplicationRequest{
StoreRequestHeader: StoreRequestHeader{NodeID: repl.NodeID, StoreID: repl.StoreID},
RangeID: rangeID,
LeaseIndex: leaseIndex,
})
}
return g.Wait()
})
return err
})
}
return g.Wait()
}

// waitForReplicasInit blocks until it has proof that the replicas listed in
Expand Down
28 changes: 22 additions & 6 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/observedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
Expand All @@ -37,6 +38,18 @@ import (
"go.etcd.io/etcd/raft/v3"
)

// migrateApplicationTimeout is the duration to wait for a Migrate command
// to be applied to all replicas.
//
// TODO(erikgrinaker): this, and the timeout handling, should be moved into a
// migration helper that manages checkpointing and retries as well.
var migrateApplicationTimeout = settings.RegisterDurationSetting(
"kv.migration.migrate_application.timeout",
"timeout for a Migrate request to be applied across all replicas of a range",
1*time.Minute,
settings.PositiveDuration,
)

// executeWriteBatch is the entry point for client requests which may mutate the
// range's replicated state. Requests taking this path are evaluated and ultimately
// serialized through Raft, but pass through additional machinery whose goal is
Expand Down Expand Up @@ -249,12 +262,15 @@ func (r *Replica) executeWriteBatch(
//
// [1]: See PurgeOutdatedReplicas from the Migration service.
// [2]: pkg/migration
desc := r.Desc()
// NB: waitForApplication already has a timeout.
applicationErr := waitForApplication(
ctx, r.store.cfg.NodeDialer, desc.RangeID, desc.Replicas().Descriptors(),
// We wait for an index >= that of the migration command.
r.GetLeaseAppliedIndex())
applicationErr := contextutil.RunWithTimeout(ctx, "wait for Migrate application",
migrateApplicationTimeout.Get(&r.ClusterSettings().SV),
func(ctx context.Context) error {
desc := r.Desc()
return waitForApplication(
ctx, r.store.cfg.NodeDialer, desc.RangeID, desc.Replicas().Descriptors(),
// We wait for an index >= that of the migration command.
r.GetLeaseAppliedIndex())
})
propResult.Err = roachpb.NewError(applicationErr)
}
return propResult.Reply, nil, propResult.Err
Expand Down
12 changes: 12 additions & 0 deletions pkg/migration/migrations/separated_intents.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,18 @@ func postSeparatedIntentsMigration(
return err
})
if err != nil {
// TODO(erikgrinaker): This should be moved into a common helper for
// all migrations that e.g. manages checkpointing and retries as well.
// We add a message here for 21.2 temporarily, since this has been seen
// to fail in the wild.
log.Warningf(ctx, `Migrate command failed for range %d: %s.
Command must be applied on all range replicas (not just a quorum). Please make
sure all ranges are healthy and fully upreplicated. Heavy rebalancing may
interfere with the migration, consider temporarily disabling rebalancing with
the cluster setting kv.allocator.load_based_rebalancing=1 and
kv.allocator.range_rebalance_threshold=1. The timeout can also be increased
with kv.migration.migrate_application.timeout.`,
desc.RangeID, err)
return err
}
}
Expand Down