Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: protect ComputeChecksum commands from replaying #29067

Merged
merged 1 commit into from
Aug 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
827 changes: 412 additions & 415 deletions pkg/roachpb/api.pb.go

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -991,11 +991,7 @@ message ComputeChecksumRequest {
// The version used to pick the checksum method. It allows us to use a
// consistent checksumming method across replicas.
uint32 version = 2;
// A unique identifier to match a future storage.CollectChecksumRequest with
// this request.
bytes checksum_id = 3 [(gogoproto.nullable) = false,
(gogoproto.customname) = "ChecksumID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
reserved 3;
// Compute a checksum along with a snapshot of the entire range, that will be
// used in logging a diff during checksum verification.
bool snapshot = 4;
Expand All @@ -1004,6 +1000,12 @@ message ComputeChecksumRequest {
// A ComputeChecksumResponse is the response to a ComputeChecksum() operation.
// In addition to the response header, it carries the identifier under which
// the computed checksum can later be collected.
message ComputeChecksumResponse {
// The response header, embedded in the generated Go struct and non-nullable.
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];

// ChecksumID is the unique identifier that can be used to get the computed
// checksum in a future storage.CollectChecksumRequest. The generated Go field
// is named ChecksumID and is a non-nullable uuid.UUID.
bytes checksum_id = 2 [(gogoproto.nullable) = false,
(gogoproto.customname) = "ChecksumID",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"];
}

enum ExportStorageProvider {
Expand Down
10 changes: 10 additions & 0 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,16 @@ func (ba *BatchRequest) IsSingleSubsumeRequest() bool {
return false
}

// IsSingleComputeChecksumRequest reports whether the batch consists of exactly
// one request and that request is a ComputeChecksumRequest.
func (ba *BatchRequest) IsSingleComputeChecksumRequest() bool {
	// Anything other than a single-request batch can never qualify.
	if !ba.IsSingleRequest() {
		return false
	}
	switch ba.Requests[0].GetInner().(type) {
	case *ComputeChecksumRequest:
		return true
	default:
		return false
	}
}

// GetPrevLeaseForLeaseRequest returns the previous lease, at the time
// of proposal, for a request lease or transfer lease request. If the
// batch does not contain a single lease request, this method will panic.
Expand Down
20 changes: 15 additions & 5 deletions pkg/storage/batcheval/cmd_compute_checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// init registers the ComputeChecksum evaluation function for
// roachpb.ComputeChecksum requests, using DefaultDeclareKeys as the
// accompanying key-declaration function.
func init() {
RegisterCommand(roachpb.ComputeChecksum, DefaultDeclareKeys, ComputeChecksum)
}

// Version numbers for Replica checksum computation. Requests fail unless the
// versions are compatible.
// Version numbers for Replica checksum computation. Requests silently no-op
// unless the versions are compatible.
const (
ReplicaChecksumVersion = 2
ReplicaChecksumVersion = 3
ReplicaChecksumGCInterval = time.Hour
)

Expand All @@ -44,10 +46,18 @@ func ComputeChecksum(
args := cArgs.Args.(*roachpb.ComputeChecksumRequest)

if args.Version != ReplicaChecksumVersion {
log.Errorf(ctx, "Incompatible versions: e=%d, v=%d", ReplicaChecksumVersion, args.Version)
log.Infof(ctx, "incompatible ComputeChecksum versions (server: %d, requested: %d)",
ReplicaChecksumVersion, args.Version)
return result.Result{}, nil
}

reply := resp.(*roachpb.ComputeChecksumResponse)
reply.ChecksumID = uuid.MakeV4()

var pd result.Result
pd.Replicated.ComputeChecksum = args
pd.Replicated.ComputeChecksum = &storagebase.ComputeChecksum{
ChecksumID: reply.ChecksumID,
SaveSnapshot: args.Snapshot,
}
return pd, nil
}
71 changes: 71 additions & 0 deletions pkg/storage/consistency_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// TestConsistencyQueueRequiresLive verifies the queue will not
Expand Down Expand Up @@ -101,7 +104,75 @@ func TestCheckConsistencyMultiStore(t *testing.T) {
}, &checkArgs); err != nil {
t.Fatal(err)
}
}

// TestCheckConsistencyReplay verifies that two ComputeChecksum requests with
// the same checksum ID are not committed to the Raft log, even if DistSender
// retries the request.
//
// The test injects a single send error in response to the first
// ComputeChecksum batch and counts, per (checksum ID, store) pair, how many
// times a ComputeChecksum command is seen by the apply filter. Every pair must
// be seen exactly once.
func TestCheckConsistencyReplay(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// applyKey identifies one checksum command on one store.
	type applyKey struct {
		checksumID uuid.UUID
		storeID    roachpb.StoreID
	}
	// state is shared between the testing knobs (which run on server
	// goroutines) and the test body, hence the mutex.
	var state struct {
		syncutil.Mutex
		forcedRetry bool
		applies     map[applyKey]int
	}
	state.applies = map[applyKey]int{}

	var mtc *multiTestContext
	ctx := context.Background()
	storeCfg := storage.TestStoreConfig(nil /* clock */)

	// Arrange to count the number of times each checksum command applies to each
	// store.
	storeCfg.TestingKnobs.TestingApplyFilter = func(args storagebase.ApplyFilterArgs) *roachpb.Error {
		state.Lock()
		defer state.Unlock()
		if ccr := args.ComputeChecksum; ccr != nil {
			state.applies[applyKey{ccr.ChecksumID, args.StoreID}]++
		}
		return nil
	}

	// Arrange to trigger a retry when a ComputeChecksum request arrives. Only
	// the first such request is failed so that the retry can succeed.
	storeCfg.TestingKnobs.TestingResponseFilter = func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
		state.Lock()
		defer state.Unlock()
		if ba.IsSingleComputeChecksumRequest() && !state.forcedRetry {
			state.forcedRetry = true
			return roachpb.NewError(roachpb.NewSendError("injected failure"))
		}
		return nil
	}

	mtc = &multiTestContext{storeConfig: &storeCfg}
	defer mtc.Stop()
	mtc.Start(t, 2)

	mtc.replicateRange(roachpb.RangeID(1), 1)

	checkArgs := roachpb.CheckConsistencyRequest{
		RequestHeader: roachpb.RequestHeader{
			Key:    []byte("a"),
			EndKey: []byte("b"),
		},
	}
	if _, err := client.SendWrapped(ctx, mtc.Store(0).TestSender(), &checkArgs); err != nil {
		t.Fatal(err)
	}

	state.Lock()
	defer state.Unlock()

	// Guard against the test passing vacuously: the retry must actually have
	// been injected, and at least one checksum command must have been counted.
	if !state.forcedRetry {
		t.Fatal("expected a ComputeChecksum request to be failed and retried, but no failure was injected")
	}
	if len(state.applies) == 0 {
		t.Fatal("expected at least one ComputeChecksum command to be applied, but none were recorded")
	}

	for key, count := range state.applies {
		if count != 1 {
			t.Errorf("checksum %s was applied %d times to s%d (expected once)",
				key.checksumID, count, key.storeID)
		}
	}
}

func TestCheckConsistencyInconsistent(t *testing.T) {
Expand Down
19 changes: 7 additions & 12 deletions pkg/storage/replica_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,14 @@ func (r *Replica) CheckConsistency(
key = keys.LocalMax
}
endKey := desc.EndKey.AsRawKey()
id := uuid.MakeV4()

checkArgs := roachpb.ComputeChecksumRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
EndKey: endKey,
},
Version: batcheval.ReplicaChecksumVersion,
ChecksumID: id,
Snapshot: args.WithDiff,
Version: batcheval.ReplicaChecksumVersion,
Snapshot: args.WithDiff,
}

