Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver,roachpb,storage: add IsSpanEmptyRequest to check for any data #85798

Merged
merged 1 commit into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func (ds *DistSender) initAndVerifyBatch(
switch inner.(type) {
case *roachpb.ScanRequest, *roachpb.ResolveIntentRangeRequest,
*roachpb.DeleteRangeRequest, *roachpb.RevertRangeRequest,
*roachpb.ExportRequest, *roachpb.QueryLocksRequest:
*roachpb.ExportRequest, *roachpb.QueryLocksRequest, *roachpb.IsSpanEmptyRequest:
// Accepted forward range requests.
foundForward = true

Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"cmd_heartbeat_txn.go",
"cmd_increment.go",
"cmd_init_put.go",
"cmd_is_span_empty.go",
"cmd_lease.go",
"cmd_lease_info.go",
"cmd_lease_request.go",
Expand Down Expand Up @@ -107,6 +108,7 @@ go_test(
"cmd_end_transaction_test.go",
"cmd_export_test.go",
"cmd_get_test.go",
"cmd_is_span_empty_test.go",
"cmd_lease_test.go",
"cmd_query_intent_test.go",
"cmd_query_resolved_timestamp_test.go",
Expand Down
48 changes: 48 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_is_span_empty.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

func init() {
RegisterReadOnlyCommand(roachpb.IsSpanEmpty, DefaultDeclareKeys, IsSpanEmpty)
}

// IsSpanEmpty determines whether there are any keys in the key span requested
// at any time. If there are any keys, the response header will have a NumKeys
// value of 1.
func IsSpanEmpty(
ctx context.Context, reader storage.Reader, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*roachpb.IsSpanEmptyRequest)
reply := resp.(*roachpb.IsSpanEmptyResponse)
isEmpty, err := storage.MVCCIsSpanEmpty(ctx, reader, storage.MVCCIsSpanEmptyOptions{
StartKey: args.Key,
EndKey: args.EndKey,
StartTS: hlc.MinTimestamp, // beginning of time
EndTS: hlc.MaxTimestamp, // end of time
})
if err != nil {
return result.Result{}, errors.Wrap(err, "IsSpanEmpty")
}
if !isEmpty {
reply.NumKeys++
}
return result.Result{}, nil
}
97 changes: 97 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_is_span_empty_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval_test

import (
"context"
"sync/atomic"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

func TestIsSpanEmpty(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
var sentIsSpanEmptyRequests int64
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request roachpb.BatchRequest) *roachpb.Error {
if _, exists := request.GetArg(roachpb.IsSpanEmpty); exists {
atomic.AddInt64(&sentIsSpanEmptyRequests, 1)
}
return nil
},
},
},
},
})
defer tc.Stopper().Stop(ctx)

kvDB := tc.Server(0).DB()
scratchKey := tc.ScratchRange(t)
mkKey := func(suffix string) roachpb.Key {
return append(scratchKey[:len(scratchKey):len(scratchKey)], suffix...)
}

checkIsEmpty := func(t *testing.T, exp bool, from, to roachpb.Key) {
var ba kv.Batch
ba.Header.MaxSpanRequestKeys = 1
ba.AddRawRequest(&roachpb.IsSpanEmptyRequest{
RequestHeader: roachpb.RequestHeader{
Key: from, EndKey: to,
},
})
require.NoError(t, kvDB.Run(ctx, &ba))
require.Equal(t, exp, ba.RawResponse().Responses[0].GetIsSpanEmpty().IsEmpty())
}

requireEmpty := func(t *testing.T, from, to roachpb.Key) {
checkIsEmpty(t, true, from, to)
}
requireNotEmpty := func(t *testing.T, from, to roachpb.Key) {
checkIsEmpty(t, false, from, to)
}

requireEmpty(t, mkKey(""), mkKey("").PrefixEnd())

tc.SplitRangeOrFatal(t, mkKey("c"))
requireEmpty(t, mkKey(""), mkKey("").PrefixEnd())

require.NoError(t, kvDB.Put(ctx, mkKey("x"), "foo"))
requireEmpty(t, mkKey(""), mkKey("x"))
requireNotEmpty(t, mkKey(""), mkKey("").PrefixEnd())

require.NoError(t, kvDB.Del(ctx, mkKey("x")))
requireEmpty(t, mkKey(""), mkKey("x"))
requireNotEmpty(t, mkKey(""), mkKey("").PrefixEnd())

