Merge pull request #2241 from mrtracy/mtracy-replica-change-safety
Work towards #2152: increasing transactional safety of replica changes.
mrtracy committed Aug 26, 2015
2 parents 205e626 + 9ffbd1a commit a294931
Showing 8 changed files with 118 additions and 49 deletions.
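
Below is a minimal, self-contained sketch of the calling pattern this commit introduces: the caller snapshots the range descriptor, makes its decision, and passes that snapshot into the change; the change fails if the descriptor has moved on in the meantime, and the caller retries with a fresh copy. It uses simplified stand-in types, not the real storage and proto packages.

package main

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
)

// RangeDescriptor is a simplified stand-in for proto.RangeDescriptor.
type RangeDescriptor struct {
	Replicas []int // store IDs, simplified
}

// Replica is a simplified stand-in for storage.Replica.
type Replica struct {
	mu   sync.Mutex
	desc RangeDescriptor
}

// Desc returns a snapshot of the current descriptor.
func (r *Replica) Desc() RangeDescriptor {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.desc
}

var errStaleDescriptor = errors.New("descriptor changed since the decision was made")

// ChangeReplicas mimics the new signature: the caller's descriptor snapshot
// acts as the expected value of a conditional update.
func (r *Replica) ChangeReplicas(storeID int, expected RangeDescriptor) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !reflect.DeepEqual(r.desc, expected) {
		return errStaleDescriptor
	}
	updated := expected
	updated.Replicas = append(append([]int(nil), expected.Replicas...), storeID)
	r.desc = updated
	return nil
}

func main() {
	r := &Replica{desc: RangeDescriptor{Replicas: []int{1}}}
	orig := r.Desc()

	fmt.Println(r.ChangeReplicas(2, orig)) // <nil>: descriptor still matches
	fmt.Println(r.ChangeReplicas(3, orig)) // error: orig is now stale
	// Retry with a fresh descriptor, as TestChangeReplicasDescriptorInvariant does below.
	fmt.Println(r.ChangeReplicas(3, r.Desc())) // <nil>
}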
storage/client_merge_test.go (5 changes: 3 additions & 2 deletions)
@@ -258,8 +258,9 @@ func TestStoreRangeMergeNonConsecutive(t *testing.T) {
t.Fatal(err)
}

argsMerge := adminMergeArgs(rangeA.Desc().StartKey, 1, store.StoreID())
if _, err := rangeA.AdminMerge(argsMerge); !testutils.IsError(err, "ranges not collocated") {
desc := rangeA.Desc()
argsMerge := adminMergeArgs(desc.StartKey, 1, store.StoreID())
if _, err := rangeA.AdminMerge(argsMerge, desc); !testutils.IsError(err, "ranges not collocated") {
t.Fatalf("did not get expected error; got %s", err)
}

storage/client_raft_test.go (79 changes: 70 additions & 9 deletions)
@@ -217,7 +217,7 @@ func TestReplicateRange(t *testing.T) {
proto.Replica{
NodeID: mtc.stores[1].Ident.NodeID,
StoreID: mtc.stores[1].Ident.StoreID,
}); err != nil {
}, rng.Desc()); err != nil {
t.Fatal(err)
}
// Verify no intent remains on range descriptor key.
@@ -279,7 +279,7 @@ func TestRestoreReplicas(t *testing.T) {
proto.Replica{
NodeID: mtc.stores[1].Ident.NodeID,
StoreID: mtc.stores[1].Ident.StoreID,
}); err != nil {
}, firstRng.Desc()); err != nil {
t.Fatal(err)
}

@@ -372,7 +372,7 @@ func TestFailedReplicaChange(t *testing.T) {
proto.Replica{
NodeID: mtc.stores[1].Ident.NodeID,
StoreID: mtc.stores[1].Ident.StoreID,
})
}, rng.Desc())
if err == nil || !strings.Contains(err.Error(), "boom") {
t.Fatalf("did not get expected error: %s", err)
}
@@ -391,7 +391,7 @@ func TestFailedReplicaChange(t *testing.T) {
proto.Replica{
NodeID: mtc.stores[1].Ident.NodeID,
StoreID: mtc.stores[1].Ident.StoreID,
})
}, rng.Desc())
if err != nil {
t.Fatal(err)
}
@@ -456,7 +456,7 @@ func TestReplicateAfterTruncation(t *testing.T) {
proto.Replica{
NodeID: mtc.stores[1].Ident.NodeID,
StoreID: mtc.stores[1].Ident.StoreID,
}); err != nil {
}, rng.Desc()); err != nil {
t.Fatal(err)
}

@@ -541,6 +541,65 @@ func TestStoreRangeReplicate(t *testing.T) {
}
}

// TestChangeReplicasDuplicateError tests that a replica change aborts if
// another change has been made to the RangeDescriptor since it was initiated.
func TestChangeReplicasDescriptorInvariant(t *testing.T) {
defer leaktest.AfterTest(t)
mtc := startMultiTestContext(t, 3)
defer mtc.Stop()

repl, err := mtc.stores[0].GetReplica(1)
if err != nil {
t.Fatal(err)
}

addReplica := func(storeNum int, desc *proto.RangeDescriptor) error {
return repl.ChangeReplicas(proto.ADD_REPLICA,
proto.Replica{
NodeID: mtc.stores[storeNum].Ident.NodeID,
StoreID: mtc.stores[storeNum].Ident.StoreID,
},
desc)
}

// Retain the descriptor for the range at this point.
origDesc := repl.Desc()

// Add replica to the second store, which should succeed.
if err := addReplica(1, origDesc); err != nil {
t.Fatal(err)
}
if err := util.IsTrueWithin(func() bool {
r := mtc.stores[1].LookupReplica(proto.Key("a"), proto.Key("b"))
if r == nil {
return false
}
return true
}, time.Second); err != nil {
t.Fatal(err)
}

// Attempt to add replica to the third store with the original descriptor.
// This should fail because the descriptor is stale.
if err := addReplica(2, origDesc); err == nil {
t.Fatal("Expected error calling ChangeReplicas with stale RangeDescriptor")
}

// Add to third store with fresh descriptor.
if err := addReplica(2, repl.Desc()); err != nil {
t.Fatal(err)
}
if err := util.IsTrueWithin(func() bool {
r := mtc.stores[2].LookupReplica(proto.Key("a"), proto.Key("b"))
if r == nil {
return false
}
return true
}, time.Second); err != nil {
t.Fatal(err)
}
}

// TestProgressWithDownNode verifies that a surviving quorum can make progress
// with a downed node.
func TestProgressWithDownNode(t *testing.T) {
@@ -801,9 +860,10 @@ func TestRangeDescriptorSnapshotRace(t *testing.T) {
if rng == nil {
t.Fatal("failed to look up min range")
}
args := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("A%03d", i)), rng.Desc().RangeID,
desc := rng.Desc()
args := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("A%03d", i)), desc.RangeID,
mtc.stores[0].StoreID())
if _, err := rng.AdminSplit(args); err != nil {
if _, err := rng.AdminSplit(args, desc); err != nil {
t.Fatal(err)
}
}
@@ -814,8 +874,9 @@ func TestRangeDescriptorSnapshotRace(t *testing.T) {
if rng == nil {
t.Fatal("failed to look up max range")
}
args := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("B%03d", i)), rng.Desc().RangeID, mtc.stores[0].StoreID())
if _, err := rng.AdminSplit(args); err != nil {
desc := rng.Desc()
args := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("B%03d", i)), desc.RangeID, mtc.stores[0].StoreID())
if _, err := rng.AdminSplit(args, desc); err != nil {
t.Fatal(err)
}
}
storage/client_split_test.go (23 changes: 16 additions & 7 deletions)
@@ -22,7 +22,6 @@ import (
"fmt"
"math/rand"
"reflect"
"strings"
"sync"
"sync/atomic"
"testing"
@@ -143,20 +142,17 @@ func TestStoreRangeSplitConcurrent(t *testing.T) {
store, stopper := createTestStore(t)
defer stopper.Stop()

splitKey := proto.Key("a")
concurrentCount := int32(10)
wg := sync.WaitGroup{}
wg.Add(int(concurrentCount))
failureCount := int32(0)
for i := int32(0); i < concurrentCount; i++ {
go func() {
args := adminSplitArgs(proto.KeyMin, []byte("a"), 1, store.StoreID())
args := adminSplitArgs(proto.KeyMin, splitKey, 1, store.StoreID())
_, err := store.ExecuteCmd(context.Background(), &args)
if err != nil {
if strings.Contains(err.Error(), "range is already split at key") {
atomic.AddInt32(&failureCount, 1)
} else {
t.Errorf("unexpected error: %s", err)
}
atomic.AddInt32(&failureCount, 1)
}
wg.Done()
}()
@@ -165,6 +161,19 @@
if failureCount != concurrentCount-1 {
t.Fatalf("concurrent splits succeeded unexpectedly; failureCount=%d", failureCount)
}

// Verify everything ended up as expected.
if a, e := store.ReplicaCount(), 2; a != e {
t.Fatalf("expected %d stores after concurrent splits; actual count=%d", e, a)
}
rng := store.LookupReplica(proto.KeyMin, nil)
newRng := store.LookupReplica(splitKey, nil)
if !bytes.Equal(newRng.Desc().StartKey, splitKey) || !bytes.Equal(splitKey, rng.Desc().EndKey) {
t.Errorf("ranges mismatched, wanted %q=%q=%q", newRng.Desc().StartKey, splitKey, rng.Desc().EndKey)
}
if !bytes.Equal(newRng.Desc().EndKey, proto.KeyMax) || !bytes.Equal(rng.Desc().StartKey, proto.KeyMin) {
t.Errorf("new ranges do not cover KeyMin-KeyMax, but only %q-%q", rng.Desc().StartKey, newRng.Desc().EndKey)
}
}

// TestStoreRangeSplit executes a split of a range and verifies that the
storage/client_test.go (4 changes: 2 additions & 2 deletions)
@@ -301,7 +301,7 @@ func (m *multiTestContext) replicateRange(rangeID proto.RangeID, sourceStoreIndex
proto.Replica{
NodeID: m.stores[dest].Ident.NodeID,
StoreID: m.stores[dest].Ident.StoreID,
})
}, rng.Desc())
if err != nil {
m.t.Fatal(err)
}
@@ -332,7 +332,7 @@ func (m *multiTestContext) unreplicateRange(rangeID proto.RangeID, source, dest
proto.Replica{
NodeID: m.idents[dest].NodeID,
StoreID: m.idents[dest].StoreID,
})
}, rng.Desc())
if err != nil {
m.t.Fatal(err)
}
storage/replica.go (6 changes: 2 additions & 4 deletions)
@@ -190,8 +190,6 @@ type Replica struct {
rm rangeManager // Makes some store methods available
stats *rangeStats // Range statistics
maxBytes int64 // Max bytes before split.
// Held while a split, merge, or replica change is underway.
metaLock sync.Mutex // TODO(bdarnell): Revisit the metaLock.
// Last index persisted to the raft log (not necessarily committed).
// Updated atomically.
lastIndex uint64
@@ -592,10 +590,10 @@ func (r *Replica) addAdminCmd(ctx context.Context, args proto.Request) (proto.Re

switch tArgs := args.(type) {
case *proto.AdminSplitRequest:
resp, err := r.AdminSplit(*tArgs)
resp, err := r.AdminSplit(*tArgs, r.Desc())
return &resp, err
case *proto.AdminMergeRequest:
resp, err := r.AdminMerge(*tArgs)
resp, err := r.AdminMerge(*tArgs, r.Desc())
return &resp, err
default:
return nil, util.Error("unrecognized admin command")
storage/replica_command.go (34 changes: 16 additions & 18 deletions)
@@ -1031,17 +1031,19 @@ func (r *Replica) LeaderLease(batch engine.Engine, ms *engine.MVCCStats, args pr
// updates the range addressing metadata. The handover of responsibility for
// the reassigned key range is carried out seamlessly through a split trigger
// carried out as part of the commit of that transaction.
func (r *Replica) AdminSplit(args proto.AdminSplitRequest) (proto.AdminSplitResponse, error) {
//
// The supplied RangeDescriptor is used as a form of optimistic lock. An
// operation which might split a range should obtain a copy of the range's
// current descriptor before making the decision to split. If the decision is
// affirmative the descriptor is passed to AdminSplit, which performs a
// Conditional Put on the RangeDescriptor to ensure that no other operation has
// modified the range in the time the decision was being made.
func (r *Replica) AdminSplit(args proto.AdminSplitRequest, desc *proto.RangeDescriptor) (proto.AdminSplitResponse, error) {
var reply proto.AdminSplitResponse

// Only allow a single split per range at a time.
r.metaLock.Lock()
defer r.metaLock.Unlock()

// Determine split key if not provided with args. This scan is
// allowed to be relatively slow because admin commands don't block
// other commands.
desc := r.Desc()
splitKey := proto.Key(args.SplitKey)
if len(splitKey) == 0 {
snap := r.rm.NewSnapshot()
@@ -1219,15 +1221,13 @@ func (r *Replica) splitTrigger(batch engine.Engine, split *proto.SplitTrigger) e
// the reassigned key range is carried out seamlessly through a merge trigger
// carried out as part of the commit of that transaction.
// A merge requires that the two ranges are collocated on the same set of replicas.
func (r *Replica) AdminMerge(args proto.AdminMergeRequest) (proto.AdminMergeResponse, error) {
//
// The supplied RangeDescriptor is used as a form of optimistic lock. See the
// comment of "AdminSplit" for more information on this pattern.
func (r *Replica) AdminMerge(args proto.AdminMergeRequest, desc *proto.RangeDescriptor) (proto.AdminMergeResponse, error) {
var reply proto.AdminMergeResponse

// Only allow a single split/merge per range at a time.
r.metaLock.Lock()
defer r.metaLock.Unlock()

// Lookup subsumed range.
desc := r.Desc()
if desc.EndKey.Equal(proto.KeyMax) {
// Noop.
return reply, nil
@@ -1373,13 +1373,11 @@ func (r *Replica) changeReplicasTrigger(change *proto.ChangeReplicasTrigger) err
// ChangeReplicas adds or removes a replica of a range. The change is performed
// in a distributed transaction and takes effect when that transaction is committed.
// When removing a replica, only the NodeID and StoreID fields of the Replica are used.
func (r *Replica) ChangeReplicas(changeType proto.ReplicaChangeType, replica proto.Replica) error {
// Only allow a single change per range at a time.
r.metaLock.Lock()
defer r.metaLock.Unlock()

//
// The supplied RangeDescriptor is used as a form of optimistic lock. See the
// comment of "AdminSplit" for more information on this pattern.
func (r *Replica) ChangeReplicas(changeType proto.ReplicaChangeType, replica proto.Replica, desc *proto.RangeDescriptor) error {
// Validate the request and prepare the new descriptor.
desc := r.Desc()
updatedDesc := *desc
updatedDesc.Replicas = append([]proto.Replica{}, desc.Replicas...)
found := -1 // tracks NodeID && StoreID
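
The doc comments added above describe the supplied descriptor as the expected value of a Conditional Put on the range descriptor key. A minimal sketch of that check, assuming a plain in-memory byte map and JSON encoding in place of the real MVCC engine and protobuf types:

package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// RangeDescriptor is a simplified stand-in for proto.RangeDescriptor.
type RangeDescriptor struct {
	RangeID  int
	Replicas []int
}

var errConditionFailed = errors.New("descriptor was modified since it was read")

// conditionalPutDescriptor overwrites kv[key] with newDesc only if the bytes
// currently stored there still encode expDesc; otherwise the caller's
// decision was based on a stale descriptor and the write is rejected.
func conditionalPutDescriptor(kv map[string][]byte, key string, newDesc, expDesc RangeDescriptor) error {
	expected, err := json.Marshal(expDesc)
	if err != nil {
		return err
	}
	existing, ok := kv[key]
	if !ok || !bytes.Equal(existing, expected) {
		return errConditionFailed
	}
	updated, err := json.Marshal(newDesc)
	if err != nil {
		return err
	}
	kv[key] = updated
	return nil
}

func main() {
	kv := map[string][]byte{}
	orig := RangeDescriptor{RangeID: 1, Replicas: []int{1}}
	b, _ := json.Marshal(orig)
	kv["range-descriptor/1"] = b

	grown := RangeDescriptor{RangeID: 1, Replicas: []int{1, 2}}
	fmt.Println(conditionalPutDescriptor(kv, "range-descriptor/1", grown, orig)) // <nil>

	// A second change that still uses the original, now-stale descriptor fails.
	again := RangeDescriptor{RangeID: 1, Replicas: []int{1, 3}}
	fmt.Println(conditionalPutDescriptor(kv, "range-descriptor/1", again, orig)) // condition failed
}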
storage/replica_test.go (2 changes: 1 addition & 1 deletion)
@@ -2617,7 +2617,7 @@ func TestChangeReplicasDuplicateError(t *testing.T) {
if err := tc.rng.ChangeReplicas(proto.ADD_REPLICA, proto.Replica{
NodeID: tc.store.Ident.NodeID,
StoreID: 9999,
}); err == nil || !strings.Contains(err.Error(),
}, tc.rng.Desc()); err == nil || !strings.Contains(err.Error(),
"already present") {
t.Fatalf("must not be able to add second replica to same node (err=%s)",
err)
storage/replicate_queue.go (14 changes: 8 additions & 6 deletions)
@@ -76,13 +76,14 @@ func (rq *replicateQueue) shouldQueue(now proto.Timestamp, repl *Replica) (
return
}

return rq.needsReplication(zone, repl)
return rq.needsReplication(zone, repl, repl.Desc())
}

func (rq *replicateQueue) needsReplication(zone config.ZoneConfig, repl *Replica) (bool, float64) {
func (rq *replicateQueue) needsReplication(zone config.ZoneConfig, repl *Replica,
desc *proto.RangeDescriptor) (bool, float64) {
// TODO(bdarnell): handle non-empty ReplicaAttrs.
need := len(zone.ReplicaAttrs)
have := len(repl.Desc().Replicas)
have := len(desc.Replicas)
if need > have {
if log.V(1) {
log.Infof("%s needs %d nodes; has %d", repl, need, have)
@@ -99,14 +100,15 @@ func (rq *replicateQueue) process(now proto.Timestamp, repl *Replica) error {
return err
}

if needs, _ := rq.needsReplication(zone, repl); !needs {
desc := repl.Desc()
if needs, _ := rq.needsReplication(zone, repl, desc); !needs {
// Something changed between shouldQueue and process.
return nil
}

// TODO(bdarnell): handle non-homogenous ReplicaAttrs.
// Allow constraints to be relaxed if necessary.
newReplica, err := rq.allocator.AllocateTarget(zone.ReplicaAttrs[0], repl.Desc().Replicas, true)
newReplica, err := rq.allocator.AllocateTarget(zone.ReplicaAttrs[0], desc.Replicas, true)
if err != nil {
return err
}
@@ -115,7 +117,7 @@
NodeID: newReplica.Node.NodeID,
StoreID: newReplica.StoreID,
}
if err = repl.ChangeReplicas(proto.ADD_REPLICA, replica); err != nil {
if err = repl.ChangeReplicas(proto.ADD_REPLICA, replica, desc); err != nil {
return err
}

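
The replicate queue above simply returns the stale-descriptor error and presumably relies on the queue machinery to come back to the range later. A caller that wants to retry inline would re-read the descriptor before each attempt, in the spirit of TestChangeReplicasDescriptorInvariant above. A minimal sketch of such a loop, with hypothetical currentDesc and changeReplicas callbacks and errStaleDescriptor standing in for the real Replica API:

package main

import (
	"errors"
	"fmt"
)

// RangeDescriptor is a simplified stand-in for proto.RangeDescriptor.
type RangeDescriptor struct {
	Replicas []int
}

// errStaleDescriptor stands in for the error ChangeReplicas returns when the
// supplied descriptor no longer matches the stored one.
var errStaleDescriptor = errors.New("descriptor changed since the decision was made")

// retryReplicaChange re-reads the descriptor before every attempt and retries
// only on the stale-descriptor error; any other outcome is returned as is.
func retryReplicaChange(maxAttempts int,
	currentDesc func() RangeDescriptor,
	changeReplicas func(desc RangeDescriptor) error) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		desc := currentDesc() // fresh snapshot for each decision
		if err = changeReplicas(desc); !errors.Is(err, errStaleDescriptor) {
			return err // success, or a non-retryable failure
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
	// Simulate one stale attempt followed by success.
	stored := RangeDescriptor{Replicas: []int{1}}
	calls := 0
	err := retryReplicaChange(3,
		func() RangeDescriptor { return stored },
		func(desc RangeDescriptor) error {
			calls++
			if calls == 1 {
				stored = RangeDescriptor{Replicas: []int{1, 2}} // concurrent change races us
				return errStaleDescriptor
			}
			return nil
		})
	fmt.Println(err, "after", calls, "attempts") // <nil> after 2 attempts
}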
