Skip to content

Commit

Permalink
kv: include locking strength and durability in Get/Scan/RevScan SafeF…
Browse files Browse the repository at this point in the history
…ormat

The goal of this pr is improving observability. We are adding lock strength
and lock durability with Get/Scan/RevScan request if the request is locking.
This is implemented by introducing an optional extension interface to Request
Interface called SafeFormatterRequest. We are also refactoring inside
BatchRequest.SafeFormat with the added interface. Please note the subtle
changed introduced here: if the EndKey is not present we print only Key with
square brackets, and this applies to all the request types.

Fixes: #114475

Release note: None.
  • Loading branch information
Eric.Yang committed Nov 23, 2023
1 parent 1473312 commit 20e256c
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ ok
exec-privileged-op-tenant
ALTER TABLE t SPLIT AT VALUES (0)
----
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)

# Check the index as well.
exec-privileged-op-tenant
ALTER INDEX t@idx SPLIT AT VALUES (1)
----
pq: ba: AdminSplit [/Tenant/10/Table/104/2/1,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)
pq: ba: AdminSplit [/Tenant/10/Table/104/2/1] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)

# Grant the capability without providing an explicit value.
update-capabilities
Expand All @@ -64,7 +64,7 @@ ok
exec-privileged-op-tenant
ALTER TABLE t SPLIT AT VALUES (0)
----
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)

# Lastly, use the explicitly set to true syntax.
update-capabilities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ ok
exec-privileged-op-tenant
ALTER TABLE t UNSPLIT AT VALUES (0)
----
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0,/Min) RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0] RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]

update-capabilities
ALTER TENANT [10] GRANT CAPABILITY can_admin_unsplit=true
Expand All @@ -31,4 +31,4 @@ ok
exec-privileged-op-tenant
ALTER TABLE t UNSPLIT AT VALUES (0)
----
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0,/Min) RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0] RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]
5 changes: 4 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -4530,12 +4531,14 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
ba := &kvpb.BatchRequest{}
get := &kvpb.GetRequest{}
get.Key = roachpb.Key("a")
get.KeyLockingStrength = lock.Shared
get.KeyLockingDurability = lock.Unreplicated
ba.Add(get)
br := &kvpb.BatchResponse{}
br.Error = kvpb.NewError(errors.New("boom"))
desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x"), EndKey: roachpb.RKey("z")}
{
exp := `have been waiting 8.16s (120 attempts) for RPC Get [‹"a"›,/Min) to` +
exp := `have been waiting 8.16s (120 attempts) for RPC Get[lockStrength=Shared,lockDurability=Unreplicated] [‹"a"›] to` +
` r9:‹{x-z}› [<no replicas>, next=0, gen=0]; resp: ‹(err: boom)›`
var s redact.StringBuilder
slowRangeRPCWarningStr(&s, ba, dur, attempts, desc, nil /* err */, br)
Expand Down
80 changes: 80 additions & 0 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,86 @@ type Request interface {
flags() flag
}

// SafeFormatterRequest is an optional extension interface used to allow request to do custom formatting.
type SafeFormatterRequest interface {
Request
redact.SafeFormatter
}

var _ SafeFormatterRequest = (*GetRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (gr GetRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Print(gr.Method())
if gr.KeyLockingStrength == lock.None {
return
}
s.Printf("[lockStrength=%s,lockDurability=%s]", gr.KeyLockingStrength, gr.KeyLockingDurability)
}

var _ SafeFormatterRequest = (*ScanRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (sr ScanRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Print(sr.Method())
if sr.KeyLockingStrength == lock.None {
return
}
s.Printf("[lockStrength=%s,lockDurability=%s]", sr.KeyLockingStrength, sr.KeyLockingDurability)
}

var _ SafeFormatterRequest = (*ReverseScanRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (rsr ReverseScanRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Print(rsr.Method())
if rsr.KeyLockingStrength == lock.None {
return
}
s.Printf("[lockStrength=%s,lockDurability=%s]", rsr.KeyLockingStrength, rsr.KeyLockingDurability)
}

var _ SafeFormatterRequest = (*EndTxnRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (etr EndTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Printf("%s(", etr.Method())
if etr.Commit {
if etr.IsParallelCommit() {
s.Printf("parallel commit")
} else {
s.Printf("commit")
}
} else {
s.Printf("abort")
}
if etr.InternalCommitTrigger != nil {
s.Printf(" %s", etr.InternalCommitTrigger.Kind())
}
s.Print(")")
}

var _ SafeFormatterRequest = (*RecoverTxnRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (rtr RecoverTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Printf("%s(%s, ", rtr.Method(), rtr.Txn.Short())
if rtr.ImplicitlyCommitted {
s.Printf("commit")
} else {
s.Printf("abort")
}
s.Print(")")
}

var _ SafeFormatterRequest = (*PushTxnRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (ptr PushTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Printf("PushTxn(%s,%s->%s)",
ptr.PushType, ptr.PusherTxn.Short(), ptr.PusheeTxn.Short())
}

// LockingReadRequest is an interface used to expose the key-level locking
// strength of a read-only request.
type LockingReadRequest interface {
Expand Down
43 changes: 9 additions & 34 deletions pkg/kv/kvpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ func (ba BatchRequest) Split(canSplitET bool) [][]RequestUnion {

// SafeFormat implements redact.SafeFormatter.
// It gives a brief summary of the contained requests and keys in the batch.
func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) {
func (ba BatchRequest) SafeFormat(s redact.SafePrinter, verb rune) {
for count, arg := range ba.Requests {
// Limit the strings to provide just a summary. Without this limit
// a log message with a BatchRequest can be very long.
Expand All @@ -810,41 +810,16 @@ func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) {
}

req := arg.GetInner()
if et, ok := req.(*EndTxnRequest); ok {
h := req.Header()
s.Printf("%s(", req.Method())
if et.Commit {
if et.IsParallelCommit() {
s.Printf("parallel commit")
} else {
s.Printf("commit")
}
} else {
s.Printf("abort")
}
if et.InternalCommitTrigger != nil {
s.Printf(" %s", et.InternalCommitTrigger.Kind())
}
s.Printf(") [%s]", h.Key)
} else if rt, ok := req.(*RecoverTxnRequest); ok {
h := req.Header()
s.Printf("%s(%s, ", req.Method(), rt.Txn.Short())
if rt.ImplicitlyCommitted {
s.Printf("commit")
} else {
s.Printf("abort")
}
s.Printf(") [%s]", h.Key)
if safeFormatterReq, ok := req.(SafeFormatterRequest); ok {
safeFormatterReq.SafeFormat(s, verb)
} else {
h := req.Header()
if req.Method() == PushTxn {
pushReq := req.(*PushTxnRequest)
s.Printf("PushTxn(%s,%s->%s)",
pushReq.PushType, pushReq.PusherTxn.Short(), pushReq.PusheeTxn.Short())
} else {
s.Print(req.Method())
}
s.Print(req.Method())
}
h := req.Header()
if len(h.EndKey) > 0 {
s.Printf(" [%s,%s)", h.Key, h.EndKey)
} else {
s.Printf(" [%s]", h.Key)
}
}
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvpb/string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestBatchRequestString(t *testing.T) {
ba.Requests = append(ba.Requests, ru)

{
exp := `Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min),... 76 skipped ..., Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]`
exp := `Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min],... 76 skipped ..., Get [/Min], Get [/Min], Get [/Min], Get [/Min], EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]`
act := ba.String()
require.Equal(t, exp, act)
}
Expand Down

0 comments on commit 20e256c

Please sign in to comment.