From f6c57c68eee6d294e7384cf3d5de9bbf13ee8dad Mon Sep 17 00:00:00 2001 From: "Eric.Yang" Date: Tue, 14 Nov 2023 23:19:47 -0800 Subject: [PATCH] kv: include locking strength and durability in Get/Scan/RevScan SafeFormat The goal of this pr is improving observability. We are adding lock strength and lock durability with Get/Scan/RevScan request if the request is locking. This is implemented by introducing an optional extension interface to Request Interface called SafeFormatterRequest. We are also refactoring inside BatchRequest.SafeFormat with the added interface. Please note the subtle changed introduced here: if the EndKey is not present we print only Key with square brackets, and this applies to all the request types. Fixes: #114475 Release note: None. --- .../testdata/can_admin_split | 6 +- .../testdata/can_admin_unsplit | 4 +- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 5 +- pkg/kv/kvpb/api.go | 80 +++++++++++++++++++ pkg/kv/kvpb/batch.go | 43 +++------- pkg/kv/kvpb/string_test.go | 75 ++++++++++++++++- 6 files changed, 172 insertions(+), 41 deletions(-) diff --git a/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_split b/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_split index cc24f91644c9..d837c1a7e9ad 100644 --- a/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_split +++ b/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_split @@ -34,13 +34,13 @@ ok exec-privileged-op-tenant ALTER TABLE t SPLIT AT VALUES (0) ---- -pq: ba: AdminSplit [/Tenant/10/Table/104/1/0,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest) +pq: ba: AdminSplit [/Tenant/10/Table/104/1/0] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest) # Check the index as well. exec-privileged-op-tenant ALTER INDEX t@idx SPLIT AT VALUES (1) ---- -pq: ba: AdminSplit [/Tenant/10/Table/104/2/1,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest) +pq: ba: AdminSplit [/Tenant/10/Table/104/2/1] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest) # Grant the capability without providing an explicit value. update-capabilities @@ -64,7 +64,7 @@ ok exec-privileged-op-tenant ALTER TABLE t SPLIT AT VALUES (0) ---- -pq: ba: AdminSplit [/Tenant/10/Table/104/1/0,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest) +pq: ba: AdminSplit [/Tenant/10/Table/104/1/0] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest) # Lastly, use the explicitly set to true syntax. update-capabilities diff --git a/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_unsplit b/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_unsplit index 8426c9889fc9..1ff287edd0e2 100644 --- a/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_unsplit +++ b/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_unsplit @@ -11,7 +11,7 @@ ok exec-privileged-op-tenant ALTER TABLE t UNSPLIT AT VALUES (0) ---- -pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0,/Min) RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated] +pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0] RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated] update-capabilities ALTER TENANT [10] GRANT CAPABILITY can_admin_unsplit=true @@ -31,4 +31,4 @@ ok exec-privileged-op-tenant ALTER TABLE t UNSPLIT AT VALUES (0) ---- -pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0,/Min) RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated] +pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0] RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated] diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 07a9a5acacb6..1401f0e42826 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -4530,12 +4531,14 @@ func TestDistSenderSlowLogMessage(t *testing.T) { ba := &kvpb.BatchRequest{} get := &kvpb.GetRequest{} get.Key = roachpb.Key("a") + get.KeyLockingStrength = lock.Shared + get.KeyLockingDurability = lock.Unreplicated ba.Add(get) br := &kvpb.BatchResponse{} br.Error = kvpb.NewError(errors.New("boom")) desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x"), EndKey: roachpb.RKey("z")} { - exp := `have been waiting 8.16s (120 attempts) for RPC Get [‹"a"›,/Min) to` + + exp := `have been waiting 8.16s (120 attempts) for RPC Get(Shared,Unreplicated) [‹"a"›] to` + ` r9:‹{x-z}› [, next=0, gen=0]; resp: ‹(err: boom)›` var s redact.StringBuilder slowRangeRPCWarningStr(&s, ba, dur, attempts, desc, nil /* err */, br) diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 2fd1b9cf1060..296935b6ba1c 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -194,6 +194,86 @@ type Request interface { flags() flag } +// SafeFormatterRequest is an optional extension interface used to allow request to do custom formatting. +type SafeFormatterRequest interface { + Request + redact.SafeFormatter +} + +var _ SafeFormatterRequest = (*GetRequest)(nil) + +// SafeFormat implements the redact.SafeFormatter interface. +func (gr *GetRequest) SafeFormat(s redact.SafePrinter, _ rune) { + s.Print(gr.Method()) + if gr.KeyLockingStrength == lock.None { + return + } + s.Printf("(%s,%s)", gr.KeyLockingStrength, gr.KeyLockingDurability) +} + +var _ SafeFormatterRequest = (*ScanRequest)(nil) + +// SafeFormat implements the redact.SafeFormatter interface. +func (sr *ScanRequest) SafeFormat(s redact.SafePrinter, _ rune) { + s.Print(sr.Method()) + if sr.KeyLockingStrength == lock.None { + return + } + s.Printf("(%s,%s)", sr.KeyLockingStrength, sr.KeyLockingDurability) +} + +var _ SafeFormatterRequest = (*ReverseScanRequest)(nil) + +// SafeFormat implements the redact.SafeFormatter interface. +func (rsr *ReverseScanRequest) SafeFormat(s redact.SafePrinter, _ rune) { + s.Print(rsr.Method()) + if rsr.KeyLockingStrength == lock.None { + return + } + s.Printf("(%s,%s)", rsr.KeyLockingStrength, rsr.KeyLockingDurability) +} + +var _ SafeFormatterRequest = (*EndTxnRequest)(nil) + +// SafeFormat implements the redact.SafeFormatter interface. +func (etr *EndTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) { + s.Printf("%s(", etr.Method()) + if etr.Commit { + if etr.IsParallelCommit() { + s.Printf("parallel commit") + } else { + s.Printf("commit") + } + } else { + s.Printf("abort") + } + if etr.InternalCommitTrigger != nil { + s.Printf(" %s", etr.InternalCommitTrigger.Kind()) + } + s.Printf(")") +} + +var _ SafeFormatterRequest = (*RecoverTxnRequest)(nil) + +// SafeFormat implements the redact.SafeFormatter interface. +func (rtr *RecoverTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) { + s.Printf("%s(%s, ", rtr.Method(), rtr.Txn.Short()) + if rtr.ImplicitlyCommitted { + s.Printf("commit") + } else { + s.Printf("abort") + } + s.Printf(")") +} + +var _ SafeFormatterRequest = (*PushTxnRequest)(nil) + +// SafeFormat implements the redact.SafeFormatter interface. +func (ptr *PushTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) { + s.Printf("PushTxn(%s,%s->%s)", + ptr.PushType, ptr.PusherTxn.Short(), ptr.PusheeTxn.Short()) +} + // LockingReadRequest is an interface used to expose the key-level locking // strength of a read-only request. type LockingReadRequest interface { diff --git a/pkg/kv/kvpb/batch.go b/pkg/kv/kvpb/batch.go index 4f57063274d6..c54da8265ab2 100644 --- a/pkg/kv/kvpb/batch.go +++ b/pkg/kv/kvpb/batch.go @@ -795,7 +795,7 @@ func (ba BatchRequest) Split(canSplitET bool) [][]RequestUnion { // SafeFormat implements redact.SafeFormatter. // It gives a brief summary of the contained requests and keys in the batch. -func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) { +func (ba BatchRequest) SafeFormat(s redact.SafePrinter, verb rune) { for count, arg := range ba.Requests { // Limit the strings to provide just a summary. Without this limit // a log message with a BatchRequest can be very long. @@ -810,41 +810,16 @@ func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) { } req := arg.GetInner() - if et, ok := req.(*EndTxnRequest); ok { - h := req.Header() - s.Printf("%s(", req.Method()) - if et.Commit { - if et.IsParallelCommit() { - s.Printf("parallel commit") - } else { - s.Printf("commit") - } - } else { - s.Printf("abort") - } - if et.InternalCommitTrigger != nil { - s.Printf(" %s", et.InternalCommitTrigger.Kind()) - } - s.Printf(") [%s]", h.Key) - } else if rt, ok := req.(*RecoverTxnRequest); ok { - h := req.Header() - s.Printf("%s(%s, ", req.Method(), rt.Txn.Short()) - if rt.ImplicitlyCommitted { - s.Printf("commit") - } else { - s.Printf("abort") - } - s.Printf(") [%s]", h.Key) + if safeFormatterReq, ok := req.(SafeFormatterRequest); ok { + safeFormatterReq.SafeFormat(s, verb) } else { - h := req.Header() - if req.Method() == PushTxn { - pushReq := req.(*PushTxnRequest) - s.Printf("PushTxn(%s,%s->%s)", - pushReq.PushType, pushReq.PusherTxn.Short(), pushReq.PusheeTxn.Short()) - } else { - s.Print(req.Method()) - } + s.Print(req.Method()) + } + h := req.Header() + if len(h.EndKey) > 0 { s.Printf(" [%s,%s)", h.Key, h.EndKey) + } else { + s.Printf(" [%s]", h.Key) } } { diff --git a/pkg/kv/kvpb/string_test.go b/pkg/kv/kvpb/string_test.go index 4aff87774371..e7da46e74476 100644 --- a/pkg/kv/kvpb/string_test.go +++ b/pkg/kv/kvpb/string_test.go @@ -62,7 +62,7 @@ func TestBatchRequestString(t *testing.T) { ba.Requests = append(ba.Requests, ru) { - exp := `Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min),... 76 skipped ..., Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]` + exp := `Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min],... 76 skipped ..., Get [/Min], Get [/Min], Get [/Min], Get [/Min], EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]` act := ba.String() require.Equal(t, exp, act) } @@ -97,3 +97,76 @@ func TestAmbiguousResultError(t *testing.T) { s := fmt.Sprintf("%s\n%s", err, redact.Sprint(err)) echotest.Require(t, s, filepath.Join("testdata", "ambiguous_result_error.txt")) } + +// Unit test the requests that implemented SafeFormatterRequest interface. +func TestRequestSafeFormat(t *testing.T) { + gr := &kvpb.GetRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("a"), + }, + KeyLockingStrength: lock.Shared, + KeyLockingDurability: lock.Unreplicated, + } + require.EqualValues(t, "Get(Shared,Unreplicated)", redact.Sprint(gr)) + require.EqualValues(t, "Get(Shared,Unreplicated)", redact.Sprint(gr).Redact()) + + sr := &kvpb.ScanRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("b"), + EndKey: roachpb.Key("d"), + }, + KeyLockingStrength: lock.Shared, + KeyLockingDurability: lock.Unreplicated, + } + require.EqualValues(t, "Scan(Shared,Unreplicated)", redact.Sprint(sr)) + require.EqualValues(t, "Scan(Shared,Unreplicated)", redact.Sprint(sr).Redact()) + + rsr := &kvpb.ReverseScanRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("c"), + EndKey: roachpb.Key("d"), + }, + KeyLockingStrength: lock.Shared, + KeyLockingDurability: lock.Unreplicated, + } + require.EqualValues(t, "ReverseScan(Shared,Unreplicated)", redact.Sprint(rsr)) + require.EqualValues(t, "ReverseScan(Shared,Unreplicated)", redact.Sprint(rsr).Redact()) + + etr := &kvpb.EndTxnRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("ab"), + }, + Commit: true, + } + require.EqualValues(t, "EndTxn(commit)", redact.Sprint(etr)) + require.EqualValues(t, "EndTxn(commit)", redact.Sprint(etr).Redact()) + + txn := roachpb.MakeTransaction("txn1", []byte("abc"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 6, 0) + fixed_uuid, _ := uuid.FromString("00fbff57-c1ee-48ce-966c-da568d50e425") + txn.ID = fixed_uuid + rtr := &kvpb.RecoverTxnRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("abc"), + }, + Txn: txn.TxnMeta, + ImplicitlyCommitted: false, + } + require.EqualValues(t, "RecoverTxn(00fbff57, abort)", redact.Sprint(rtr)) + require.EqualValues(t, "RecoverTxn(00fbff57, abort)", redact.Sprint(rtr).Redact()) + + pusherTxn := roachpb.MakeTransaction("txn2", []byte("123"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0) + fixed_uuid2, _ := uuid.FromString("00fbff58-c1ee-48ce-966c-da568d50e425") + pusherTxn.ID = fixed_uuid2 + pusheeTxn := roachpb.MakeTransaction("txn3", []byte("1234"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0) + fixed_uuid3, _ := uuid.FromString("00fbff59-c1ee-48ce-966c-da568d50e425") + pusheeTxn.ID = fixed_uuid3 + ptr := &kvpb.PushTxnRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("123"), + }, + PusherTxn: pusherTxn, + PusheeTxn: pusheeTxn.TxnMeta, + } + require.EqualValues(t, "PushTxn(‹PUSH_TIMESTAMP›,00fbff58->00fbff59)", redact.Sprint(ptr)) + require.EqualValues(t, "PushTxn(‹×›,00fbff58->00fbff59)", redact.Sprint(ptr).Redact()) +}