storage: don't down-replicate the range holding range lease
This is a preventative measure to stop down-replication of the range lease
holder's replica. RemoveTarget will now pick the worst replica from the
provided list, excluding the lease holder's replica.

Part of #5737
BramGruneir committed Jul 19, 2016
1 parent fa16747 commit 44cf1d8
Showing 5 changed files with 60 additions and 29 deletions.
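
As context for the diffs below, here is a minimal, self-contained sketch of the pattern this commit introduces: filter the lease holder's store out of the candidate list before picking the worst replica. The trimmed-down types and the range-count scoring are stand-ins for the allocator's real StoreList/selectBad machinery, not the actual CockroachDB code.

// Sketch only: simplified stand-ins for roachpb types and allocator scoring.
package main

import "fmt"

type StoreID int

type ReplicaDescriptor struct {
	StoreID    StoreID
	RangeCount int // stand-in scoring input; the real allocator scores stores
}

// removeTarget picks the fullest replica (a stand-in for selectBad) while
// never considering the replica on the lease holder's store.
func removeTarget(existing []ReplicaDescriptor, leaseStoreID StoreID) (ReplicaDescriptor, error) {
	if len(existing) == 0 {
		return ReplicaDescriptor{}, fmt.Errorf("must supply at least one replica")
	}
	var worst *ReplicaDescriptor
	for i := range existing {
		if existing[i].StoreID == leaseStoreID {
			continue // never down-replicate the lease holder
		}
		if worst == nil || existing[i].RangeCount > worst.RangeCount {
			worst = &existing[i]
		}
	}
	if worst == nil {
		return ReplicaDescriptor{}, fmt.Errorf("no removable replica found")
	}
	return *worst, nil
}

func main() {
	replicas := []ReplicaDescriptor{
		{StoreID: 1, RangeCount: 10},
		{StoreID: 2, RangeCount: 12},
		{StoreID: 3, RangeCount: 14}, // worst candidate, but holds the lease
	}
	target, err := removeTarget(replicas, 3)
	if err != nil {
		panic(err)
	}
	fmt.Printf("remove replica on store %d\n", target.StoreID)
}

Running it prints "remove replica on store 2": store 3 has the most ranges but is skipped because it holds the lease.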
18 changes: 11 additions & 7 deletions storage/allocator.go
@@ -226,31 +226,35 @@ func (a *Allocator) AllocateTarget(required roachpb.Attributes, existing []roach
 
 // RemoveTarget returns a suitable replica to remove from the provided replica
 // set. It attempts to consider which of the provided replicas would be the best
-// candidate for removal.
+// candidate for removal. It also will exclude any replica that belongs to the
+// range lease holder's store ID.
 //
 // TODO(mrtracy): removeTarget eventually needs to accept the attributes from
 // the zone config associated with the provided replicas. This will allow it to
 // make correct decisions in the case of ranges with heterogeneous replica
 // requirements (i.e. multiple data centers).
-func (a Allocator) RemoveTarget(existing []roachpb.ReplicaDescriptor) (roachpb.ReplicaDescriptor, error) {
+func (a Allocator) RemoveTarget(existing []roachpb.ReplicaDescriptor, leaseStoreID roachpb.StoreID) (roachpb.ReplicaDescriptor, error) {
     if len(existing) == 0 {
         return roachpb.ReplicaDescriptor{}, errors.Errorf("must supply at least one replica to allocator.RemoveTarget()")
     }
 
     // Retrieve store descriptors for the provided replicas from the StorePool.
     sl := StoreList{}
-    for i := range existing {
-        desc := a.storePool.getStoreDescriptor(existing[i].StoreID)
+    for _, exist := range existing {
+        if exist.StoreID == leaseStoreID {
+            continue
+        }
+        desc := a.storePool.getStoreDescriptor(exist.StoreID)
         if desc == nil {
             continue
         }
         sl.add(desc)
     }
 
     if bad := a.selectBad(sl); bad != nil {
-        for i := range existing {
-            if existing[i].StoreID == bad.StoreID {
-                return existing[i], nil
+        for _, exist := range existing {
+            if exist.StoreID == bad.StoreID {
+                return exist, nil
             }
         }
     }
17 changes: 14 additions & 3 deletions storage/allocator_test.go
@@ -518,7 +518,8 @@ func TestAllocatorRemoveTarget(t *testing.T) {
         },
     }
 
-    // Setup the stores so that store 3 is the worst candidate.
+    // Setup the stores so that store 3 is the worst candidate and store 2 is
+    // the 2nd worst.
     stores := []*roachpb.StoreDescriptor{
         {
             StoreID: 1,
@@ -528,7 +529,7 @@ func TestAllocatorRemoveTarget(t *testing.T) {
         {
             StoreID: 2,
             Node: roachpb.NodeDescriptor{NodeID: 2},
-            Capacity: roachpb.StoreCapacity{Capacity: 100, Available: 80, RangeCount: 10},
+            Capacity: roachpb.StoreCapacity{Capacity: 100, Available: 65, RangeCount: 12},
         },
         {
             StoreID: 3,
@@ -544,13 +545,23 @@ func TestAllocatorRemoveTarget(t *testing.T) {
     sg := gossiputil.NewStoreGossiper(g)
     sg.GossipStores(stores, t)
 
-    targetRepl, err := a.RemoveTarget(replicas)
+    targetRepl, err := a.RemoveTarget(replicas, stores[0].StoreID)
     if err != nil {
         t.Fatal(err)
    }
     if a, e := targetRepl, replicas[2]; a != e {
         t.Fatalf("RemoveTarget did not select expected replica; expected %v, got %v", e, a)
     }
+
+    // Now perform the same test, but pass in the store ID of store 3 so it's
+    // excluded.
+    targetRepl, err = a.RemoveTarget(replicas, stores[2].StoreID)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if a, e := targetRepl, replicas[1]; a != e {
+        t.Fatalf("RemoveTarget did not select expected replica; expected %v, got %v", e, a)
+    }
 }
 
 func TestAllocatorComputeAction(t *testing.T) {
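Note how the two assertions line up with the store setup above: with store 1's ID passed in, the worst candidate (store 3) is picked, so the target is replicas[2]; when store 3's ID is passed in as the lease holder, the second-worst store 2 becomes the pick, so the target is replicas[1].
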
33 changes: 21 additions & 12 deletions storage/client_raft_test.go
@@ -689,12 +689,12 @@ func TestStoreRangeDownReplicate(t *testing.T) {
 
     maxTimeout := time.After(10 * time.Second)
     succeeded := false
+    i := 0
     for !succeeded {
         select {
         case <-maxTimeout:
             t.Fatalf("Failed to achieve proper replication within 10 seconds")
         case <-time.After(10 * time.Millisecond):
-            mtc.expireLeases()
             rangeDesc := getRangeMetadata(rightKeyAddr, mtc, t)
             if count := len(rangeDesc.Replicas); count < 3 {
                 t.Fatalf("Removed too many replicas; expected at least 3 replicas, found %d", count)
@@ -703,20 +703,29 @@ func TestStoreRangeDownReplicate(t *testing.T) {
                 break
             }
 
-            // Run replication scans on every store; only the store with the
-            // range lease will actually do anything. If we did not wait
-            // for the scan to complete here it could be interrupted by the
-            // next call to expireLeases.
-            for _, store := range mtc.stores {
-                store.ForceReplicationScanAndProcess()
+            // Cycle the lease to the next replica (on the next store) if that
+            // replica still exists. This avoids the condition in which we try
+            // to continuously remove the replica on a store when
+            // down-replicating while it also still holds the lease.
+            for {
+                i++
+                if i >= len(mtc.stores) {
+                    i = 0
+                }
+                rep := mtc.stores[i].LookupReplica(rightKeyAddr, nil)
+                if rep != nil {
+                    mtc.expireLeases()
+                    // Force the read command to request a new lease.
+                    getArgs := getArgs(rightKey)
+                    if _, err := client.SendWrapped(mtc.distSenders[i], nil, &getArgs); err != nil {
+                        t.Fatal(err)
+                    }
+                    mtc.stores[i].ForceReplicationScanAndProcess()
+                    break
+                }
             }
         }
     }
 
     // Expire range leases one more time, so that any remaining resolutions can
     // get a range lease.
     // TODO(bdarnell): understand why some tests need this.
     mtc.expireLeases()
 }
 
 // TestChangeReplicasDuplicateError tests that a replica change aborts if
17 changes: 11 additions & 6 deletions storage/replicate_queue.go
@@ -146,20 +146,25 @@ func (rq *replicateQueue) process(
     }
 
     if log.V(1) {
-        log.Infof("range %d: adding replica to %+v due to under-replication", repl.RangeID, newReplica)
+        log.Infof("%s: adding replica to %+v due to under-replication", repl, newReplica)
     }
     log.Trace(ctx, fmt.Sprintf("adding replica to %+v due to under-replication", newReplica))
     if err = repl.ChangeReplicas(ctx, roachpb.ADD_REPLICA, newReplica, desc); err != nil {
         return err
     }
 case AllocatorRemove:
     log.Trace(ctx, "removing a replica")
-    removeReplica, err := rq.allocator.RemoveTarget(desc.Replicas)
+    // Get the current lease holder so we don't try to remove it.
+    lease, _ := repl.getLease()
+    if lease == nil {
+        return errors.Errorf("%s: could not get current lease", repl)
+    }
+    removeReplica, err := rq.allocator.RemoveTarget(desc.Replicas, lease.Replica.StoreID)
     if err != nil {
         return err
     }
     if log.V(1) {
-        log.Infof("range %d: removing replica %+v due to over-replication", repl.RangeID, removeReplica)
+        log.Infof("%s: removing replica %+v due to over-replication", repl, removeReplica)
     }
     log.Trace(ctx, fmt.Sprintf("removing replica %+v due to over-replication", removeReplica))
     if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil {
@@ -179,7 +184,7 @@
         }
         deadReplica := deadReplicas[0]
         if log.V(1) {
-            log.Infof("range %d: removing replica %+v from dead store", repl.RangeID, deadReplica)
+            log.Infof("%s: removing replica %+v from dead store", repl, deadReplica)
         }
         log.Trace(ctx, fmt.Sprintf("removing replica %+v from dead store", deadReplica))
         if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, deadReplica, desc); err != nil {
@@ -192,7 +197,7 @@
         rebalanceStore := rq.allocator.RebalanceTarget(repl.store.StoreID(), zone.ReplicaAttrs[0], desc.Replicas)
         if rebalanceStore == nil {
             if log.V(1) {
-                log.Infof("range %d: no suitable rebalance target", repl.RangeID)
+                log.Infof("%s: no suitable rebalance target", repl)
             }
             log.Trace(ctx, "no suitable rebalance target")
             // No action was necessary and no rebalance target was found. Return
@@ -204,7 +209,7 @@
             StoreID: rebalanceStore.StoreID,
         }
         if log.V(1) {
-            log.Infof("range %d: rebalancing to %+v", repl.RangeID, rebalanceReplica)
+            log.Infof("%s: rebalancing to %+v", repl, rebalanceReplica)
         }
         log.Trace(ctx, fmt.Sprintf("rebalancing to %+v", rebalanceReplica))
         if err = repl.ChangeReplicas(ctx, roachpb.ADD_REPLICA, rebalanceReplica, desc); err != nil {
4 changes: 3 additions & 1 deletion storage/simulation/range.go
@@ -124,7 +124,9 @@ func (r *Range) getAllocateTarget() (roachpb.StoreID, error) {
 // getRemoveTarget queries the allocator for the store that contains a replica
 // that can be removed.
 func (r *Range) getRemoveTarget() (roachpb.StoreID, error) {
-    removeStore, err := r.allocator.RemoveTarget(r.desc.Replicas)
+    // Pass in an invalid store ID since we don't consider range leases as part
+    // of the simulator.
+    removeStore, err := r.allocator.RemoveTarget(r.desc.Replicas, roachpb.StoreID(-1))
     if err != nil {
         return 0, err
     }
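Since real store IDs are positive, the roachpb.StoreID(-1) sentinel never matches any replica's StoreID, so the new exclusion check is a no-op here and the simulator's removal behavior is unchanged by this commit.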