results, err := r.RunConsistencyCheck(ctx, checkArgs)
Expand Down Expand Up @@ -205,14 +203,11 @@ func (r *Replica) RunConsistencyCheck(
) ([]ConsistencyCheckResult, error) {
// Send a ComputeChecksum which will trigger computation of the checksum on
// all replicas.
{
var b client.Batch
b.AddRawRequest(&req)

if err := r.store.db.Run(ctx, &b); err != nil {
return nil, err
}
res, pErr := client.SendWrapped(ctx, r.store.db.NonTransactionalSender(), &req)
if pErr != nil {
return nil, pErr.GoError()
}
ccRes := res.(*roachpb.ComputeChecksumResponse)

var orderedReplicas []roachpb.ReplicaDescriptor
{
Expand Down Expand Up @@ -249,7 +244,7 @@ func (r *Replica) RunConsistencyCheck(
if len(results) > 0 {
masterChecksum = results[0].Response.Checksum
}
resp, err := r.collectChecksumFromReplica(ctx, replica, req.ChecksumID, masterChecksum)
resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum)
resultCh <- ConsistencyCheckResult{
Replica: replica,
Response: resp,
Expand Down
22 changes: 9 additions & 13 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,30 +141,26 @@ func (r *Replica) gcOldChecksumEntriesLocked(now time.Time) {
}
}

func (r *Replica) computeChecksumPostApply(
ctx context.Context, args roachpb.ComputeChecksumRequest,
) {
func (r *Replica) computeChecksumPostApply(ctx context.Context, cc storagebase.ComputeChecksum) {
stopper := r.store.Stopper()
id := args.ChecksumID
now := timeutil.Now()
r.mu.Lock()
var notify chan struct{}
if c, ok := r.mu.checksums[id]; !ok {
if c, ok := r.mu.checksums[cc.ChecksumID]; !ok {
// There is no record of this ID. Make a new notification.
notify = make(chan struct{})
} else if !c.started {
// A CollectChecksumRequest is waiting on the existing notification.
notify = c.notify
} else {
// A previous attempt was made to compute the checksum.
r.mu.Unlock()
return
log.Fatalf(ctx, "attempted to apply ComputeChecksum command with duplicated checksum ID %s",
cc.ChecksumID)
}

r.gcOldChecksumEntriesLocked(now)

// Create an entry with checksum == nil and gcTimestamp unset.
r.mu.checksums[id] = ReplicaChecksum{started: true, notify: notify}
r.mu.checksums[cc.ChecksumID] = ReplicaChecksum{started: true, notify: notify}
desc := *r.mu.state.Desc
r.mu.Unlock()
// Caller is holding raftMu, so an engine snapshot is automatically
Expand All @@ -175,20 +171,20 @@ func (r *Replica) computeChecksumPostApply(
if err := stopper.RunAsyncTask(ctx, "storage.Replica: computing checksum", func(ctx context.Context) {
defer snap.Close()
var snapshot *roachpb.RaftSnapshotData
if args.Snapshot {
if cc.SaveSnapshot {
snapshot = &roachpb.RaftSnapshotData{}
}
result, err := r.sha512(ctx, desc, snap, snapshot)
if err != nil {
log.Errorf(ctx, "%v", err)
result = nil
}
r.computeChecksumDone(ctx, id, result, snapshot)
r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot)
}); err != nil {
defer snap.Close()
log.Error(ctx, errors.Wrapf(err, "could not run async checksum computation (ID = %s)", id))
log.Error(ctx, errors.Wrapf(err, "could not run async checksum computation (ID = %s)", cc.ChecksumID))
// Set checksum to nil.
r.computeChecksumDone(ctx, id, nil, nil)
r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil)
}
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7604,17 +7604,15 @@ func TestComputeChecksumVersioning(t *testing.T) {

if pct, _ := batcheval.ComputeChecksum(context.TODO(), nil,
batcheval.CommandArgs{Args: &roachpb.ComputeChecksumRequest{
ChecksumID: uuid.MakeV4(),
Version: batcheval.ReplicaChecksumVersion,
Version: batcheval.ReplicaChecksumVersion,
}}, &roachpb.ComputeChecksumResponse{},
); pct.Replicated.ComputeChecksum == nil {
t.Error("right checksum version: expected post-commit trigger")
}

if pct, _ := batcheval.ComputeChecksum(context.TODO(), nil,
batcheval.CommandArgs{Args: &roachpb.ComputeChecksumRequest{
ChecksumID: uuid.MakeV4(),
Version: batcheval.ReplicaChecksumVersion + 1,
Version: batcheval.ReplicaChecksumVersion + 1,
}}, &roachpb.ComputeChecksumResponse{},
); pct.Replicated.ComputeChecksum != nil {
t.Errorf("wrong checksum version: expected no post-commit trigger: %s", pct.Replicated.ComputeChecksum)
Expand Down
Loading