Skip to content

Commit

Permalink
Supply RangeDescriptor to Split/Merge/Change Funcs
Browse files Browse the repository at this point in the history
For #2152

This commit modifies AdminSplit, AdminMerge and ChangeReplicas to accept a
RangeDescriptor object.

The RangeDescriptor is used as a form of optimistic locking. All of these
operations will ultimately modify the RangeDescriptor in some way; to ensure
that no other concurrent operations modify the RangeDescriptor, a copy of the
original RangeDescriptor is captured before the modified RangeDescriptor is
computed. The original RangeDescriptor is passed to a ConditionalPut as the
first operation in the transaction, causing it to fail if concurrent
modifications have already committed.

However, capturing the original RangeDescriptor inside of the Split/Merge/Change
methods is insufficient. In most cases, the decision to call Split/Merge/Change
is made by another method based on the RangeDescriptor, and the decision may
have been different if a concurrent change is applied first. Therefore, the
original RangeDescriptor needs to be captured before calling Split/Merge/Change.

This commit modifies all three methods to use the new pattern, with a required
RangeDescriptor being passed to the method.

Note that the behavior of Split and Merge is still not optimal, because the
call to AdminSplit or AdminMerge is always routed through Raft. This pattern is
unnecessary in cases where the split/merge decision is computed on the server.
  • Loading branch information
Matt Tracy committed Aug 26, 2015
1 parent 4458654 commit 9ffbd1a
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 28 deletions.
5 changes: 3 additions & 2 deletions storage/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,9 @@ func TestStoreRangeMergeNonConsecutive(t *testing.T) {
t.Fatal(err)
}

argsMerge := adminMergeArgs(rangeA.Desc().StartKey, 1, store.StoreID())
if _, err := rangeA.AdminMerge(argsMerge); !testutils.IsError(err, "ranges not collocated") {
desc := rangeA.Desc()
argsMerge := adminMergeArgs(desc.StartKey, 1, store.StoreID())
if _, err := rangeA.AdminMerge(argsMerge, desc); !testutils.IsError(err, "ranges not collocated") {
t.Fatalf("did not got expected error; got %s", err)
}

Expand Down
79 changes: 70 additions & 9 deletions storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func TestReplicateRange(t *testing.T) {
proto.Replica{
NodeID: mtc.stores[1].Ident.NodeID,
StoreID: mtc.stores[1].Ident.StoreID,
}); err != nil {
}, rng.Desc()); err != nil {
t.Fatal(err)
}
// Verify no intent remains on range descriptor key.
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestRestoreReplicas(t *testing.T) {
proto.Replica{
NodeID: mtc.stores[1].Ident.NodeID,
StoreID: mtc.stores[1].Ident.StoreID,
}); err != nil {
}, firstRng.Desc()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -372,7 +372,7 @@ func TestFailedReplicaChange(t *testing.T) {
proto.Replica{
NodeID: mtc.stores[1].Ident.NodeID,
StoreID: mtc.stores[1].Ident.StoreID,
})
}, rng.Desc())
if err == nil || !strings.Contains(err.Error(), "boom") {
t.Fatalf("did not get expected error: %s", err)
}
Expand All @@ -391,7 +391,7 @@ func TestFailedReplicaChange(t *testing.T) {
proto.Replica{
NodeID: mtc.stores[1].Ident.NodeID,
StoreID: mtc.stores[1].Ident.StoreID,
})
}, rng.Desc())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -456,7 +456,7 @@ func TestReplicateAfterTruncation(t *testing.T) {
proto.Replica{
NodeID: mtc.stores[1].Ident.NodeID,
StoreID: mtc.stores[1].Ident.StoreID,
}); err != nil {
}, rng.Desc()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -541,6 +541,65 @@ func TestStoreRangeReplicate(t *testing.T) {
}
}

