From 44586540139cce38ab1b74e4f7c34d4449f191c0 Mon Sep 17 00:00:00 2001 From: Matt Tracy Date: Thu, 20 Aug 2015 16:29:00 -0400 Subject: [PATCH 1/2] Remove Replica.metaLock mutex For issue #2152 This mutex was providing no real concurrent safety, because it gave no consideration to actions by other stores. Concurrent safety is already provided by the transactions in each operation, so it can be removed. --- storage/client_split_test.go | 23 ++++++++++++++++------- storage/replica.go | 2 -- storage/replica_command.go | 12 ------------ 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/storage/client_split_test.go b/storage/client_split_test.go index 940dff8479ed..af73042f579a 100644 --- a/storage/client_split_test.go +++ b/storage/client_split_test.go @@ -22,7 +22,6 @@ import ( "fmt" "math/rand" "reflect" - "strings" "sync" "sync/atomic" "testing" @@ -143,20 +142,17 @@ func TestStoreRangeSplitConcurrent(t *testing.T) { store, stopper := createTestStore(t) defer stopper.Stop() + splitKey := proto.Key("a") concurrentCount := int32(10) wg := sync.WaitGroup{} wg.Add(int(concurrentCount)) failureCount := int32(0) for i := int32(0); i < concurrentCount; i++ { go func() { - args := adminSplitArgs(proto.KeyMin, []byte("a"), 1, store.StoreID()) + args := adminSplitArgs(proto.KeyMin, splitKey, 1, store.StoreID()) _, err := store.ExecuteCmd(context.Background(), &args) if err != nil { - if strings.Contains(err.Error(), "range is already split at key") { - atomic.AddInt32(&failureCount, 1) - } else { - t.Errorf("unexpected error: %s", err) - } + atomic.AddInt32(&failureCount, 1) } wg.Done() }() @@ -165,6 +161,19 @@ func TestStoreRangeSplitConcurrent(t *testing.T) { if failureCount != concurrentCount-1 { t.Fatalf("concurrent splits succeeded unexpectedly; failureCount=%d", failureCount) } + + // Verify everything ended up as expected. + if a, e := store.ReplicaCount(), 2; a != e { + t.Fatalf("expected %d stores after concurrent splits; actual count=%d", e, a) + } + rng := store.LookupReplica(proto.KeyMin, nil) + newRng := store.LookupReplica(splitKey, nil) + if !bytes.Equal(newRng.Desc().StartKey, splitKey) || !bytes.Equal(splitKey, rng.Desc().EndKey) { + t.Errorf("ranges mismatched, wanted %q=%q=%q", newRng.Desc().StartKey, splitKey, rng.Desc().EndKey) + } + if !bytes.Equal(newRng.Desc().EndKey, proto.KeyMax) || !bytes.Equal(rng.Desc().StartKey, proto.KeyMin) { + t.Errorf("new ranges do not cover KeyMin-KeyMax, but only %q-%q", rng.Desc().StartKey, newRng.Desc().EndKey) + } } // TestStoreRangeSplit executes a split of a range and verifies that the diff --git a/storage/replica.go b/storage/replica.go index 25efe1826ee5..f109c707884c 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -190,8 +190,6 @@ type Replica struct { rm rangeManager // Makes some store methods available stats *rangeStats // Range statistics maxBytes int64 // Max bytes before split. - // Held while a split, merge, or replica change is underway. - metaLock sync.Mutex // TODO(bdarnell): Revisit the metaLock. // Last index persisted to the raft log (not necessarily committed). // Updated atomically. lastIndex uint64 diff --git a/storage/replica_command.go b/storage/replica_command.go index a34a3b88d7b0..167e3de57237 100644 --- a/storage/replica_command.go +++ b/storage/replica_command.go @@ -1034,10 +1034,6 @@ func (r *Replica) LeaderLease(batch engine.Engine, ms *engine.MVCCStats, args pr func (r *Replica) AdminSplit(args proto.AdminSplitRequest) (proto.AdminSplitResponse, error) { var reply proto.AdminSplitResponse - // Only allow a single split per range at a time. - r.metaLock.Lock() - defer r.metaLock.Unlock() - // Determine split key if not provided with args. This scan is // allowed to be relatively slow because admin commands don't block // other commands. @@ -1222,10 +1218,6 @@ func (r *Replica) splitTrigger(batch engine.Engine, split *proto.SplitTrigger) e func (r *Replica) AdminMerge(args proto.AdminMergeRequest) (proto.AdminMergeResponse, error) { var reply proto.AdminMergeResponse - // Only allow a single split/merge per range at a time. - r.metaLock.Lock() - defer r.metaLock.Unlock() - // Lookup subsumed range. desc := r.Desc() if desc.EndKey.Equal(proto.KeyMax) { @@ -1374,10 +1366,6 @@ func (r *Replica) changeReplicasTrigger(change *proto.ChangeReplicasTrigger) err // in a distributed transaction and takes effect when that transaction is committed. // When removing a replica, only the NodeID and StoreID fields of the Replica are used. func (r *Replica) ChangeReplicas(changeType proto.ReplicaChangeType, replica proto.Replica) error { - // Only allow a single change per range at a time. - r.metaLock.Lock() - defer r.metaLock.Unlock() - // Validate the request and prepare the new descriptor. desc := r.Desc() updatedDesc := *desc From 9ffbd1a6413f444d42fbf6bca482d3eb01827b64 Mon Sep 17 00:00:00 2001 From: Matt Tracy Date: Mon, 24 Aug 2015 17:15:09 -0400 Subject: [PATCH 2/2] Supply RangeDescriptor to Split/Merge/Change Funcs For #2152 This commit modifies AdminSplit, AdminMerge and ChangeReplicas to accept a RangeDescriptor object. The RangeDescriptor is used as a form of optimistic locking. All of these operations will ultimately modify the RangeDescriptor in some way; to ensure that no other concurrent operations modify the RangeDescriptor, a copy of the original RangeDescriptor is captured before the modified RangeDescriptor is computed. The original RangeDescriptor is passed to a ConditionalPut as the first operation in the transaction, causing it to fail if concurrent modifications have already committed. However, capturing the original RangeDescriptor inside of the Split/Merge/Change methods is insufficient. In most cases, the decision to call Split/Merge/Change is made by another method based on the RangeDescriptor, and the decision may have been different if a concurrent change is applied first. Therefore, the original RangeDescriptor needs to be captured before calling Split/Merge/Change. This commit modifies all three methods to use the new pattern, with a required RangeDescriptor being passed to the method. Note that the behavior of Split and Merge is still not optimal, because the call to AdminSplit or AdminMerge is always routed through Raft. This pattern is unnecessary in cases where the split/merge decision is computed on the server. --- storage/client_merge_test.go | 5 ++- storage/client_raft_test.go | 79 ++++++++++++++++++++++++++++++++---- storage/client_test.go | 4 +- storage/replica.go | 4 +- storage/replica_command.go | 22 +++++++--- storage/replica_test.go | 2 +- storage/replicate_queue.go | 14 ++++--- 7 files changed, 102 insertions(+), 28 deletions(-) diff --git a/storage/client_merge_test.go b/storage/client_merge_test.go index bee9f94a713c..abdab1c633cc 100644 --- a/storage/client_merge_test.go +++ b/storage/client_merge_test.go @@ -258,8 +258,9 @@ func TestStoreRangeMergeNonConsecutive(t *testing.T) { t.Fatal(err) } - argsMerge := adminMergeArgs(rangeA.Desc().StartKey, 1, store.StoreID()) - if _, err := rangeA.AdminMerge(argsMerge); !testutils.IsError(err, "ranges not collocated") { + desc := rangeA.Desc() + argsMerge := adminMergeArgs(desc.StartKey, 1, store.StoreID()) + if _, err := rangeA.AdminMerge(argsMerge, desc); !testutils.IsError(err, "ranges not collocated") { t.Fatalf("did not got expected error; got %s", err) } diff --git a/storage/client_raft_test.go b/storage/client_raft_test.go index ec8be8d5b2d5..7a06f99c58aa 100644 --- a/storage/client_raft_test.go +++ b/storage/client_raft_test.go @@ -217,7 +217,7 @@ func TestReplicateRange(t *testing.T) { proto.Replica{ NodeID: mtc.stores[1].Ident.NodeID, StoreID: mtc.stores[1].Ident.StoreID, - }); err != nil { + }, rng.Desc()); err != nil { t.Fatal(err) } // Verify no intent remains on range descriptor key. @@ -279,7 +279,7 @@ func TestRestoreReplicas(t *testing.T) { proto.Replica{ NodeID: mtc.stores[1].Ident.NodeID, StoreID: mtc.stores[1].Ident.StoreID, - }); err != nil { + }, firstRng.Desc()); err != nil { t.Fatal(err) } @@ -372,7 +372,7 @@ func TestFailedReplicaChange(t *testing.T) { proto.Replica{ NodeID: mtc.stores[1].Ident.NodeID, StoreID: mtc.stores[1].Ident.StoreID, - }) + }, rng.Desc()) if err == nil || !strings.Contains(err.Error(), "boom") { t.Fatalf("did not get expected error: %s", err) } @@ -391,7 +391,7 @@ func TestFailedReplicaChange(t *testing.T) { proto.Replica{ NodeID: mtc.stores[1].Ident.NodeID, StoreID: mtc.stores[1].Ident.StoreID, - }) + }, rng.Desc()) if err != nil { t.Fatal(err) } @@ -456,7 +456,7 @@ func TestReplicateAfterTruncation(t *testing.T) { proto.Replica{ NodeID: mtc.stores[1].Ident.NodeID, StoreID: mtc.stores[1].Ident.StoreID, - }); err != nil { + }, rng.Desc()); err != nil { t.Fatal(err) } @@ -541,6 +541,65 @@ func TestStoreRangeReplicate(t *testing.T) { } } +// TestChangeReplicasDuplicateError tests that a replica change aborts if +// another change has been made to the RangeDescriptor since it was initiated. +func TestChangeReplicasDescriptorInvariant(t *testing.T) { + defer leaktest.AfterTest(t) + mtc := startMultiTestContext(t, 3) + defer mtc.Stop() + + repl, err := mtc.stores[0].GetReplica(1) + if err != nil { + t.Fatal(err) + } + + addReplica := func(storeNum int, desc *proto.RangeDescriptor) error { + return repl.ChangeReplicas(proto.ADD_REPLICA, + proto.Replica{ + NodeID: mtc.stores[storeNum].Ident.NodeID, + StoreID: mtc.stores[storeNum].Ident.StoreID, + }, + desc) + } + + // Retain the descriptor for the range at this point. + origDesc := repl.Desc() + + // Add replica to the second store, which should succeed. + if err := addReplica(1, origDesc); err != nil { + t.Fatal(err) + } + if err := util.IsTrueWithin(func() bool { + r := mtc.stores[1].LookupReplica(proto.Key("a"), proto.Key("b")) + if r == nil { + return false + } + return true + }, time.Second); err != nil { + t.Fatal(err) + } + + // Attempt to add replica to the third store with the original descriptor. + // This should fail because the descriptor is stale. + if err := addReplica(2, origDesc); err == nil { + t.Fatal("Expected error calling ChangeReplicas with stale RangeDescriptor") + } + + // Add to third store with fresh descriptor. + if err := addReplica(2, repl.Desc()); err != nil { + t.Fatal(err) + } + if err := util.IsTrueWithin(func() bool { + r := mtc.stores[2].LookupReplica(proto.Key("a"), proto.Key("b")) + if r == nil { + return false + } + return true + }, time.Second); err != nil { + t.Fatal(err) + } +} + // TestProgressWithDownNode verifies that a surviving quorum can make progress // with a downed node. func TestProgressWithDownNode(t *testing.T) { @@ -801,9 +860,10 @@ func TestRangeDescriptorSnapshotRace(t *testing.T) { if rng == nil { t.Fatal("failed to look up min range") } - args := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("A%03d", i)), rng.Desc().RangeID, + desc := rng.Desc() + args := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("A%03d", i)), desc.RangeID, mtc.stores[0].StoreID()) - if _, err := rng.AdminSplit(args); err != nil { + if _, err := rng.AdminSplit(args, desc); err != nil { t.Fatal(err) } } @@ -814,8 +874,9 @@ func TestRangeDescriptorSnapshotRace(t *testing.T) { if rng == nil { t.Fatal("failed to look up max range") } - args := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("B%03d", i)), rng.Desc().RangeID, mtc.stores[0].StoreID()) - if _, err := rng.AdminSplit(args); err != nil { + desc := rng.Desc() + args := adminSplitArgs(proto.KeyMin, []byte(fmt.Sprintf("B%03d", i)), desc.RangeID, mtc.stores[0].StoreID()) + if _, err := rng.AdminSplit(args, desc); err != nil { t.Fatal(err) } } diff --git a/storage/client_test.go b/storage/client_test.go index 5651a2b50c8e..b50357098dbd 100644 --- a/storage/client_test.go +++ b/storage/client_test.go @@ -301,7 +301,7 @@ func (m *multiTestContext) replicateRange(rangeID proto.RangeID, sourceStoreInde proto.Replica{ NodeID: m.stores[dest].Ident.NodeID, StoreID: m.stores[dest].Ident.StoreID, - }) + }, rng.Desc()) if err != nil { m.t.Fatal(err) } @@ -332,7 +332,7 @@ func (m *multiTestContext) unreplicateRange(rangeID proto.RangeID, source, dest proto.Replica{ NodeID: m.idents[dest].NodeID, StoreID: m.idents[dest].StoreID, - }) + }, rng.Desc()) if err != nil { m.t.Fatal(err) } diff --git a/storage/replica.go b/storage/replica.go index f109c707884c..b946ca7e0c82 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -590,10 +590,10 @@ func (r *Replica) addAdminCmd(ctx context.Context, args proto.Request) (proto.Re switch tArgs := args.(type) { case *proto.AdminSplitRequest: - resp, err := r.AdminSplit(*tArgs) + resp, err := r.AdminSplit(*tArgs, r.Desc()) return &resp, err case *proto.AdminMergeRequest: - resp, err := r.AdminMerge(*tArgs) + resp, err := r.AdminMerge(*tArgs, r.Desc()) return &resp, err default: return nil, util.Error("unrecognized admin command") diff --git a/storage/replica_command.go b/storage/replica_command.go index 167e3de57237..a16243344a47 100644 --- a/storage/replica_command.go +++ b/storage/replica_command.go @@ -1031,13 +1031,19 @@ func (r *Replica) LeaderLease(batch engine.Engine, ms *engine.MVCCStats, args pr // updates the range addressing metadata. The handover of responsibility for // the reassigned key range is carried out seamlessly through a split trigger // carried out as part of the commit of that transaction. -func (r *Replica) AdminSplit(args proto.AdminSplitRequest) (proto.AdminSplitResponse, error) { +// +// The supplied RangeDescriptor is used as a form of optimistic lock. An +// operation which might split a range should obtain a copy of the range's +// current descriptor before making the decision to split. If the decision is +// affirmative the descriptor is passed to AdminSplit, which performs a +// Conditional Put on the RangeDescriptor to ensure that no other operation has +// modified the range in the time the decision was being made. +func (r *Replica) AdminSplit(args proto.AdminSplitRequest, desc *proto.RangeDescriptor) (proto.AdminSplitResponse, error) { var reply proto.AdminSplitResponse // Determine split key if not provided with args. This scan is // allowed to be relatively slow because admin commands don't block // other commands. - desc := r.Desc() splitKey := proto.Key(args.SplitKey) if len(splitKey) == 0 { snap := r.rm.NewSnapshot() @@ -1215,11 +1221,13 @@ func (r *Replica) splitTrigger(batch engine.Engine, split *proto.SplitTrigger) e // the reassigned key range is carried out seamlessly through a merge trigger // carried out as part of the commit of that transaction. // A merge requires that the two ranges are collocate on the same set of replicas. -func (r *Replica) AdminMerge(args proto.AdminMergeRequest) (proto.AdminMergeResponse, error) { +// +// The supplied RangeDescriptor is used as a form of optimistic lock. See the +// comment of "AdminSplit" for more information on this pattern. +func (r *Replica) AdminMerge(args proto.AdminMergeRequest, desc *proto.RangeDescriptor) (proto.AdminMergeResponse, error) { var reply proto.AdminMergeResponse // Lookup subsumed range. - desc := r.Desc() if desc.EndKey.Equal(proto.KeyMax) { // Noop. return reply, nil @@ -1365,9 +1373,11 @@ func (r *Replica) changeReplicasTrigger(change *proto.ChangeReplicasTrigger) err // ChangeReplicas adds or removes a replica of a range. The change is performed // in a distributed transaction and takes effect when that transaction is committed. // When removing a replica, only the NodeID and StoreID fields of the Replica are used. -func (r *Replica) ChangeReplicas(changeType proto.ReplicaChangeType, replica proto.Replica) error { +// +// The supplied RangeDescriptor is used as a form of optimistic lock. See the +// comment of "AdminSplit" for more information on this pattern. +func (r *Replica) ChangeReplicas(changeType proto.ReplicaChangeType, replica proto.Replica, desc *proto.RangeDescriptor) error { // Validate the request and prepare the new descriptor. - desc := r.Desc() updatedDesc := *desc updatedDesc.Replicas = append([]proto.Replica{}, desc.Replicas...) found := -1 // tracks NodeID && StoreID diff --git a/storage/replica_test.go b/storage/replica_test.go index 916217e4bfd0..2307a095031d 100644 --- a/storage/replica_test.go +++ b/storage/replica_test.go @@ -2617,7 +2617,7 @@ func TestChangeReplicasDuplicateError(t *testing.T) { if err := tc.rng.ChangeReplicas(proto.ADD_REPLICA, proto.Replica{ NodeID: tc.store.Ident.NodeID, StoreID: 9999, - }); err == nil || !strings.Contains(err.Error(), + }, tc.rng.Desc()); err == nil || !strings.Contains(err.Error(), "already present") { t.Fatalf("must not be able to add second replica to same node (err=%s)", err) diff --git a/storage/replicate_queue.go b/storage/replicate_queue.go index ff017b62130d..2609f0b41421 100644 --- a/storage/replicate_queue.go +++ b/storage/replicate_queue.go @@ -76,13 +76,14 @@ func (rq *replicateQueue) shouldQueue(now proto.Timestamp, repl *Replica) ( return } - return rq.needsReplication(zone, repl) + return rq.needsReplication(zone, repl, repl.Desc()) } -func (rq *replicateQueue) needsReplication(zone config.ZoneConfig, repl *Replica) (bool, float64) { +func (rq *replicateQueue) needsReplication(zone config.ZoneConfig, repl *Replica, + desc *proto.RangeDescriptor) (bool, float64) { // TODO(bdarnell): handle non-empty ReplicaAttrs. need := len(zone.ReplicaAttrs) - have := len(repl.Desc().Replicas) + have := len(desc.Replicas) if need > have { if log.V(1) { log.Infof("%s needs %d nodes; has %d", repl, need, have) @@ -99,14 +100,15 @@ func (rq *replicateQueue) process(now proto.Timestamp, repl *Replica) error { return err } - if needs, _ := rq.needsReplication(zone, repl); !needs { + desc := repl.Desc() + if needs, _ := rq.needsReplication(zone, repl, desc); !needs { // Something changed between shouldQueue and process. return nil } // TODO(bdarnell): handle non-homogenous ReplicaAttrs. // Allow constraints to be relaxed if necessary. - newReplica, err := rq.allocator.AllocateTarget(zone.ReplicaAttrs[0], repl.Desc().Replicas, true) + newReplica, err := rq.allocator.AllocateTarget(zone.ReplicaAttrs[0], desc.Replicas, true) if err != nil { return err } @@ -115,7 +117,7 @@ func (rq *replicateQueue) process(now proto.Timestamp, repl *Replica) error { NodeID: newReplica.Node.NodeID, StoreID: newReplica.StoreID, } - if err = repl.ChangeReplicas(proto.ADD_REPLICA, replica); err != nil { + if err = repl.ChangeReplicas(proto.ADD_REPLICA, replica, desc); err != nil { return err }