Skip to content

Commit c952cb3

Browse files
committed
storage: protect ComputeChecksum commands from replaying
Previously, a ComputeChecksum command could apply twice with the same ID. Consider the following sequence of events: 1. DistSender sends a ComputeChecksum request to a replica. 2. The request is successfully evaluated and proposed, but a connection error occurs. 3. DistSender retries the request, leaving the checksum ID unchanged! This would result in two ComputeChecksum commands with the same checksum ID in the Raft log. Somewhat amazingly, this typically wasn't problematic. If all replicas were online and reasonably up-to-date, they'd see the first ComputeChecksum command, compute its checksum, and store it in the checksums map. When they saw the duplicated ComputeChecksum command, they'd see that a checksum with that ID already existed and ignore it. In effect, only the first ComputeChecksum command for a given checksum ID mattered. The problem occurred when one replica saw one ComputeChecksum command but not the other. There were two ways this could occur. A replica could go offline after computing the checksum the first time; when it came back online, it would have an empty checksum map, and the checksum computed for the second ComputeChecksum command would be recorded instead. Or a replica could receive a snapshot that advanced it past one ComputeChecksum but not the other. In both cases, the replicas could spuriously fail a consistency check. A very similar problem occurred with range merges because ComputeChecksum requests are incorrectly ranged (see #29002). That means DistSender might split a ComputeChecksum request in two. Consider what happens when a consistency check occurs immediately after a merge: the ComputeChecksum request is generated using the up-to-date, post-merge descriptor, but DistSender might have the pre-merge descriptors cached, and so it splits the batch in two. Both halves of the batch would get routed to the same range, and both halves would have the same checksum ID, resulting in the same duplicated ComputeChecksum command problem. 
The fix for these problems is to assign the checksum ID when the ComputeChecksum request is evaluated. If the request is retried, it will be properly assigned a new checksum ID. Note that we don't need to worry about reproposals causing duplicate commands, as the MaxLeaseIndex prevents proposals from replaying. The version compatibility story here is straightforward. The ReplicaChecksumVersion is bumped, so v2.0 nodes will turn ComputeChecksum requests proposed by v2.1 nodes into a no-op, and vice-versa. The consistency queue will spam some complaints into the log about this--it will time out while collecting checksums--but this will stop as soon as all nodes have been upgraded to the new version.† Note that this commit takes the opportunity to migrate storagebase.ReplicatedEvalResult.ComputeChecksum from roachpb.ComputeChecksumRequest to a dedicated storagebase.ComputeChecksum message. Separate types are more in line with how the merge/split/change replicas triggers work and avoid shipping unnecessary fields through Raft. Note that even though this migration changes logic downstream of Raft, it's safe. v2.1 nodes will turn any ComputeChecksum commands that were committed by v2.0 nodes into no-ops, and vice-versa, but the only effect of this will be some temporary consistency queue spam. As an added bonus, because we're guaranteed that we'll never see duplicate v2.1-style ComputeChecksum commands, we can properly fatal if we ever see a ComputeChecksum request with a checksum ID that we've already computed. † It would be possible to put the late-ID allocation behind a cluster version to avoid the log spam, but that amounts to allowing v2.1 to initiate known-buggy consistency checks. A bit of log spam seems preferable. Fix #28995.
1 parent 9ee43b3 commit c952cb3

11 files changed

+915
-617
lines changed

pkg/roachpb/api.pb.go

+412-415
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/roachpb/api.proto