// TestChangeReplicasDuplicateError tests that a replica change aborts if
// another change has been made to the RangeDescriptor since it was initiated.
func TestChangeReplicasDescriptorInvariant(t *testing.T) {
defer leaktest.AfterTest(t)
mtc := startMultiTestContext(t, 3)
defer mtc.Stop()

repl, err := mtc.stores[0].GetReplica(1)
if err != nil {
t.Fatal(err)
}

addReplica := func(storeNum int, desc *proto.RangeDescriptor) error {
return repl.ChangeReplicas(proto.ADD_REPLICA,
proto.Replica{
NodeID: mtc.stores[storeNum].Ident.NodeID,
StoreID: mtc.stores[storeNum].Ident.StoreID,
},
desc)
}

// Retain the descriptor for the range at this point.
origDesc := repl.Desc()

// Add replica to the second store, which should succeed.
if err := addReplica(1, origDesc); err != nil {
t.Fatal(err)
}
if err := util.IsTrueWithin(func() bool {
r := mtc.stores[1].LookupReplica(proto.Key("a"), proto.Key("b"))
if r == nil {
return false
}
return true
}, time.Second); err != nil {
t.Fatal(err)
}

// Attempt to add replica to the third store with the original descriptor.
// This should fail because the descriptor is stale.
if err := addReplica(2, origDesc); err == nil {
t.Fatal("Expected error calling ChangeReplicas with stale RangeDescriptor")
}

// Add to third store with fresh descriptor.
if err := addReplica(2, repl.Desc()); err != nil {
t.Fatal(err)
}
if err := util.IsTrueWithin(func() bool {
r := mtc.stores[2].LookupReplica(proto.Key("a"), proto.Key("b"))
if r == nil {
return false
}
return true
}, time.Second); err != nil {
t.Fatal(err)
}
}