// We want to make sure that the DistSender stops iterating ranges once
// the first range with any keys is found.
checkIsCalled := func(t *testing.T, expEmpty bool, delta int64, from, to roachpb.Key) {
before := atomic.LoadInt64(&sentIsSpanEmptyRequests)
checkIsEmpty(t, expEmpty, from, to)
require.Equal(t, delta, atomic.LoadInt64(&sentIsSpanEmptyRequests)-before)
}
checkIsCalled(t, false, 2, mkKey(""), mkKey("").PrefixEnd())
tc.SplitRangeOrFatal(t, mkKey("a"))
tc.SplitRangeOrFatal(t, mkKey("b"))
checkIsCalled(t, false, 4, mkKey(""), mkKey("").PrefixEnd())
}
35 changes: 35 additions & 0 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,25 @@ func (r *QueryLocksResponse) combine(c combinable) error {

var _ combinable = &QueryLocksResponse{}

// combine implements the combinable interface.
func (r *IsSpanEmptyResponse) combine(c combinable) error {
otherR := c.(*IsSpanEmptyResponse)
if r != nil {
if err := r.ResponseHeader.combine(otherR.Header()); err != nil {
return err
}
// Given the request doesn't actually count anything, and instead
// hijacks NumKeys to indicate whether there is any data, there's
// no good reason to have it take on a value greater than 1.
if r.ResponseHeader.NumKeys > 1 {
r.ResponseHeader.NumKeys = 1
}
}
return nil
}

var _ combinable = &IsSpanEmptyResponse{}

// Header implements the Request interface.
func (rh RequestHeader) Header() RequestHeader {
return rh
Expand Down Expand Up @@ -787,6 +806,9 @@ func (*ScanInterleavedIntentsRequest) Method() Method { return ScanInterleavedIn
// Method implements the Request interface.
func (*BarrierRequest) Method() Method { return Barrier }

// Method implements the Request interface.
func (*IsSpanEmptyRequest) Method() Method { return IsSpanEmpty }

// ShallowCopy implements the Request interface.
func (gr *GetRequest) ShallowCopy() Request {
shallowCopy := *gr
Expand Down Expand Up @@ -1075,6 +1097,12 @@ func (r *BarrierRequest) ShallowCopy() Request {
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (r *IsSpanEmptyRequest) ShallowCopy() Request {
shallowCopy := *r
return &shallowCopy
}

// NewGet returns a Request initialized to get the value at key. If
// forUpdate is true, an unreplicated, exclusive lock is acquired on on
// the key, if it exists.
Expand Down Expand Up @@ -1454,6 +1482,7 @@ func (*QueryResolvedTimestampRequest) flags() flag {
}
func (*ScanInterleavedIntentsRequest) flags() flag { return isRead | isRange }
func (*BarrierRequest) flags() flag { return isWrite | isRange }
func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange }

// IsParallelCommit returns whether the EndTxn request is attempting to perform
// a parallel commit. See txn_interceptor_committer.go for a discussion about
Expand Down Expand Up @@ -1657,6 +1686,12 @@ func (r *JoinNodeResponse) CreateStoreIdent() (StoreIdent, error) {
return sIdent, nil
}

// IsEmpty returns true if the NumKeys field of the ResponseHeader is 0,
// indicating that the span is empty.
func (r *IsSpanEmptyResponse) IsEmpty() bool {
return r.NumKeys == 0
}

// SafeFormat implements redact.SafeFormatter.
func (c *ContentionEvent) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("conflicted with %s on %s for %.3fs", c.TxnMeta.ID, c.Key, c.Duration.Seconds())
Expand Down
21 changes: 21 additions & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,25 @@ message ProbeResponse {
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
}

// IsSpanEmptyRequest is used to determine whether a span contains any keys,
// garbage or otherwise. It is used to determine whether data deleted by a
// DeleteRange tombstone has been fully removed.
//
// Generally, the caller should set the MaxSpanKeys header on the BatchRequest
// to 1 so that the DistSender will process the overlapping ranges sequentially
// and stop after the first non-empty range.
message IsSpanEmptyRequest {
RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
}

// IsSpanEmptyResponse is the response to an IsSpanEmptyRequest.
// If there is any data in the queried span, the NumKeys field of the
// ResponseHeader will have a positive value; if NumKeys is zero, then the
// span is empty.
message IsSpanEmptyResponse {
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
}

// A PutRequest is the argument to the Put() method.
message PutRequest {
RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
Expand Down Expand Up @@ -2144,6 +2163,7 @@ message RequestUnion {
ScanInterleavedIntentsRequest scan_interleaved_intents = 52;
BarrierRequest barrier = 53;
ProbeRequest probe = 54;
IsSpanEmptyRequest is_span_empty = 56;
}
reserved 8, 15, 23, 25, 27, 31, 34;
}
Expand Down Expand Up @@ -2199,6 +2219,7 @@ message ResponseUnion {
ScanInterleavedIntentsResponse scan_interleaved_intents = 52;
BarrierResponse barrier = 53;
ProbeResponse probe = 54;
IsSpanEmptyResponse is_span_empty = 56;
}
reserved 8, 15, 23, 25, 27, 28, 31, 34;
}
Expand Down
27 changes: 26 additions & 1 deletion pkg/roachpb/batch_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/roachpb/method.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ const (
// Probe is a noop write request used to test the ability to make
// progress at the replication layer.
Probe
// IsSpanEmpty is a non-transaction read request used to determine whether
// a span contains any keys whatsoever (garbage or otherwise).
IsSpanEmpty
// NumMethods represents the total number of API methods.
NumMethods
)
Loading