Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
114481: kv: include locking strength and durability in Get/Scan/RevScan SafeFormat r=nvanbenschoten a=lyang24

kv: include locking strength and durability in Get/Scan/RevScan SafeFormat

The goal of this PR is to improve observability. We now include the lock
strength and lock durability when formatting Get/Scan/RevScan requests that are
locking. This is implemented by introducing an optional extension of the Request
interface called SafeFormatterRequest. We also refactor
BatchRequest.SafeFormat to use the new interface. Please note the subtle
change introduced here: if the EndKey is not present, we print only the Key in
square brackets, and this applies to all request types.

Fixes: #114475
Release note: None.

115946: kvserver: don't convert ctx errors to ReplicaCorruptionError r=kvoli a=dt

Release note: none.
Epic: none.

Fixes #114442.

116095: backupccl: flush at end of online restore span entry r=dt a=msbutler

This patch adds a flush call at the end of processing a restoreSpanEntry during
the linking phase. Previously, we'd flush the leftover data at the beginning of
processing the next restore span entry. But in a multi worker world, in which
workers grab restore span entries off a single channel, this "leftover" flush
could link data to a nonempty range that another worker was linking data into,
causing that range to overfill and splits to retry.

Epic: none

Release note: none

Co-authored-by: Eric.Yang <[email protected]>
Co-authored-by: lyang24 <[email protected]>
Co-authored-by: David Taylor <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
5 people committed Dec 12, 2023
4 parents c101994 + 17eaea1 + 16bdefc + c954284 commit a15eeba
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 46 deletions.
10 changes: 9 additions & 1 deletion pkg/ccl/backupccl/restore_online.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func sendAddRemoteSSTWorker(
// span rather than one we have added to, since we add with estimated
// stats and splitting a span with estimated stats is slow.
if batchSize > targetBatchSize {
log.Infof(ctx, "flushing %s batch of %d SSTs due to size limit", sz(batchSize), len(toAdd))
if err := flush(file.BackupFileEntrySpan.Key); err != nil {
return err
}
Expand All @@ -203,9 +204,16 @@ func sendAddRemoteSSTWorker(
toAdd = append(toAdd, file)
batchSize += file.BackupFileEntryCounts.DataSize
}
// TODO(msbutler): think hard about if this restore span entry is a safe
// key to split on. Note that it only is safe with
// https://github.com/cockroachdb/cockroach/pull/114464
log.Infof(ctx, "flushing %s batch of %d SSTs at end of restore span entry", sz(batchSize), len(toAdd))
if err := flush(entry.Span.EndKey); err != nil {
return err
}
requestFinishedCh <- struct{}{}
}
return flush(nil)
return nil
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_span_covering.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var targetOnlineRestoreSpanSize = settings.RegisterByteSizeSetting(
settings.ApplicationLevel,
"backup.restore_span.online_target_size",
"target size to which base spans of an online restore are merged to produce a restore span (0 disables)",
8<<30,
16<<30,
)

// backupManifestFileIterator exposes methods that can be used to iterate over
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ ok
exec-privileged-op-tenant
ALTER TABLE t SPLIT AT VALUES (0)
----
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)

# Check the index as well.
exec-privileged-op-tenant
ALTER INDEX t@idx SPLIT AT VALUES (1)
----
pq: ba: AdminSplit [/Tenant/10/Table/104/2/1,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)
pq: ba: AdminSplit [/Tenant/10/Table/104/2/1] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)

# Grant the capability without providing an explicit value.
update-capabilities
Expand All @@ -64,7 +64,7 @@ ok
exec-privileged-op-tenant
ALTER TABLE t SPLIT AT VALUES (0)
----
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)

# Lastly, use the explicitly set to true syntax.
update-capabilities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ ok
exec-privileged-op-tenant
ALTER TABLE t UNSPLIT AT VALUES (0)
----
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0,/Min) RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0] RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]