// TestProgressWithDownNode verifies that a surviving quorum can make progress
// with a downed node.
func TestProgressWithDownNode(t *testing.T) {
Expand Down Expand Up @@ -801,9 +860,10 @@ func TestRangeDescriptorSnapshotRace(t *testing.T) {
if rng == nil {
t.Fatal("failed to look up min range")
}
args := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("A%03d", i)), rng.Desc().RangeID,
desc := rng.Desc()
args := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("A%03d", i)), desc.RangeID,
mtc.stores[0].StoreID())
if _, err := rng.AdminSplit(args); err != nil {
if _, err := rng.AdminSplit(args, desc); err != nil {
t.Fatal(err)
}
}
Expand All @@ -814,8 +874,9 @@ func TestRangeDescriptorSnapshotRace(t *testing.T) {
if rng == nil {
t.Fatal("failed to look up max range")
}
args := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("B%03d", i)), rng.Desc().RangeID, mtc.stores[0].StoreID())
if _, err := rng.AdminSplit(args); err != nil {
desc := rng.Desc()
args := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("B%03d", i)), desc.RangeID, mtc.stores[0].StoreID())
if _, err := rng.AdminSplit(args, desc); err != nil {
t.Fatal(err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (m *multiTestContext) replicateRange(rangeID proto.RangeID, sourceStoreInde
proto.Replica{
NodeID: m.stores[dest].Ident.NodeID,
StoreID: m.stores[dest].Ident.StoreID,
})
}, rng.Desc())
if err != nil {
m.t.Fatal(err)
}
Expand Down Expand Up @@ -332,7 +332,7 @@ func (m *multiTestContext) unreplicateRange(rangeID proto.RangeID, source, dest
proto.Replica{
NodeID: m.idents[dest].NodeID,
StoreID: m.idents[dest].StoreID,
})
}, rng.Desc())
if err != nil {
m.t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,10 @@ func (r *Replica) addAdminCmd(ctx context.Context, args proto.Request) (proto.Re

switch tArgs := args.(type) {
case *proto.AdminSplitRequest:
resp, err := r.AdminSplit(*tArgs)
resp, err := r.AdminSplit(*tArgs, r.Desc())
return &resp, err
case *proto.AdminMergeRequest:
resp, err := r.AdminMerge(*tArgs)
resp, err := r.AdminMerge(*tArgs, r.Desc())
return &resp, err
default:
return nil, util.Error("unrecognized admin command")
Expand Down
22 changes: 16 additions & 6 deletions storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,13 +1031,19 @@ func (r *Replica) LeaderLease(batch engine.Engine, ms *engine.MVCCStats, args pr
// updates the range addressing metadata. The handover of responsibility for
// the reassigned key range is carried out seamlessly through a split trigger
// carried out as part of the commit of that transaction.
func (r *Replica) AdminSplit(args proto.AdminSplitRequest) (proto.AdminSplitResponse, error) {
//
// The supplied RangeDescriptor is used as a form of optimistic lock. An
// operation which might split a range should obtain a copy of the range's
// current descriptor before making the decision to split. If the decision is
// affirmative the descriptor is passed to AdminSplit, which performs a
// Conditional Put on the RangeDescriptor to ensure that no other operation has
// modified the range in the time the decision was being made.
func (r *Replica) AdminSplit(args proto.AdminSplitRequest, desc *proto.RangeDescriptor) (proto.AdminSplitResponse, error) {
var reply proto.AdminSplitResponse

// Determine split key if not provided with args. This scan is
// allowed to be relatively slow because admin commands don't block
// other commands.
desc := r.Desc()
splitKey := proto.Key(args.SplitKey)
if len(splitKey) == 0 {
snap := r.rm.NewSnapshot()
Expand Down Expand Up @@ -1215,11 +1221,13 @@ func (r *Replica) splitTrigger(batch engine.Engine, split *proto.SplitTrigger) e
// the reassigned key range is carried out seamlessly through a merge trigger
// carried out as part of the commit of that transaction.
// A merge requires that the two ranges are collocate on the same set of replicas.
func (r *Replica) AdminMerge(args proto.AdminMergeRequest) (proto.AdminMergeResponse, error) {
//
// The supplied RangeDescriptor is used as a form of optimistic lock. See the
// comment of "AdminSplit" for more information on this pattern.
func (r *Replica) AdminMerge(args proto.AdminMergeRequest, desc *proto.RangeDescriptor) (proto.AdminMergeResponse, error) {
var reply proto.AdminMergeResponse

// Lookup subsumed range.
desc := r.Desc()
if desc.EndKey.Equal(proto.KeyMax) {
// Noop.
return reply, nil
Expand Down Expand Up @@ -1365,9 +1373,11 @@ func (r *Replica) changeReplicasTrigger(change *proto.ChangeReplicasTrigger) err
// ChangeReplicas adds or removes a replica of a range. The change is performed
// in a distributed transaction and takes effect when that transaction is committed.
// When removing a replica, only the NodeID and StoreID fields of the Replica are used.
func (r *Replica) ChangeReplicas(changeType proto.ReplicaChangeType, replica proto.Replica) error {
//
// The supplied RangeDescriptor is used as a form of optimistic lock. See the
// comment of "AdminSplit" for more information on this pattern.
func (r *Replica) ChangeReplicas(changeType proto.ReplicaChangeType, replica proto.Replica, desc *proto.RangeDescriptor) error {
// Validate the request and prepare the new descriptor.
desc := r.Desc()
updatedDesc := *desc
updatedDesc.Replicas = append([]proto.Replica{}, desc.Replicas...)
found := -1 // tracks NodeID && StoreID
Expand Down
2 changes: 1 addition & 1 deletion storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2617,7 +2617,7 @@ func TestChangeReplicasDuplicateError(t *testing.T) {
if err := tc.rng.ChangeReplicas(proto.ADD_REPLICA, proto.Replica{
NodeID: tc.store.Ident.NodeID,
StoreID: 9999,
}); err == nil || !strings.Contains(err.Error(),
}, tc.rng.Desc()); err == nil || !strings.Contains(err.Error(),
"already present") {
t.Fatalf("must not be able to add second replica to same node (err=%s)",
err)
Expand Down
14 changes: 8 additions & 6 deletions storage/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ func (rq *replicateQueue) shouldQueue(now proto.Timestamp, repl *Replica) (
return
}

return rq.needsReplication(zone, repl)
return rq.needsReplication(zone, repl, repl.Desc())
}

func (rq *replicateQueue) needsReplication(zone config.ZoneConfig, repl *Replica) (bool, float64) {
func (rq *replicateQueue) needsReplication(zone config.ZoneConfig, repl *Replica,
desc *proto.RangeDescriptor) (bool, float64) {
// TODO(bdarnell): handle non-empty ReplicaAttrs.
need := len(zone.ReplicaAttrs)
have := len(repl.Desc().Replicas)
have := len(desc.Replicas)
if need > have {
if log.V(1) {
log.Infof("%s needs %d nodes; has %d", repl, need, have)
Expand All @@ -99,14 +100,15 @@ func (rq *replicateQueue) process(now proto.Timestamp, repl *Replica) error {
return err
}

if needs, _ := rq.needsReplication(zone, repl); !needs {
desc := repl.Desc()
if needs, _ := rq.needsReplication(zone, repl, desc); !needs {
// Something changed between shouldQueue and process.
return nil
}

// TODO(bdarnell): handle non-homogenous ReplicaAttrs.
// Allow constraints to be relaxed if necessary.
newReplica, err := rq.allocator.AllocateTarget(zone.ReplicaAttrs[0], repl.Desc().Replicas, true)
newReplica, err := rq.allocator.AllocateTarget(zone.ReplicaAttrs[0], desc.Replicas, true)
if err != nil {
return err
}
Expand All @@ -115,7 +117,7 @@ func (rq *replicateQueue) process(now proto.Timestamp, repl *Replica) error {
NodeID: newReplica.Node.NodeID,
StoreID: newReplica.StoreID,
}
if err = repl.ChangeReplicas(proto.ADD_REPLICA, replica); err != nil {
if err = repl.ChangeReplicas(proto.ADD_REPLICA, replica, desc); err != nil {
return err
}

Expand Down

0 comments on commit 9ffbd1a

Please sign in to comment.