Skip to content

Commit 7aec72b

Browse files
craig[bot]benesch
craig[bot]
andcommitted
Merge #29126
29126: backport-2.1: another round of merge bug fixes r=tschottdorf a=benesch Backport: * 1/1 commits from "storage: fix raft snapshots that span merges and splits" (#29083) * 1/1 commits from "storage: deflake TestStoreRangeMergeReadoptedBothFollowers" (#29084) * 1/1 commits from "storage: protect ComputeChecksum commands from replaying" (#29067) * 1/1 commits from "storage: make ComputeChecksum a point request" (#29079) Please see individual PRs for details. /cc @cockroachdb/release Co-authored-by: Nikhil Benesch <[email protected]>
2 parents a097712 + f315595 commit 7aec72b

16 files changed

+991
-704
lines changed

pkg/internal/client/batch.go

-18
Original file line numberDiff line numberDiff line change
@@ -516,24 +516,6 @@ func (b *Batch) ReverseScan(s, e interface{}) {
516516
b.scan(s, e, true)
517517
}
518518

519-
// CheckConsistency creates a batch request to check the consistency of the
520-
// ranges holding the span of keys from s to e. It logs a diff of all the
521-
// keys that are inconsistent when withDiff is set to true.
522-
func (b *Batch) CheckConsistency(s, e interface{}, withDiff bool) {
523-
begin, err := marshalKey(s)
524-
if err != nil {
525-
b.initResult(0, 0, notRaw, err)
526-
return
527-
}
528-
end, err := marshalKey(e)
529-
if err != nil {
530-
b.initResult(0, 0, notRaw, err)
531-
return
532-
}
533-
b.appendReqs(roachpb.NewCheckConsistency(begin, end, withDiff))
534-
b.initResult(1, 0, notRaw, nil)
535-
}
536-
537519
// Del deletes one or more keys.
538520
//
539521
// A new result will be appended to the batch and each key will have a

pkg/internal/client/db.go

-9
Original file line numberDiff line numberDiff line change
@@ -506,15 +506,6 @@ func (db *DB) AdminChangeReplicas(
506506
return getOneErr(db.Run(ctx, b), b)
507507
}
508508

509-
// CheckConsistency runs a consistency check on all the ranges containing
510-
// the key span. It logs a diff of all the keys that are inconsistent
511-
// when withDiff is set to true.
512-
func (db *DB) CheckConsistency(ctx context.Context, begin, end interface{}, withDiff bool) error {
513-
b := &Batch{}
514-
b.CheckConsistency(begin, end, withDiff)
515-
return getOneErr(db.Run(ctx, b), b)
516-
}
517-
518509
// WriteBatch applies the operations encoded in a BatchRepr, which is the
519510
// serialized form of a RocksDB Batch. The command cannot span Ranges and must
520511
// be run on an empty keyrange.

pkg/roachpb/api.go

+1-12
Original file line numberDiff line numberDiff line change
@@ -930,17 +930,6 @@ func NewScan(key, endKey Key) Request {
930930
}
931931
}
932932

