storage: simplify scatter implementation #17644

Merged
merged 1 commit on Aug 15, 2017
147 changes: 25 additions & 122 deletions pkg/storage/replica_command.go
@@ -31,7 +31,6 @@ import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -3974,139 +3973,43 @@ func TestingRelocateRange(
func (r *Replica) adminScatter(
ctx context.Context, args roachpb.AdminScatterRequest,
) (roachpb.AdminScatterResponse, error) {
var desc *roachpb.RangeDescriptor
var zone config.ZoneConfig
var err error

refreshDescAndZone := func() error {
desc = r.Desc()

sysCfg, ok := r.store.cfg.Gossip.GetSystemConfig()
if !ok {
return errors.New("system config not yet available")
}
if zone, err = sysCfg.GetZoneConfigForKey(desc.StartKey); err != nil {
return err
}

return nil
sysCfg, ok := r.store.cfg.Gossip.GetSystemConfig()
if !ok {
log.Infof(ctx, "scatter failed (system config not yet available)")
return roachpb.AdminScatterResponse{}, errors.New("system config not yet available")
}

if err := refreshDescAndZone(); err != nil {
return roachpb.AdminScatterResponse{}, err
rq := r.store.replicateQueue
retryOpts := retry.Options{
InitialBackoff: 50 * time.Millisecond,
MaxBackoff: 1 * time.Second,
Multiplier: 2,
MaxRetries: 5,
}

// Step 1. Rebalance by adding replicas of this range to the stores the
// allocator recommends, if any. It's unlikely that the allocator would
// suggest more than zone.NumReplicas rebalance targets--that would indicate
// the allocator had previously given us suggestions that did not balance the
// cluster--but we cap the number of replicas we'll try to add at
// zone.NumReplicas just in case.
//
// TODO(benesch): This causes overreplication. Ideally, we'd wait for the
// replicate queue to downreplicate after each ADD_REPLICA command, but this
// practically guarantees that, for at least some ranges, we'll remove our own
// replica first, after which we can no longer issue ADD_REPLICA commands.
for i := int32(0); i < zone.NumReplicas; i++ {
if err = refreshDescAndZone(); err != nil {
break
}

rangeInfo := rangeInfoForRepl(r, desc)
targetStore := r.store.allocator.RebalanceTarget(
ctx, zone.Constraints, rangeInfo, storeFilterNone)
if targetStore == nil {
if log.V(2) {
log.Infof(ctx, "scatter: no rebalance targets found on try %d, moving on", i)
}
break
} else if log.V(2) {
log.Infof(ctx, "scatter: found rebalance target %d: %v", i, targetStore)
}
replicationTarget := roachpb.ReplicationTarget{
NodeID: targetStore.Node.NodeID,
StoreID: targetStore.StoreID,
}

retryOpts := retry.Options{
InitialBackoff: 50 * time.Millisecond,
MaxRetries: 5,
RandomizationFactor: .3,
}
for re := retry.StartWithCtx(ctx, retryOpts); re.Next(); {
if err = r.changeReplicas(
ctx, roachpb.ADD_REPLICA, replicationTarget, desc, SnapshotRequest_REBALANCE,
); err == nil {
break
} else if log.V(2) {
log.Infof(ctx, "scatter: unable to replicate to %v: %s", replicationTarget, err)
}
}
// Loop until the replicate queue decides there is nothing left to do for the
// range. Note that we disable lease transfers until the final step as
// transferring the lease prevents any further action on this node.
var allowLeaseTransfer bool
canTransferLease := func() bool { return allowLeaseTransfer }
for re := retry.StartWithCtx(ctx, retryOpts); re.Next(); {
requeue, err := rq.processOneChange(ctx, r, sysCfg, canTransferLease)
if err != nil {
switch errors.Cause(err).(type) {
case *roachpb.ConditionFailedError:
default:
return roachpb.AdminScatterResponse{}, err
if IsSnapshotError(err) {
continue
}
}
if ctx.Err() != nil {
return roachpb.AdminScatterResponse{}, ctx.Err()
}
}

// Step 2. Transfer our lease away, if the allocator wants us to.
retryOpts := retry.Options{
InitialBackoff: 50 * time.Millisecond,
MaxBackoff: time.Second,
MaxRetries: 5,
RandomizationFactor: .3,
}
for re := retry.StartWithCtx(ctx, retryOpts); re.Next(); {
lease, _ := r.getLease()
if !r.IsLeaseValid(lease, r.store.Clock().Now()) {
// We assume that, if we no longer have the lease, the replicate queue has
// already transferred it away to balance the cluster, so we move on.
break
}

if err = refreshDescAndZone(); err != nil {
continue
}

candidates := filterBehindReplicas(r.RaftStatus(), desc.Replicas)
target := r.store.allocator.TransferLeaseTarget(
ctx,
zone.Constraints,
candidates,
r.store.StoreID(),
desc.RangeID,
r.leaseholderStats,
true, /* checkTransferLeaseSource */
true, /* checkCandidateFullness */
true, /* alwaysAllowDecisionWithoutStats */
)

if target == (roachpb.ReplicaDescriptor{}) {
if log.V(2) {
log.Infof(ctx, "scatter: no lease transfer targets found, moving on")
if !requeue {
if allowLeaseTransfer {
break
}
r.store.replicateQueue.MaybeAdd(r, r.store.Clock().Now())
break
} else if log.V(2) {
log.Infof(ctx, "scatter: attempting to transfer lease to s%d", target.StoreID)
allowLeaseTransfer = true
}

if err = r.AdminTransferLease(ctx, target.StoreID); err != nil && log.V(2) {
log.Infof(ctx, "scatter: unable to transfer lease to s%d: %s", target.StoreID, err)
}
}
if err != nil {
return roachpb.AdminScatterResponse{}, err
}
if ctx.Err() != nil {
return roachpb.AdminScatterResponse{}, ctx.Err()
re.Reset()
}

desc := r.Desc()
return roachpb.AdminScatterResponse{
Ranges: []roachpb.AdminScatterResponse_Range{{
Span: roachpb.Span{
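Since the flattened diff above interleaves removed and added lines without +/- gutters, the new control flow is easier to see in isolation. The sketch below is a self-contained model of the loop the added lines describe, not the merged CockroachDB code: `processOneChange`, `errSnapshot`, `scatterOneRange`, and `maxAttempts` are stand-ins for `replicateQueue.processOneChange`, `IsSnapshotError`, `(*Replica).adminScatter`, and the `retry.Options`-driven retry loop, and the exact requeue/break handling is an assumption reconstructed from the visible fragments.

```go
// Sketch of the scatter control flow suggested by this change, modeled with
// stand-in types so it compiles on its own. The real implementation lives in
// (*Replica).adminScatter and calls replicateQueue.processOneChange.
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// errSnapshot stands in for the snapshot errors the real loop retries on.
var errSnapshot = errors.New("snapshot error")

// processOneChange is a stand-in for replicateQueue.processOneChange: it makes
// at most one replication change and reports whether more work remains.
func processOneChange(canTransferLease func() bool) (requeue bool, err error) {
	if canTransferLease() && rand.Intn(2) == 0 {
		return false, nil // transferred the lease; nothing more this node can do
	}
	switch rand.Intn(4) {
	case 0:
		return false, errSnapshot // transient; caller retries
	case 1:
		return true, nil // made a rebalancing change; more work may remain
	default:
		return false, nil // nothing left to do without a lease transfer
	}
}

func scatterOneRange(maxAttempts int) error {
	// Lease transfers stay disabled until the queue reports that it has nothing
	// else to do; transferring the lease away would prevent this node from
	// taking any further action on the range.
	var allowLeaseTransfer bool
	canTransferLease := func() bool { return allowLeaseTransfer }

	for attempt := 0; attempt < maxAttempts; attempt++ {
		requeue, err := processOneChange(canTransferLease)
		if err != nil {
			if errors.Is(err, errSnapshot) {
				continue // transient snapshot failure: retry
			}
			return err
		}
		if !requeue {
			if allowLeaseTransfer {
				return nil // final pass done; scatter is complete
			}
			// Nothing left to do except possibly move the lease: enable lease
			// transfers and run one more pass.
			allowLeaseTransfer = true
		}
	}
	return fmt.Errorf("scatter gave up after %d attempts", maxAttempts)
}

func main() {
	if err := scatterOneRange(32); err != nil {
		fmt.Println("scatter:", err)
		return
	}
	fmt.Println("scatter: done")
}
```

The design point carried over from the diff is that lease transfers remain disabled until the replicate queue reports nothing left to do, since transferring the lease away ends this replica's ability to keep acting on the range.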
7 changes: 4 additions & 3 deletions pkg/storage/replicate_queue.go
@@ -220,7 +220,7 @@ func (rq *replicateQueue) process(
// snapshot errors, usually signalling that a rebalancing
// reservation could not be made with the selected target.
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
if requeue, err := rq.processOneChange(ctx, repl, sysCfg); err != nil {
if requeue, err := rq.processOneChange(ctx, repl, sysCfg, rq.canTransferLease); err != nil {
if IsSnapshotError(err) {
// If ChangeReplicas failed because the preemptive snapshot failed, we
// log the error but then return success indicating we should retry the
@@ -242,7 +242,7 @@
}

func (rq *replicateQueue) processOneChange(
ctx context.Context, repl *Replica, sysCfg config.SystemConfig,
ctx context.Context, repl *Replica, sysCfg config.SystemConfig, canTransferLease func() bool,
) (requeue bool, _ error) {
desc := repl.Desc()

@@ -449,7 +449,7 @@ func (rq *replicateQueue) processOneChange(
log.Infof(ctx, "considering a rebalance")
}

if rq.canTransferLease() {
if canTransferLease() {
// We require the lease in order to process replicas, so
// repl.store.StoreID() corresponds to the lease-holder's store ID.
transferred, err := rq.transferLease(
@@ -490,6 +490,7 @@ func (rq *replicateQueue) processOneChange(
ctx, repl, rebalanceReplica, desc, SnapshotRequest_REBALANCE); err != nil {
return false, err
}
return true, nil
}
}

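The replicate_queue.go side of the change threads a `canTransferLease func() bool` parameter through `processOneChange`, so the queue's own processing keeps its existing policy (`rq.canTransferLease`) while scatter can pass a closure that withholds lease transfers until its final pass. Below is a minimal, self-contained sketch of that callback pattern; the queue type and both call sites are hypothetical stand-ins rather than the real signatures.

```go
package main

import "fmt"

// queue stands in for replicateQueue; its processOneChange now takes the
// lease-transfer policy as a parameter instead of always consulting the
// queue's own canTransferLease method.
type queue struct{}

// canTransferLease mirrors the queue's default policy (always allowed here).
func (q *queue) canTransferLease() bool { return true }

// processOneChange consults the caller-supplied callback before deciding
// whether a lease transfer is an acceptable change to make.
func (q *queue) processOneChange(canTransferLease func() bool) (requeue bool, err error) {
	if canTransferLease() {
		fmt.Println("would consider transferring the lease")
	} else {
		fmt.Println("lease transfers disabled by caller; rebalancing only")
	}
	return false, nil
}

func main() {
	q := &queue{}

	// Normal queue processing: pass the queue's own policy, as process() does
	// with rq.canTransferLease in the diff above.
	_, _ = q.processOneChange(q.canTransferLease)

	// Scatter-style caller: disable lease transfers until the final pass.
	allowLeaseTransfer := false
	_, _ = q.processOneChange(func() bool { return allowLeaseTransfer })
}
```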