update-capabilities
ALTER TENANT [10] GRANT CAPABILITY can_admin_unsplit=true
Expand All @@ -31,4 +31,4 @@ ok
exec-privileged-op-tenant
ALTER TABLE t UNSPLIT AT VALUES (0)
----
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0,/Min) RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0] RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]
5 changes: 4 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -4527,12 +4528,14 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
ba := &kvpb.BatchRequest{}
get := &kvpb.GetRequest{}
get.Key = roachpb.Key("a")
get.KeyLockingStrength = lock.Shared
get.KeyLockingDurability = lock.Unreplicated
ba.Add(get)
br := &kvpb.BatchResponse{}
br.Error = kvpb.NewError(errors.New("boom"))
desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x"), EndKey: roachpb.RKey("z")}
{
exp := `have been waiting 8.16s (120 attempts) for RPC Get [‹"a"›,/Min) to` +
exp := `have been waiting 8.16s (120 attempts) for RPC Get(Shared,Unreplicated) [‹"a"›] to` +
` r9:‹{x-z}› [<no replicas>, next=0, gen=0]; resp: ‹(err: boom)›`
var s redact.StringBuilder
slowRangeRPCWarningStr(&s, ba, dur, attempts, desc, nil /* err */, br)
Expand Down
83 changes: 83 additions & 0 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,86 @@ type Request interface {
flags() flag
}

// SafeFormatterRequest is an optional extension of the Request interface.
// A request type implements it to supply its own redaction-aware formatting
// through redact.SafeFormatter instead of being printed by method name alone.
type SafeFormatterRequest interface {
Request
redact.SafeFormatter
}

var _ SafeFormatterRequest = (*GetRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface. It prints the
// request method and, for locking reads only, appends the locking strength
// and durability as "(strength,durability)".
func (gr *GetRequest) SafeFormat(s redact.SafePrinter, _ rune) {
	s.Print(gr.Method())
	// Non-locking requests are printed as the bare method name.
	if gr.KeyLockingStrength != lock.None {
		s.Printf("(%s,%s)", gr.KeyLockingStrength, gr.KeyLockingDurability)
	}
}

var _ SafeFormatterRequest = (*ScanRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface. It prints the
// request method and, for locking scans only, appends the locking strength
// and durability as "(strength,durability)".
func (sr *ScanRequest) SafeFormat(s redact.SafePrinter, _ rune) {
	s.Print(sr.Method())
	// Non-locking requests are printed as the bare method name.
	if sr.KeyLockingStrength != lock.None {
		s.Printf("(%s,%s)", sr.KeyLockingStrength, sr.KeyLockingDurability)
	}
}

var _ SafeFormatterRequest = (*ReverseScanRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface. It prints the
// request method and, for locking reverse scans only, appends the locking
// strength and durability as "(strength,durability)".
func (rsr *ReverseScanRequest) SafeFormat(s redact.SafePrinter, _ rune) {
	s.Print(rsr.Method())
	// Non-locking requests are printed as the bare method name.
	if rsr.KeyLockingStrength != lock.None {
		s.Printf("(%s,%s)", rsr.KeyLockingStrength, rsr.KeyLockingDurability)
	}
}

var _ SafeFormatterRequest = (*EndTxnRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface. The output has
// the form "Method(outcome[ trigger])", where outcome is one of "abort",
// "commit" or "parallel commit", and the kind of the internal commit trigger
// is appended when one is set.
func (etr *EndTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) {
	s.Printf("%s(", etr.Method())
	// The outcome strings are passed as literal format strings, as in the
	// other formatters in this file, rather than as string arguments.
	switch {
	case !etr.Commit:
		s.Printf("abort")
	case etr.IsParallelCommit():
		s.Printf("parallel commit")
	default:
		s.Printf("commit")
	}
	if trigger := etr.InternalCommitTrigger; trigger != nil {
		s.Printf(" %s", trigger.Kind())
	}
	s.Printf(")")
}

var _ SafeFormatterRequest = (*RecoverTxnRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface. The output has
// the form "Method(<short txn id>, outcome)", where outcome is "commit" when
// the transaction was implicitly committed and "abort" otherwise.
func (rtr *RecoverTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) {
	s.Printf("%s(%s, ", rtr.Method(), rtr.Txn.Short())
	if !rtr.ImplicitlyCommitted {
		s.Printf("abort")
	} else {
		s.Printf("commit")
	}
	s.Printf(")")
}

var _ SafeFormatterRequest = (*PushTxnRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface. The output has
// the form "PushTxn(<push type>,<short pusher id>-><short pushee id>)".
func (ptr *PushTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) {
	pusher := ptr.PusherTxn.Short()
	pushee := ptr.PusheeTxn.Short()
	s.Printf("PushTxn(%s,%s->%s)", ptr.PushType, pusher, pushee)
}

// LockingReadRequest is an interface used to expose the key-level locking
// strength of a read-only request.
type LockingReadRequest interface {
Expand Down Expand Up @@ -2072,3 +2152,6 @@ type RangeFeedEventProducer interface {
// range needs to be restarted.
Recv() (*RangeFeedEvent, error)
}

// SafeValue implements the redact.SafeValue interface. It is a marker method
// with no body; PushTxnRequest.SafeFormat prints the push type directly as a
// format argument.
func (PushTxnType) SafeValue() {}
43 changes: 9 additions & 34 deletions pkg/kv/kvpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ func (ba BatchRequest) Split(canSplitET bool) [][]RequestUnion {

// SafeFormat implements redact.SafeFormatter.
// It gives a brief summary of the contained requests and keys in the batch.
func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) {
func (ba BatchRequest) SafeFormat(s redact.SafePrinter, verb rune) {
for count, arg := range ba.Requests {
// Limit the strings to provide just a summary. Without this limit
// a log message with a BatchRequest can be very long.
Expand All @@ -810,41 +810,16 @@ func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) {
}

req := arg.GetInner()
if et, ok := req.(*EndTxnRequest); ok {
h := req.Header()
s.Printf("%s(", req.Method())
if et.Commit {
if et.IsParallelCommit() {
s.Printf("parallel commit")
} else {
s.Printf("commit")
}
} else {
s.Printf("abort")
}
if et.InternalCommitTrigger != nil {
s.Printf(" %s", et.InternalCommitTrigger.Kind())
}
s.Printf(") [%s]", h.Key)
} else if rt, ok := req.(*RecoverTxnRequest); ok {
h := req.Header()
s.Printf("%s(%s, ", req.Method(), rt.Txn.Short())
if rt.ImplicitlyCommitted {
s.Printf("commit")
} else {
s.Printf("abort")
}
s.Printf(") [%s]", h.Key)
if safeFormatterReq, ok := req.(SafeFormatterRequest); ok {
safeFormatterReq.SafeFormat(s, verb)
} else {
h := req.Header()
if req.Method() == PushTxn {
pushReq := req.(*PushTxnRequest)
s.Printf("PushTxn(%s,%s->%s)",
pushReq.PushType, pushReq.PusherTxn.Short(), pushReq.PusheeTxn.Short())
} else {
s.Print(req.Method())
}
s.Print(req.Method())
}
h := req.Header()
if len(h.EndKey) > 0 {
s.Printf(" [%s,%s)", h.Key, h.EndKey)
} else {
s.Printf(" [%s]", h.Key)
}
}
{
Expand Down
17 changes: 17 additions & 0 deletions pkg/kv/kvpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1214,10 +1214,27 @@ var _ ErrorDetailInterface = &RaftGroupDeletedError{}

// NewReplicaCorruptionError creates a new error indicating a corrupt replica.
// The message of the supplied error, which must be non-nil, is stored to
// provide additional detail in the error message.
//
// NB: Take caution when marking errors as replica corruption errors to be sure
// that they are actually indicative of replica corruption and should be treated
// as such; for example, while in general a failure to apply a command might be,
// a timeout or context cancellation error may not be, especially if a user
// request controls that cancellation/timeout. See the helper below in
// MaybeWrapReplicaCorruptionError.
func NewReplicaCorruptionError(err error) *ReplicaCorruptionError {
	corruption := &ReplicaCorruptionError{}
	corruption.ErrorMsg = err.Error()
	return corruption
}

// MaybeWrapReplicaCorruptionError converts err into a replica corruption error
// unless errors.Is reports that err matches the error of the passed context.
// A match suggests the whole operation was cancelled or timed out via the
// context rather than hitting a fault that implies replica corruption, so in
// that case err is returned unchanged.
func MaybeWrapReplicaCorruptionError(ctx context.Context, err error) error {
	if ctxErr := ctx.Err(); !errors.Is(err, ctxErr) {
		return NewReplicaCorruptionError(err)
	}
	return err
}

// Error implements the error interface. The message is produced by formatting
// the error through redact.Sprint and then stripping the redaction markers.
func (e *ReplicaCorruptionError) Error() string {
	redactable := redact.Sprint(e)
	return redactable.StripMarkers()
}
Expand Down
95 changes: 94 additions & 1 deletion pkg/kv/kvpb/string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestBatchRequestString(t *testing.T) {
ba.Requests = append(ba.Requests, ru)

{
exp := `Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min),... 76 skipped ..., Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]`
exp := `Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min],... 76 skipped ..., Get [/Min], Get [/Min], Get [/Min], Get [/Min], EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]`
act := ba.String()
require.Equal(t, exp, act)
}
Expand Down Expand Up @@ -98,3 +98,96 @@ func TestAmbiguousResultError(t *testing.T) {
s := fmt.Sprintf("%s\n%s", err, redact.Sprint(err))
echotest.Require(t, s, filepath.Join("testdata", "ambiguous_result_error.txt"))
}

// Unit test the requests that implemented SafeFormatterRequest interface.
func TestRequestSafeFormat(t *testing.T) {
txn := roachpb.MakeTransaction("txn1", []byte("abc"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 6, 0, false)
fixedUuid, _ := uuid.FromString("00fbff57-c1ee-48ce-966c-da568d50e425")
txn.ID = fixedUuid
pusherTxn := roachpb.MakeTransaction("txn2", []byte("123"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0, false)
pusheeTxn := roachpb.MakeTransaction("txn3", []byte("1234"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0, false)
fixedUuid2, _ := uuid.FromString("00fbff58-c1ee-48ce-966c-da568d50e425")
fixedUuid3, _ := uuid.FromString("00fbff59-c1ee-48ce-966c-da568d50e425")
pusherTxn.ID = fixedUuid2
pusheeTxn.ID = fixedUuid3

testCases := []struct {
req kvpb.Request
redactable string
redacted string
}{
{
req: &kvpb.GetRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("a"),
},
KeyLockingStrength: lock.Shared,
KeyLockingDurability: lock.Unreplicated,
},
redactable: "Get(Shared,Unreplicated)",
redacted: "Get(Shared,Unreplicated)",
},
{
req: &kvpb.ScanRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("b"),
EndKey: roachpb.Key("d"),
},
KeyLockingStrength: lock.Shared,
KeyLockingDurability: lock.Unreplicated,
},
redactable: "Scan(Shared,Unreplicated)",
redacted: "Scan(Shared,Unreplicated)",
},
{
req: &kvpb.ReverseScanRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("c"),
EndKey: roachpb.Key("d"),
},
KeyLockingStrength: lock.Shared,
KeyLockingDurability: lock.Unreplicated,
},
redactable: "ReverseScan(Shared,Unreplicated)",
redacted: "ReverseScan(Shared,Unreplicated)",
},
{
req: &kvpb.EndTxnRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("ab"),
},
Commit: true,
},
redactable: "EndTxn(commit)",
redacted: "EndTxn(commit)",
},
{
req: &kvpb.RecoverTxnRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("abc"),
},
Txn: txn.TxnMeta,
ImplicitlyCommitted: false,
},
redactable: "RecoverTxn(00fbff57, abort)",
redacted: "RecoverTxn(00fbff57, abort)",
},
{
req: &kvpb.PushTxnRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("123"),
},
PusherTxn: pusherTxn,
PusheeTxn: pusheeTxn.TxnMeta,
},
redactable: "PushTxn(PUSH_TIMESTAMP,00fbff58->00fbff59)",
redacted: "PushTxn(PUSH_TIMESTAMP,00fbff58->00fbff59)",
},
}
for _, c := range testCases {
t.Run(c.req.Method().String(), func(t *testing.T) {
require.EqualValues(t, c.redactable, redact.Sprint(c.req))
require.EqualValues(t, c.redacted, redact.Sprint(c.req).Redact())
})
}
}
Loading

0 comments on commit a15eeba

Please sign in to comment.