+7-5
Original file line numberDiff line numberDiff line change
@@ -991,11 +991,7 @@ message ComputeChecksumRequest {
991991
// The version used to pick the checksum method. It allows us to use a
992992
// consistent checksumming method across replicas.
993993
uint32 version = 2;
994-
// A unique identifier to match a future storage.CollectChecksumRequest with
995-
// this request.
996-
bytes checksum_id = 3 [(gogoproto.nullable) = false,
997-
(gogoproto.customname) = "ChecksumID",
998-
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
994+
reserved 3;
999995
// Compute a checksum along with a snapshot of the entire range, that will be
1000996
// used in logging a diff during checksum verification.
1001997
bool snapshot = 4;
@@ -1004,6 +1000,12 @@ message ComputeChecksumRequest {
10041000
// A ComputeChecksumResponse is the response to a ComputeChecksum() operation.
10051001
message ComputeChecksumResponse {
10061002
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
1003+
1004+
// ChecksumID is the unique identifier that can be used to get the computed
1005+
// checksum in a future storage.CollectChecksumRequest.
1006+
bytes checksum_id = 2 [(gogoproto.nullable) = false,
1007+
(gogoproto.customname) = "ChecksumID",
1008+
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
10071009
}
10081010

10091011
enum ExportStorageProvider {

pkg/roachpb/batch.go

+10
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,16 @@ func (ba *BatchRequest) IsSingleSubsumeRequest() bool {
188188
return false
189189
}
190190

191+
// IsSingleComputeChecksumRequest returns true iff the batch contains a single
192+
// request, and that request is a ComputeChecksumRequest.
193+
func (ba *BatchRequest) IsSingleComputeChecksumRequest() bool {
194+
if ba.IsSingleRequest() {
195+
_, ok := ba.Requests[0].GetInner().(*ComputeChecksumRequest)
196+
return ok
197+
}
198+
return false
199+
}
200+
191201
// GetPrevLeaseForLeaseRequest returns the previous lease, at the time
192202
// of proposal, for a request lease or transfer lease request. If the
193203
// batch does not contain a single lease request, this method will panic.

pkg/storage/batcheval/cmd_compute_checksum.go

+15-5
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,19 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/roachpb"
2222
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
2323
"github.com/cockroachdb/cockroach/pkg/storage/engine"
24+
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
2425
"github.com/cockroachdb/cockroach/pkg/util/log"
26+
"github.com/cockroachdb/cockroach/pkg/util/uuid"
2527
)
2628

2729
func init() {
2830
RegisterCommand(roachpb.ComputeChecksum, DefaultDeclareKeys, ComputeChecksum)
2931
}
3032

31-
// Version numbers for Replica checksum computation. Requests fail unless the
32-
// versions are compatible.
33+
// Version numbers for Replica checksum computation. Requests silently no-op
34+
// unless the versions are compatible.
3335
const (
34-
ReplicaChecksumVersion = 2
36+
ReplicaChecksumVersion = 3
3537
ReplicaChecksumGCInterval = time.Hour
3638
)
3739

@@ -44,10 +46,18 @@ func ComputeChecksum(
4446
args := cArgs.Args.(*roachpb.ComputeChecksumRequest)
4547

4648
if args.Version != ReplicaChecksumVersion {
47-
log.Errorf(ctx, "Incompatible versions: e=%d, v=%d", ReplicaChecksumVersion, args.Version)
49+
log.Infof(ctx, "incompatible ComputeChecksum versions (server: %d, requested: %d)",
50+
ReplicaChecksumVersion, args.Version)
4851
return result.Result{}, nil
4952
}
53+
54+
reply := resp.(*roachpb.ComputeChecksumResponse)
55+
reply.ChecksumID = uuid.MakeV4()
56+
5057
var pd result.Result
51-
pd.Replicated.ComputeChecksum = args
58+
pd.Replicated.ComputeChecksum = &storagebase.ComputeChecksum{
59+
ChecksumID: reply.ChecksumID,
60+
SaveSnapshot: args.Snapshot,
61+
}
5262
return pd, nil
5363
}

pkg/storage/consistency_queue_test.go

+71
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,14 @@ import (
3232
"github.com/cockroachdb/cockroach/pkg/storage/engine"
3333
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
3434
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
35+
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
3536
"github.com/cockroachdb/cockroach/pkg/testutils"
3637
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
3738
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3839
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3940
"github.com/cockroachdb/cockroach/pkg/util/log"
41+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
42+
"github.com/cockroachdb/cockroach/pkg/util/uuid"
4043
)
4144

4245
// TestConsistencyQueueRequiresLive verifies the queue will not
@@ -101,7 +104,75 @@ func TestCheckConsistencyMultiStore(t *testing.T) {
101104
}, &checkArgs); err != nil {
102105
t.Fatal(err)
103106
}
107+
}
108+
109+
// TestCheckConsistencyReplay verifies that two ComputeChecksum requests with
110+
// the same checksum ID are not committed to the Raft log, even if DistSender
111+
// retries the request.
112+
func TestCheckConsistencyReplay(t *testing.T) {
113+
defer leaktest.AfterTest(t)()
114+
115+
type applyKey struct {
116+
checksumID uuid.UUID
117+
storeID roachpb.StoreID
118+
}
119+
var state struct {
120+
syncutil.Mutex
121+
forcedRetry bool
122+
applies map[applyKey]int
123+
}
124+
state.applies = map[applyKey]int{}
125+
126+
var mtc *multiTestContext
127+
ctx := context.Background()
128+
storeCfg := storage.TestStoreConfig(nil /* clock */)
129+
130+
// Arrange to count the number of times each checksum command applies to each
131+
// store.
132+
storeCfg.TestingKnobs.TestingApplyFilter = func(args storagebase.ApplyFilterArgs) *roachpb.Error {
133+
state.Lock()
134+
defer state.Unlock()
135+
if ccr := args.ComputeChecksum; ccr != nil {
136+
state.applies[applyKey{ccr.ChecksumID, args.StoreID}]++
137+
}
138+
return nil
139+
}
140+
141+
// Arrange to trigger a retry when a ComputeChecksum request arrives.
142+
storeCfg.TestingKnobs.TestingResponseFilter = func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
143+
state.Lock()
144+
defer state.Unlock()
145+
if ba.IsSingleComputeChecksumRequest() && !state.forcedRetry {
146+
state.forcedRetry = true
147+
return roachpb.NewError(roachpb.NewSendError("injected failure"))
148+
}
149+
return nil
150+
}
151+
152+
mtc = &multiTestContext{storeConfig: &storeCfg}
153+
defer mtc.Stop()
154+
mtc.Start(t, 2)
155+
156+
mtc.replicateRange(roachpb.RangeID(1), 1)
157+
158+
checkArgs := roachpb.CheckConsistencyRequest{
159+
RequestHeader: roachpb.RequestHeader{
160+
Key: []byte("a"),
161+
EndKey: []byte("b"),
162+
},
163+
}
164+
if _, err := client.SendWrapped(ctx, mtc.Store(0).TestSender(), &checkArgs); err != nil {
165+
t.Fatal(err)
166+
}
104167

168+
state.Lock()
169+
defer state.Unlock()
170+
for applyKey, count := range state.applies {
171+
if count != 1 {
172+
t.Errorf("checksum %s was applied %d times to s%d (expected once)",
173+
applyKey.checksumID, count, applyKey.storeID)
174+
}
175+
}
105176
}
106177

107178
func TestCheckConsistencyInconsistent(t *testing.T) {

pkg/storage/replica_consistency.go

+7-12
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,14 @@ func (r *Replica) CheckConsistency(
6767
key = keys.LocalMax
6868
}
6969
endKey := desc.EndKey.AsRawKey()
70-
id := uuid.MakeV4()
7170

7271
checkArgs := roachpb.ComputeChecksumRequest{
7372
RequestHeader: roachpb.RequestHeader{
7473
Key: key,
7574
EndKey: endKey,
7675
},
77-
Version: batcheval.ReplicaChecksumVersion,
78-
ChecksumID: id,
79-
Snapshot: args.WithDiff,
76+
Version: batcheval.ReplicaChecksumVersion,
77+
Snapshot: args.WithDiff,
8078
}
8179

8280
results, err := r.RunConsistencyCheck(ctx, checkArgs)
@@ -205,14 +203,11 @@ func (r *Replica) RunConsistencyCheck(
205203
) ([]ConsistencyCheckResult, error) {
206204
// Send a ComputeChecksum which will trigger computation of the checksum on
207205
// all replicas.
208-
{
209-
var b client.Batch
210-
b.AddRawRequest(&req)
211-
212-
if err := r.store.db.Run(ctx, &b); err != nil {
213-
return nil, err
214-
}
206+
res, pErr := client.SendWrapped(ctx, r.store.db.NonTransactionalSender(), &req)
207+
if pErr != nil {
208+
return nil, pErr.GoError()
215209
}
210+
ccRes := res.(*roachpb.ComputeChecksumResponse)
216211

217212
var orderedReplicas []roachpb.ReplicaDescriptor
218213
{
@@ -249,7 +244,7 @@ func (r *Replica) RunConsistencyCheck(
249244
if len(results) > 0 {
250245
masterChecksum = results[0].Response.Checksum
251246
}
252-
resp, err := r.collectChecksumFromReplica(ctx, replica, req.ChecksumID, masterChecksum)
247+
resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum)
253248
resultCh <- ConsistencyCheckResult{
254249
Replica: replica,
255250
Response: resp,

pkg/storage/replica_proposal.go

+9-13
Original file line numberDiff line numberDiff line change
@@ -141,30 +141,26 @@ func (r *Replica) gcOldChecksumEntriesLocked(now time.Time) {
141141
}
142142
}
143143

144-
func (r *Replica) computeChecksumPostApply(
145-
ctx context.Context, args roachpb.ComputeChecksumRequest,
146-
) {
144+
func (r *Replica) computeChecksumPostApply(ctx context.Context, cc storagebase.ComputeChecksum) {
147145
stopper := r.store.Stopper()
148-
id := args.ChecksumID
149146
now := timeutil.Now()
150147
r.mu.Lock()
151148
var notify chan struct{}
152-
if c, ok := r.mu.checksums[id]; !ok {
149+
if c, ok := r.mu.checksums[cc.ChecksumID]; !ok {
153150
// There is no record of this ID. Make a new notification.
154151
notify = make(chan struct{})
155152
} else if !c.started {
156153
// A CollectChecksumRequest is waiting on the existing notification.
157154
notify = c.notify
158155
} else {
159-
// A previous attempt was made to compute the checksum.
160-
r.mu.Unlock()
161-
return
156+
log.Fatalf(ctx, "attempted to apply ComputeChecksum command with duplicated checksum ID %s",
157+
cc.ChecksumID)
162158
}
163159

164160
r.gcOldChecksumEntriesLocked(now)
165161

166162
// Create an entry with checksum == nil and gcTimestamp unset.
167-
r.mu.checksums[id] = ReplicaChecksum{started: true, notify: notify}
163+
r.mu.checksums[cc.ChecksumID] = ReplicaChecksum{started: true, notify: notify}
168164
desc := *r.mu.state.Desc
169165
r.mu.Unlock()
170166
// Caller is holding raftMu, so an engine snapshot is automatically
@@ -175,20 +171,20 @@ func (r *Replica) computeChecksumPostApply(
175171
if err := stopper.RunAsyncTask(ctx, "storage.Replica: computing checksum", func(ctx context.Context) {
176172
defer snap.Close()
177173
var snapshot *roachpb.RaftSnapshotData
178-
if args.Snapshot {
174+
if cc.SaveSnapshot {
179175
snapshot = &roachpb.RaftSnapshotData{}
180176
}
181177
result, err := r.sha512(ctx, desc, snap, snapshot)
182178
if err != nil {
183179
log.Errorf(ctx, "%v", err)
184180
result = nil
185181
}
186-
r.computeChecksumDone(ctx, id, result, snapshot)
182+
r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot)
187183
}); err != nil {
188184
defer snap.Close()
189-
log.Error(ctx, errors.Wrapf(err, "could not run async checksum computation (ID = %s)", id))
185+
log.Error(ctx, errors.Wrapf(err, "could not run async checksum computation (ID = %s)", cc.ChecksumID))
190186
// Set checksum to nil.
191-
r.computeChecksumDone(ctx, id, nil, nil)
187+
r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil)
192188
}
193189
}
194190

pkg/storage/replica_test.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -7604,17 +7604,15 @@ func TestComputeChecksumVersioning(t *testing.T) {
76047604

76057605
if pct, _ := batcheval.ComputeChecksum(context.TODO(), nil,
76067606
batcheval.CommandArgs{Args: &roachpb.ComputeChecksumRequest{
7607-
ChecksumID: uuid.MakeV4(),
7608-
Version: batcheval.ReplicaChecksumVersion,
7607+
Version: batcheval.ReplicaChecksumVersion,
76097608
}}, &roachpb.ComputeChecksumResponse{},
76107609
); pct.Replicated.ComputeChecksum == nil {
76117610
t.Error("right checksum version: expected post-commit trigger")
76127611
}
76137612

76147613
if pct, _ := batcheval.ComputeChecksum(context.TODO(), nil,
76157614
batcheval.CommandArgs{Args: &roachpb.ComputeChecksumRequest{
7616-
ChecksumID: uuid.MakeV4(),
7617-
Version: batcheval.ReplicaChecksumVersion + 1,
7615+
Version: batcheval.ReplicaChecksumVersion + 1,
76187616
}}, &roachpb.ComputeChecksumResponse{},
76197617
); pct.Replicated.ComputeChecksum != nil {
76207618
t.Errorf("wrong checksum version: expected no post-commit trigger: %s", pct.Replicated.ComputeChecksum)

0 commit comments

Comments
 (0)