933-
// NewCheckConsistency returns a Request initialized to scan from start to end keys.
934-
func NewCheckConsistency(key, endKey Key, withDiff bool) Request {
935-
return &CheckConsistencyRequest{
936-
RequestHeader: RequestHeader{
937-
Key: key,
938-
EndKey: endKey,
939-
},
940-
WithDiff: withDiff,
941-
}
942-
}
943-
944933
// NewReverseScan returns a Request initialized to reverse scan from end to
945934
// start keys with max results.
946935
func NewReverseScan(key, endKey Key) Request {
@@ -1064,7 +1053,7 @@ func (*TransferLeaseRequest) flags() int {
10641053
return isWrite | isAlone | skipLeaseCheck
10651054
}
10661055
func (*RecomputeStatsRequest) flags() int { return isWrite | isAlone }
1067-
func (*ComputeChecksumRequest) flags() int { return isWrite | isRange }
1056+
func (*ComputeChecksumRequest) flags() int { return isWrite }
10681057
func (*CheckConsistencyRequest) flags() int { return isAdmin | isRange }
10691058
func (*WriteBatchRequest) flags() int { return isWrite | isRange }
10701059
func (*ExportRequest) flags() int { return isRead | isRange | updatesReadTSCache }

pkg/roachpb/api.pb.go

+412-415
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/roachpb/api.proto

+7-5
Original file line numberDiff line numberDiff line change
@@ -991,11 +991,7 @@ message ComputeChecksumRequest {
991991
// The version used to pick the checksum method. It allows us to use a
992992
// consistent checksumming method across replicas.
993993
uint32 version = 2;
994-
// A unique identifier to match a future storage.CollectChecksumRequest with
995-
// this request.
996-
bytes checksum_id = 3 [(gogoproto.nullable) = false,
997-
(gogoproto.customname) = "ChecksumID",
998-
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
994+
reserved 3;
999995
// Compute a checksum along with a snapshot of the entire range, that will be
1000996
// used in logging a diff during checksum verification.
1001997
bool snapshot = 4;
@@ -1004,6 +1000,12 @@ message ComputeChecksumRequest {
10041000
// A ComputeChecksumResponse is the response to a ComputeChecksum() operation.
10051001
message ComputeChecksumResponse {
10061002
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
1003+
1004+
// ChecksumID is the unique identifier that can be used to get the computed
1005+
// checksum in a future storage.CollectChecksumRequest.
1006+
bytes checksum_id = 2 [(gogoproto.nullable) = false,
1007+
(gogoproto.customname) = "ChecksumID",
1008+
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
10071009
}
10081010

10091011
enum ExportStorageProvider {

pkg/roachpb/batch.go

+10
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,16 @@ func (ba *BatchRequest) IsSingleSubsumeRequest() bool {
188188
return false
189189
}
190190

191+
// IsSingleComputeChecksumRequest returns true iff the batch contains a single
192+
// request, and that request is a ComputeChecksumRequest.
193+
func (ba *BatchRequest) IsSingleComputeChecksumRequest() bool {
194+
if ba.IsSingleRequest() {
195+
_, ok := ba.Requests[0].GetInner().(*ComputeChecksumRequest)
196+
return ok
197+
}
198+
return false
199+
}
200+
191201
// GetPrevLeaseForLeaseRequest returns the previous lease, at the time
192202
// of proposal, for a request lease or transfer lease request. If the
193203
// batch does not contain a single lease request, this method will panic.

pkg/storage/batcheval/cmd_compute_checksum.go

+25-6
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,28 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/roachpb"
2222
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
2323
"github.com/cockroachdb/cockroach/pkg/storage/engine"
24+
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
25+
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
2426
"github.com/cockroachdb/cockroach/pkg/util/log"
27+
"github.com/cockroachdb/cockroach/pkg/util/uuid"
2528
)
2629

2730
func init() {
28-
RegisterCommand(roachpb.ComputeChecksum, DefaultDeclareKeys, ComputeChecksum)
31+
RegisterCommand(roachpb.ComputeChecksum, declareKeysComputeChecksum, ComputeChecksum)
2932
}
3033

31-
// Version numbers for Replica checksum computation. Requests fail unless the
32-
// versions are compatible.
34+
func declareKeysComputeChecksum(
35+
roachpb.RangeDescriptor, roachpb.Header, roachpb.Request, *spanset.SpanSet,
36+
) {
37+
// Intentionally declare no keys, as ComputeChecksum does not need to be
38+
// serialized with any other commands. It simply needs to be committed into
39+
// the Raft log.
40+
}
41+
42+
// Version numbers for Replica checksum computation. Requests silently no-op
43+
// unless the versions are compatible.
3344
const (
34-
ReplicaChecksumVersion = 2
45+
ReplicaChecksumVersion = 3
3546
ReplicaChecksumGCInterval = time.Hour
3647
)
3748

@@ -44,10 +55,18 @@ func ComputeChecksum(
4455
args := cArgs.Args.(*roachpb.ComputeChecksumRequest)
4556

4657
if args.Version != ReplicaChecksumVersion {
47-
log.Errorf(ctx, "Incompatible versions: e=%d, v=%d", ReplicaChecksumVersion, args.Version)
58+
log.Infof(ctx, "incompatible ComputeChecksum versions (server: %d, requested: %d)",
59+
ReplicaChecksumVersion, args.Version)
4860
return result.Result{}, nil
4961
}
62+
63+
reply := resp.(*roachpb.ComputeChecksumResponse)
64+
reply.ChecksumID = uuid.MakeV4()
65+
5066
var pd result.Result
51-
pd.Replicated.ComputeChecksum = args
67+
pd.Replicated.ComputeChecksum = &storagebase.ComputeChecksum{
68+
ChecksumID: reply.ChecksumID,
69+
SaveSnapshot: args.Snapshot,
70+
}
5271
return pd, nil
5372
}

pkg/storage/client_merge_test.go

+46-25
Original file line numberDiff line numberDiff line change
@@ -1727,6 +1727,7 @@ func TestStoreRangeMergeReadoptedBothFollowers(t *testing.T) {
17271727
mtc.Start(t, 3)
17281728
defer mtc.Stop()
17291729
store0, store2 := mtc.Store(0), mtc.Store(2)
1730+
distSender := mtc.distSenders[0]
17301731

17311732
// Create two ranges on all nodes.
17321733
mtc.replicateRange(roachpb.RangeID(1), 1, 2)
@@ -1735,16 +1736,26 @@ func TestStoreRangeMergeReadoptedBothFollowers(t *testing.T) {
17351736
t.Fatal(err)
17361737
}
17371738

1738-
// Wait for store2 to hear about the split.
1739-
var lhsRepl2, rhsRepl2 *storage.Replica
1740-
testutils.SucceedsSoon(t, func() error {
1741-
lhsRepl2, err = store2.GetReplica(lhsDesc.RangeID)
1742-
if err != nil {
1743-
return err
1739+
// Wait for all stores to have fully processed the split.
1740+
for _, key := range []roachpb.Key{roachpb.Key("a"), roachpb.Key("b")} {
1741+
if _, pErr := client.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil {
1742+
t.Fatal(pErr)
17441743
}
1745-
rhsRepl2, err = store2.GetReplica(rhsDesc.RangeID)
1746-
return err
1747-
})
1744+
mtc.waitForValues(key, []int64{1, 1, 1})
1745+
}
1746+
1747+
lhsRepl0, err := store0.GetReplica(lhsDesc.RangeID)
1748+
if err != nil {
1749+
t.Fatal(err)
1750+
}
1751+
lhsRepl2, err := store2.GetReplica(lhsDesc.RangeID)
1752+
if err != nil {
1753+
t.Fatal(err)
1754+
}
1755+
rhsRepl2, err := store2.GetReplica(rhsDesc.RangeID)
1756+
if err != nil {
1757+
t.Fatal(err)
1758+
}
17481759

17491760
// Abandon the two ranges on store2, but do not GC them.
17501761
mtc.unreplicateRange(lhsDesc.RangeID, 2)
@@ -1757,13 +1768,6 @@ func TestStoreRangeMergeReadoptedBothFollowers(t *testing.T) {
17571768
t.Fatal(pErr)
17581769
}
17591770

1760-
// Attempt to re-add the merged range to store2. The operation should fail
1761-
// because store2's LHS and RHS replicas intersect the merged range.
1762-
lhsRepl0, err := store0.GetReplica(lhsDesc.RangeID)
1763-
if err != nil {
1764-
t.Fatal(err)
1765-
}
1766-
17671771
addLHSRepl2 := func() error {
17681772
for r := retry.StartWithCtx(ctx, retry.Options{}); r.Next(); {
17691773
err := lhsRepl0.ChangeReplicas(ctx, roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
@@ -1778,6 +1782,8 @@ func TestStoreRangeMergeReadoptedBothFollowers(t *testing.T) {
17781782
return nil
17791783
}
17801784

1785+
// Attempt to re-add the merged range to store2. The operation should fail
1786+
// because store2's LHS and RHS replicas intersect the merged range.
17811787
err = addLHSRepl2()
17821788
if exp := "cannot apply snapshot: snapshot intersects existing range"; !testutils.IsError(err, exp) {
17831789
t.Fatalf("expected %q error, but got %v", exp, err)
@@ -1903,22 +1909,25 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
19031909

19041910
// Create three fully-caught-up, adjacent ranges on all three stores.
19051911
mtc.replicateRange(roachpb.RangeID(1), 1, 2)
1906-
splitKeys := []roachpb.Key{roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")}
1907-
for _, key := range splitKeys {
1908-
if _, pErr := client.SendWrapped(ctx, distSender, adminSplitArgs(key)); pErr != nil {
1909-
t.Fatal(pErr)
1912+
splitKeys := []roachpb.Key{roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c"), roachpb.Key("d")}
1913+
for i, key := range splitKeys {
1914+
if i != len(splitKeys)-1 {
1915+
// We'll split the last range off later.
1916+
if _, pErr := client.SendWrapped(ctx, distSender, adminSplitArgs(key)); pErr != nil {
1917+
t.Fatal(pErr)
1918+
}
19101919
}
19111920
if _, pErr := client.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil {
19121921
t.Fatal(pErr)
19131922
}
19141923
mtc.waitForValues(key, []int64{1, 1, 1})
19151924
}
19161925

1917-
lhsRepl0 := store0.LookupReplica(roachpb.RKey("a"))
1926+
aRepl0 := store0.LookupReplica(roachpb.RKey("a"))
19181927

19191928
// Start dropping all Raft traffic to the first range on store1.
19201929
mtc.transport.Listen(store2.Ident.StoreID, &unreliableRaftHandler{
1921-
rangeID: lhsRepl0.RangeID,
1930+
rangeID: aRepl0.RangeID,
19221931
RaftMessageHandler: store2,
19231932
})
19241933

@@ -1929,6 +1938,13 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
19291938
}
19301939
}
19311940

1941+
// Split [a, /Max) into [a, d) and [d, /Max). This means the Raft snapshot
1942+
// will span both a merge and a split.
1943+
lastSplitKey := splitKeys[len(splitKeys)-1]
1944+
if _, pErr := client.SendWrapped(ctx, distSender, adminSplitArgs(lastSplitKey)); pErr != nil {
1945+
t.Fatal(pErr)
1946+
}
1947+
19321948
// Truncate the logs of the LHS.
19331949
{
19341950
repl := store0.LookupReplica(roachpb.RKey("a"))
@@ -1970,10 +1986,15 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
19701986
// Verify that the sets of keys in store0 and store2 are identical.
19711987
storeKeys0 := getEngineKeySet(t, store0.Engine())
19721988
storeKeys2 := getEngineKeySet(t, store2.Engine())
1989+
dRepl0 := store0.LookupReplica(roachpb.RKey("d"))
19731990
ignoreKey := func(k string) bool {
1974-
// Unreplicated keys for the two remaining ranges are allowed to differ.
1975-
return strings.HasPrefix(k, string(keys.MakeRangeIDUnreplicatedPrefix(roachpb.RangeID(1)))) ||
1976-
strings.HasPrefix(k, string(keys.MakeRangeIDUnreplicatedPrefix(lhsRepl0.RangeID)))
1991+
// Unreplicated keys for the remaining ranges are allowed to differ.
1992+
for _, id := range []roachpb.RangeID{1, aRepl0.RangeID, dRepl0.RangeID} {
1993+
if strings.HasPrefix(k, string(keys.MakeRangeIDUnreplicatedPrefix(id))) {
1994+
return true
1995+
}
1996+
}
1997+
return false
19771998
}
19781999
for k := range storeKeys0 {
19792000
if ignoreKey(k) {

pkg/storage/consistency_queue_test.go

+71
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,14 @@ import (
3232
"github.com/cockroachdb/cockroach/pkg/storage/engine"
3333
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
3434
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
35+
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
3536
"github.com/cockroachdb/cockroach/pkg/testutils"
3637
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
3738
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3839
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3940
"github.com/cockroachdb/cockroach/pkg/util/log"
41+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
42+
"github.com/cockroachdb/cockroach/pkg/util/uuid"
4043
)
4144

4245
// TestConsistencyQueueRequiresLive verifies the queue will not
@@ -101,7 +104,75 @@ func TestCheckConsistencyMultiStore(t *testing.T) {
101104
}, &checkArgs); err != nil {
102105
t.Fatal(err)
103106
}
107+
}
108+
109+
// TestCheckConsistencyReplay verifies that two ComputeChecksum requests with
110+
// the same checksum ID are not committed to the Raft log, even if DistSender
111+
// retries the request.
112+
func TestCheckConsistencyReplay(t *testing.T) {
113+
defer leaktest.AfterTest(t)()
114+
115+
type applyKey struct {
116+
checksumID uuid.UUID
117+
storeID roachpb.StoreID
118+
}
119+
var state struct {
120+
syncutil.Mutex
121+
forcedRetry bool
122+
applies map[applyKey]int
123+
}
124+
state.applies = map[applyKey]int{}
125+
126+
var mtc *multiTestContext
127+
ctx := context.Background()
128+
storeCfg := storage.TestStoreConfig(nil /* clock */)
129+
130+
// Arrange to count the number of times each checksum command applies to each
131+
// store.
132+
storeCfg.TestingKnobs.TestingApplyFilter = func(args storagebase.ApplyFilterArgs) *roachpb.Error {
133+
state.Lock()
134+
defer state.Unlock()
135+
if ccr := args.ComputeChecksum; ccr != nil {
136+
state.applies[applyKey{ccr.ChecksumID, args.StoreID}]++
137+
}
138+
return nil
139+
}
140+
141+
// Arrange to trigger a retry when a ComputeChecksum request arrives.
142+
storeCfg.TestingKnobs.TestingResponseFilter = func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
143+
state.Lock()
144+
defer state.Unlock()
145+
if ba.IsSingleComputeChecksumRequest() && !state.forcedRetry {
146+
state.forcedRetry = true
147+
return roachpb.NewError(roachpb.NewSendError("injected failure"))
148+
}
149+
return nil
150+
}
151+
152+
mtc = &multiTestContext{storeConfig: &storeCfg}
153+
defer mtc.Stop()
154+
mtc.Start(t, 2)
155+
156+
mtc.replicateRange(roachpb.RangeID(1), 1)
157+
158+
checkArgs := roachpb.CheckConsistencyRequest{
159+
RequestHeader: roachpb.RequestHeader{
160+
Key: []byte("a"),
161+
EndKey: []byte("b"),
162+
},
163+
}
164+
if _, err := client.SendWrapped(ctx, mtc.Store(0).TestSender(), &checkArgs); err != nil {
165+
t.Fatal(err)
166+
}
104167

168+
state.Lock()
169+
defer state.Unlock()
170+
for applyKey, count := range state.applies {
171+
if count != 1 {
172+
t.Errorf("checksum %s was applied %d times to s%d (expected once)",
173+
applyKey.checksumID, count, applyKey.storeID)
174+
}
175+
}
105176
}
106177

107178
func TestCheckConsistencyInconsistent(t *testing.T) {

0 commit comments

Comments
 (0)