storage: use atomic replication changes in RelocateRange #41084

Merged 1 commit on Sep 27, 2019
23 changes: 22 additions & 1 deletion pkg/internal/client/db.go
@@ -560,14 +560,35 @@ func (db *DB) AdminTransferLease(
return getOneErr(db.Run(ctx, b), b)
}

// ChangeReplicasCanMixAddAndRemoveContext convinces
// (*client.DB).AdminChangeReplicas that the caller is aware that 19.1 nodes
// don't know how to handle requests that mix additions and removals; 19.2+
// binaries understand this due to the work done in the context of atomic
// replication changes. If 19.1 nodes received such a request they'd mistake the
// removals for additions.
//
// In effect, users of the RPC would need to check the cluster version, which
// has historically been a brittle pattern. This time, the DB disallows the
// new behavior unless it can determine (via the ctx) that the caller went
// through this method and is thus aware of the intricacies.
//
// See https://github.com/cockroachdb/cockroach/pull/39611.
//
// TODO(tbg): remove in 20.1.
func ChangeReplicasCanMixAddAndRemoveContext(ctx context.Context) context.Context {
return context.WithValue(ctx, adminChangeReplicasMixHint{}, adminChangeReplicasMixHint{})
}

type adminChangeReplicasMixHint struct{}

// AdminChangeReplicas adds or removes a set of replicas for a range.
func (db *DB) AdminChangeReplicas(
ctx context.Context,
key interface{},
expDesc roachpb.RangeDescriptor,
chgs []roachpb.ReplicationChange,
) (*roachpb.RangeDescriptor, error) {
-	if ctx.Value("testing") == nil {
+	if ctx.Value(adminChangeReplicasMixHint{}) == nil {
// Disallow trying to add and remove replicas in the same set of
// changes. This will only work when the node receiving the request is
// running 19.2 (code, not cluster version).
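For illustration, a minimal sketch of a 19.2-aware caller opting in to a mixed add/remove batch via the context hint above. The variables addTarget, removeTarget, startKey, and curDesc are hypothetical, and the ReplicationChange literals mirror the usage in the tests below; this snippet is not part of the diff:

	// Sketch only: opt in to mixing an addition and a removal in one call.
	// Without the context hint, AdminChangeReplicas rejects the mixed batch,
	// since a 19.1 node receiving it would mistake the removals for additions.
	ctx := client.ChangeReplicasCanMixAddAndRemoveContext(context.Background())
	chgs := []roachpb.ReplicationChange{
		{ChangeType: roachpb.ADD_REPLICA, Target: addTarget},       // addTarget: hypothetical roachpb.ReplicationTarget
		{ChangeType: roachpb.REMOVE_REPLICA, Target: removeTarget}, // removeTarget: hypothetical
	}
	desc, err := db.AdminChangeReplicas(ctx, startKey, curDesc, chgs)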
5 changes: 2 additions & 3 deletions pkg/storage/client_atomic_membership_change_test.go
@@ -16,6 +16,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -60,9 +61,7 @@ func TestAtomicReplicationChange(t *testing.T) {
runChange := func(expDesc roachpb.RangeDescriptor, chgs []roachpb.ReplicationChange) roachpb.RangeDescriptor {
t.Helper()
desc, err := tc.Servers[0].DB().AdminChangeReplicas(
-		// TODO(tbg): when 19.2 is out, remove this "feature gate" here and in
-		// AdminChangeReplicas.
-		context.WithValue(ctx, "testing", "testing"),
+		client.ChangeReplicasCanMixAddAndRemoveContext(ctx),
k, expDesc, chgs,
)
require.NoError(t, err)
192 changes: 192 additions & 0 deletions pkg/storage/client_relocate_range_test.go
@@ -0,0 +1,192 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage_test

import (
"context"
"sort"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func relocateAndCheck(
t *testing.T,
tc *testcluster.TestCluster,
startKey roachpb.RKey,
targets []roachpb.ReplicationTarget,
) {
require.NoError(t, tc.Servers[0].DB().AdminRelocateRange(context.Background(), startKey.AsRawKey(), targets))
desc, err := tc.Servers[0].LookupRange(startKey.AsRawKey())
require.NoError(t, err)
requireDescMembers(t, desc, targets)
requireLeaseAt(t, tc, desc, targets[0])
}

func requireDescMembers(
t *testing.T, desc roachpb.RangeDescriptor, targets []roachpb.ReplicationTarget,
) {
t.Helper()
targets = append([]roachpb.ReplicationTarget(nil), targets...)
sort.Slice(targets, func(i, j int) bool { return targets[i].StoreID < targets[j].StoreID })

have := make([]roachpb.ReplicationTarget, 0, len(targets))
for _, rDesc := range desc.Replicas().All() {
have = append(have, roachpb.ReplicationTarget{
NodeID: rDesc.NodeID,
StoreID: rDesc.StoreID,
})
}
sort.Slice(have, func(i, j int) bool { return have[i].StoreID < have[j].StoreID })
require.Equal(t, targets, have)
}

func requireLeaseAt(
t *testing.T,
tc *testcluster.TestCluster,
desc roachpb.RangeDescriptor,
target roachpb.ReplicationTarget,
) {
t.Helper()
// NB: under stressrace the lease will sometimes be inactive by the time
// it's returned here, so don't use FindRangeLeaseHolder, which fails when
// that happens.
testutils.SucceedsSoon(t, func() error {
lease, _, err := tc.FindRangeLease(desc, &target)
if err != nil {
return err
}
if target != (roachpb.ReplicationTarget{
NodeID: lease.Replica.NodeID,
StoreID: lease.Replica.StoreID,
}) {
return errors.Errorf("lease %v is not held by %+v", lease, target)
}
return nil
})
}

func TestAdminRelocateRange(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()

type intercept struct {
ops []roachpb.ReplicationChange
leaseTarget *roachpb.ReplicationTarget
err error
}
var intercepted []intercept

requireNumAtomic := func(expAtomic int, expSingle int, f func()) {
t.Helper()
intercepted = nil
f()
var actAtomic, actSingle int
for _, ic := range intercepted {
if ic.err != nil {
continue
}
if len(ic.ops) == 2 && ic.ops[0].ChangeType == roachpb.ADD_REPLICA && ic.ops[1].ChangeType == roachpb.REMOVE_REPLICA {
actAtomic++
} else {
actSingle++
}
}
require.Equal(t, expAtomic, actAtomic, "wrong number of atomic changes: %+v", intercepted)
require.Equal(t, expSingle, actSingle, "wrong number of single changes: %+v", intercepted)
}

knobs := base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
BeforeRelocateOne: func(ops []roachpb.ReplicationChange, leaseTarget *roachpb.ReplicationTarget, err error) {
intercepted = append(intercepted, intercept{
ops: ops,
leaseTarget: leaseTarget,
err: err,
})
},
},
}
args := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
ReplicationMode: base.ReplicationManual,
}
tc := testcluster.StartTestCluster(t, 6, args)
defer tc.Stopper().Stop(ctx)

// s1 (LH) ---> s2 (LH) s1 s3
// Pure upreplication.
k := keys.MustAddr(tc.ScratchRange(t))
{
targets := tc.Targets(1, 0, 2)
// Expect two single additions, and that's it.
requireNumAtomic(0, 2, func() {
relocateAndCheck(t, tc, k, targets)
})
}

// s1 (LH) s2 s3 ---> s4 (LH) s5 s6.
// This is trickier because the leaseholder gets removed, and so do all
// other replicas (i.e. a simple lease transfer at the beginning won't solve
// the problem).
{
targets := tc.Targets(3, 4, 5)
// Should carry out three swaps. Note that the leaseholder gets removed
// in the process (i.e. internally the lease must've been moved around
// to achieve that).
requireNumAtomic(3, 0, func() {
relocateAndCheck(t, tc, k, targets)
})
}

// s4 (LH) s5 s6 ---> s5 (LH)
// Pure downreplication.
{
requireNumAtomic(0, 2, func() {
relocateAndCheck(t, tc, k, tc.Targets(4))
})
}

// s5 (LH) ---> s3 (LH)
// Lateral movement while at replication factor one. In this case atomic
// replication changes cannot be used; we add-then-remove instead.
{
requireNumAtomic(0, 2, func() {
relocateAndCheck(t, tc, k, tc.Targets(2))
})
}

// s3 (LH) ---> s2 (LH) s4 s1 --> s4 (LH) s2 s6 s1 --> s3 (LH) s5
// A grab bag.
{
// s3 -(add)-> s3 s2 -(swap)-> s4 s2 -(add)-> s4 s2 s1 (=s2 s4 s1)
requireNumAtomic(1, 2, func() {
relocateAndCheck(t, tc, k, tc.Targets(1, 3, 0))
})
// s2 s4 s1 -(add)-> s2 s4 s1 s6 (=s4 s2 s6 s1)
requireNumAtomic(0, 1, func() {
relocateAndCheck(t, tc, k, tc.Targets(3, 1, 5, 0))
})
// s4 s2 s6 s1 -(swap)-> s3 s2 s6 s1 -(swap)-> s3 s5 s6 s1 -(del)-> s3 s5 s6 -(del)-> s3 s5
requireNumAtomic(2, 2, func() {
relocateAndCheck(t, tc, k, tc.Targets(2, 4))
})
}
}
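For reference, the shape that requireNumAtomic above counts as one atomic change is exactly a two-element batch, an addition immediately followed by a removal; any other change batch counts as a single change. A sketch with hypothetical targets:

	// An atomic "swap" as classified by requireNumAtomic: one addition
	// immediately followed by one removal in the same change batch.
	swap := []roachpb.ReplicationChange{
		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}},
		{ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
	}
	// A plain up- or downreplication arrives as a one-element batch and is
	// counted as a single change instead.
	single := []roachpb.ReplicationChange{
		{ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}},
	}
	_ = swap
	_ = single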
2 changes: 1 addition & 1 deletion pkg/storage/client_replica_test.go
@@ -2705,7 +2705,7 @@ func TestChangeReplicasLeaveAtomicRacesWithMerge(t *testing.T) {
ReplicationMode: base.ReplicationManual,
})
// Make a magical context which will allow us to use atomic replica changes.
-	ctx := context.WithValue(context.Background(), "testing", "testing")
+	ctx := client.ChangeReplicasCanMixAddAndRemoveContext(context.Background())
defer tc.Stopper().Stop(ctx)

// We want to first get into a joint consensus scenario.