From 9110c850945fd88efdfc0fb7cb54ffef12fa7840 Mon Sep 17 00:00:00 2001 From: "Eric.Yang" Date: Wed, 15 Nov 2023 07:19:47 +0000 Subject: [PATCH 1/2] kv: include locking strength and durability in Get/Scan/RevScan SafeFormat The goal of this pr is improving observability. We are adding lock strength and lock durability with Get/Scan/RevScan request if the request is locking. This is implemented by introducing an optional extension interface to Request Interface called SafeFormatterRequest. We are also refactoring inside BatchRequest.SafeFormat with the added interface. Please note the subtle changed introduced here: if the EndKey is not present we print only Key with square brackets, and this applies to all the request types. Fixes: #114475 Release note: None. --- .../testdata/can_admin_split | 6 +- .../testdata/can_admin_unsplit | 4 +- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 5 +- pkg/kv/kvpb/api.go | 80 ++++++++++++++++ pkg/kv/kvpb/batch.go | 43 ++------- pkg/kv/kvpb/string_test.go | 95 ++++++++++++++++++- 6 files changed, 192 insertions(+), 41 deletions(-) diff --git a/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_split b/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_split index cc24f91644c9..d837c1a7e9ad 100644 --- a/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_split +++ b/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_split @@ -34,13 +34,13 @@ ok exec-privileged-op-tenant ALTER TABLE t SPLIT AT VALUES (0) ---- -pq: ba: AdminSplit [/Tenant/10/Table/104/1/0,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest) +pq: ba: AdminSplit [/Tenant/10/Table/104/1/0] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest) # Check the index as well. exec-privileged-op-tenant ALTER INDEX t@idx SPLIT AT VALUES (1) ---- -pq: ba: AdminSplit [/Tenant/10/Table/104/2/1,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest) +pq: ba: AdminSplit [/Tenant/10/Table/104/2/1] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest) # Grant the capability without providing an explicit value. update-capabilities @@ -64,7 +64,7 @@ ok exec-privileged-op-tenant ALTER TABLE t SPLIT AT VALUES (0) ---- -pq: ba: AdminSplit [/Tenant/10/Table/104/1/0,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest) +pq: ba: AdminSplit [/Tenant/10/Table/104/1/0] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest) # Lastly, use the explicitly set to true syntax. update-capabilities diff --git a/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_unsplit b/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_unsplit index 8426c9889fc9..1ff287edd0e2 100644 --- a/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_unsplit +++ b/pkg/ccl/multitenantccl/tenantcapabilitiesccl/testdata/can_admin_unsplit @@ -11,7 +11,7 @@ ok exec-privileged-op-tenant ALTER TABLE t UNSPLIT AT VALUES (0) ---- -pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0,/Min) RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated] +pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0] RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated] update-capabilities ALTER TENANT [10] GRANT CAPABILITY can_admin_unsplit=true @@ -31,4 +31,4 @@ ok exec-privileged-op-tenant ALTER TABLE t UNSPLIT AT VALUES (0) ---- -pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0,/Min) RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated] +pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0] RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated] diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 249e43dbcc12..e7c36779736f 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -4531,12 +4532,14 @@ func TestDistSenderSlowLogMessage(t *testing.T) { ba := &kvpb.BatchRequest{} get := &kvpb.GetRequest{} get.Key = roachpb.Key("a") + get.KeyLockingStrength = lock.Shared + get.KeyLockingDurability = lock.Unreplicated ba.Add(get) br := &kvpb.BatchResponse{} br.Error = kvpb.NewError(errors.New("boom")) desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x"), EndKey: roachpb.RKey("z")} { - exp := `have been waiting 8.16s (120 attempts) for RPC Get [‹"a"›,/Min) to` + + exp := `have been waiting 8.16s (120 attempts) for RPC Get(Shared,Unreplicated) [‹"a"›] to` + ` r9:‹{x-z}› [, next=0, gen=0]; resp: ‹(err: boom)›` var s redact.StringBuilder slowRangeRPCWarningStr(&s, ba, dur, attempts, desc, nil /* err */, br) diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 0bc347224df6..1b5cb4c12807 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -194,6 +194,86 @@ type Request interface { flags() flag } +// SafeFormatterRequest is an optional extension interface used to allow request to do custom formatting. +type SafeFormatterRequest interface { + Request + redact.SafeFormatter +} + +var _ SafeFormatterRequest = (*GetRequest)(nil) + +// SafeFormat implements the redact.SafeFormatter interface. +func (gr *GetRequest) SafeFormat(s redact.SafePrinter, _ rune) { + s.Print(gr.Method()) + if gr.KeyLockingStrength == lock.None { + return + } + s.Printf("(%s,%s)", gr.KeyLockingStrength, gr.KeyLockingDurability) +} + +var _ SafeFormatterRequest = (*ScanRequest)(nil) + +// SafeFormat implements the redact.SafeFormatter interface. +func (sr *ScanRequest) SafeFormat(s redact.SafePrinter, _ rune) { + s.Print(sr.Method()) + if sr.KeyLockingStrength == lock.None { + return + } + s.Printf("(%s,%s)", sr.KeyLockingStrength, sr.KeyLockingDurability) +} + +var _ SafeFormatterRequest = (*ReverseScanRequest)(nil) + +// SafeFormat implements the redact.SafeFormatter interface. +func (rsr *ReverseScanRequest) SafeFormat(s redact.SafePrinter, _ rune) { + s.Print(rsr.Method()) + if rsr.KeyLockingStrength == lock.None { + return + } + s.Printf("(%s,%s)", rsr.KeyLockingStrength, rsr.KeyLockingDurability) +} + +var _ SafeFormatterRequest = (*EndTxnRequest)(nil) + +// SafeFormat implements the redact.SafeFormatter interface. +func (etr *EndTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) { + s.Printf("%s(", etr.Method()) + if etr.Commit { + if etr.IsParallelCommit() { + s.Printf("parallel commit") + } else { + s.Printf("commit") + } + } else { + s.Printf("abort") + } + if etr.InternalCommitTrigger != nil { + s.Printf(" %s", etr.InternalCommitTrigger.Kind()) + } + s.Printf(")") +} + +var _ SafeFormatterRequest = (*RecoverTxnRequest)(nil) + +// SafeFormat implements the redact.SafeFormatter interface. +func (rtr *RecoverTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) { + s.Printf("%s(%s, ", rtr.Method(), rtr.Txn.Short()) + if rtr.ImplicitlyCommitted { + s.Printf("commit") + } else { + s.Printf("abort") + } + s.Printf(")") +} + +var _ SafeFormatterRequest = (*PushTxnRequest)(nil) + +// SafeFormat implements the redact.SafeFormatter interface. +func (ptr *PushTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) { + s.Printf("PushTxn(%s,%s->%s)", + ptr.PushType, ptr.PusherTxn.Short(), ptr.PusheeTxn.Short()) +} + // LockingReadRequest is an interface used to expose the key-level locking // strength of a read-only request. type LockingReadRequest interface { diff --git a/pkg/kv/kvpb/batch.go b/pkg/kv/kvpb/batch.go index 4f57063274d6..c54da8265ab2 100644 --- a/pkg/kv/kvpb/batch.go +++ b/pkg/kv/kvpb/batch.go @@ -795,7 +795,7 @@ func (ba BatchRequest) Split(canSplitET bool) [][]RequestUnion { // SafeFormat implements redact.SafeFormatter. // It gives a brief summary of the contained requests and keys in the batch. -func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) { +func (ba BatchRequest) SafeFormat(s redact.SafePrinter, verb rune) { for count, arg := range ba.Requests { // Limit the strings to provide just a summary. Without this limit // a log message with a BatchRequest can be very long. @@ -810,41 +810,16 @@ func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) { } req := arg.GetInner() - if et, ok := req.(*EndTxnRequest); ok { - h := req.Header() - s.Printf("%s(", req.Method()) - if et.Commit { - if et.IsParallelCommit() { - s.Printf("parallel commit") - } else { - s.Printf("commit") - } - } else { - s.Printf("abort") - } - if et.InternalCommitTrigger != nil { - s.Printf(" %s", et.InternalCommitTrigger.Kind()) - } - s.Printf(") [%s]", h.Key) - } else if rt, ok := req.(*RecoverTxnRequest); ok { - h := req.Header() - s.Printf("%s(%s, ", req.Method(), rt.Txn.Short()) - if rt.ImplicitlyCommitted { - s.Printf("commit") - } else { - s.Printf("abort") - } - s.Printf(") [%s]", h.Key) + if safeFormatterReq, ok := req.(SafeFormatterRequest); ok { + safeFormatterReq.SafeFormat(s, verb) } else { - h := req.Header() - if req.Method() == PushTxn { - pushReq := req.(*PushTxnRequest) - s.Printf("PushTxn(%s,%s->%s)", - pushReq.PushType, pushReq.PusherTxn.Short(), pushReq.PusheeTxn.Short()) - } else { - s.Print(req.Method()) - } + s.Print(req.Method()) + } + h := req.Header() + if len(h.EndKey) > 0 { s.Printf(" [%s,%s)", h.Key, h.EndKey) + } else { + s.Printf(" [%s]", h.Key) } } { diff --git a/pkg/kv/kvpb/string_test.go b/pkg/kv/kvpb/string_test.go index d4c8a73d3947..b40df06ebc05 100644 --- a/pkg/kv/kvpb/string_test.go +++ b/pkg/kv/kvpb/string_test.go @@ -63,7 +63,7 @@ func TestBatchRequestString(t *testing.T) { ba.Requests = append(ba.Requests, ru) { - exp := `Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min),... 76 skipped ..., Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]` + exp := `Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min],... 76 skipped ..., Get [/Min], Get [/Min], Get [/Min], Get [/Min], EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]` act := ba.String() require.Equal(t, exp, act) } @@ -98,3 +98,96 @@ func TestAmbiguousResultError(t *testing.T) { s := fmt.Sprintf("%s\n%s", err, redact.Sprint(err)) echotest.Require(t, s, filepath.Join("testdata", "ambiguous_result_error.txt")) } + +// Unit test the requests that implemented SafeFormatterRequest interface. +func TestRequestSafeFormat(t *testing.T) { + txn := roachpb.MakeTransaction("txn1", []byte("abc"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 6, 0) + fixedUuid, _ := uuid.FromString("00fbff57-c1ee-48ce-966c-da568d50e425") + txn.ID = fixedUuid + pusherTxn := roachpb.MakeTransaction("txn2", []byte("123"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0) + pusheeTxn := roachpb.MakeTransaction("txn3", []byte("1234"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0) + fixedUuid2, _ := uuid.FromString("00fbff58-c1ee-48ce-966c-da568d50e425") + fixedUuid3, _ := uuid.FromString("00fbff59-c1ee-48ce-966c-da568d50e425") + pusherTxn.ID = fixedUuid2 + pusheeTxn.ID = fixedUuid3 + + testCases := []struct { + req kvpb.Request + redactable string + redacted string + }{ + { + req: &kvpb.GetRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("a"), + }, + KeyLockingStrength: lock.Shared, + KeyLockingDurability: lock.Unreplicated, + }, + redactable: "Get(Shared,Unreplicated)", + redacted: "Get(Shared,Unreplicated)", + }, + { + req: &kvpb.ScanRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("b"), + EndKey: roachpb.Key("d"), + }, + KeyLockingStrength: lock.Shared, + KeyLockingDurability: lock.Unreplicated, + }, + redactable: "Scan(Shared,Unreplicated)", + redacted: "Scan(Shared,Unreplicated)", + }, + { + req: &kvpb.ReverseScanRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("c"), + EndKey: roachpb.Key("d"), + }, + KeyLockingStrength: lock.Shared, + KeyLockingDurability: lock.Unreplicated, + }, + redactable: "ReverseScan(Shared,Unreplicated)", + redacted: "ReverseScan(Shared,Unreplicated)", + }, + { + req: &kvpb.EndTxnRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("ab"), + }, + Commit: true, + }, + redactable: "EndTxn(commit)", + redacted: "EndTxn(commit)", + }, + { + req: &kvpb.RecoverTxnRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("abc"), + }, + Txn: txn.TxnMeta, + ImplicitlyCommitted: false, + }, + redactable: "RecoverTxn(00fbff57, abort)", + redacted: "RecoverTxn(00fbff57, abort)", + }, + { + req: &kvpb.PushTxnRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("123"), + }, + PusherTxn: pusherTxn, + PusheeTxn: pusheeTxn.TxnMeta, + }, + redactable: "PushTxn(‹PUSH_TIMESTAMP›,00fbff58->00fbff59)", + redacted: "PushTxn(‹×›,00fbff58->00fbff59)", + }, + } + for _, c := range testCases { + t.Run(c.req.Method().String(), func(t *testing.T) { + require.EqualValues(t, c.redactable, redact.Sprint(c.req)) + require.EqualValues(t, c.redacted, redact.Sprint(c.req).Redact()) + }) + } +} From 7e830da5395e3bfdaee33643bf2c99b7e7fbd80f Mon Sep 17 00:00:00 2001 From: lyang24 Date: Tue, 5 Dec 2023 23:22:27 +0000 Subject: [PATCH 2/2] kv: implement redact.SafeValue for PushTxnType This commit implements redact.SafeValue for PushTxnType with the goal of displaying the PushTxnType on PushTxnRequest with the redacted format. Fixes: #114475 Release note: None. --- pkg/kv/kvpb/api.go | 3 +++ pkg/kv/kvpb/string_test.go | 10 +++++----- pkg/testutils/lint/passes/redactcheck/redactcheck.go | 1 + 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 1b5cb4c12807..85f91a2be0f5 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -2153,3 +2153,6 @@ type RangeFeedEventProducer interface { // range needs to be restarted. Recv() (*RangeFeedEvent, error) } + +// SafeValue implements the redact.SafeValue interface. +func (PushTxnType) SafeValue() {} diff --git a/pkg/kv/kvpb/string_test.go b/pkg/kv/kvpb/string_test.go index b40df06ebc05..76367eeee3b7 100644 --- a/pkg/kv/kvpb/string_test.go +++ b/pkg/kv/kvpb/string_test.go @@ -101,11 +101,11 @@ func TestAmbiguousResultError(t *testing.T) { // Unit test the requests that implemented SafeFormatterRequest interface. func TestRequestSafeFormat(t *testing.T) { - txn := roachpb.MakeTransaction("txn1", []byte("abc"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 6, 0) + txn := roachpb.MakeTransaction("txn1", []byte("abc"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 6, 0, false) fixedUuid, _ := uuid.FromString("00fbff57-c1ee-48ce-966c-da568d50e425") txn.ID = fixedUuid - pusherTxn := roachpb.MakeTransaction("txn2", []byte("123"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0) - pusheeTxn := roachpb.MakeTransaction("txn3", []byte("1234"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0) + pusherTxn := roachpb.MakeTransaction("txn2", []byte("123"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0, false) + pusheeTxn := roachpb.MakeTransaction("txn3", []byte("1234"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0, false) fixedUuid2, _ := uuid.FromString("00fbff58-c1ee-48ce-966c-da568d50e425") fixedUuid3, _ := uuid.FromString("00fbff59-c1ee-48ce-966c-da568d50e425") pusherTxn.ID = fixedUuid2 @@ -180,8 +180,8 @@ func TestRequestSafeFormat(t *testing.T) { PusherTxn: pusherTxn, PusheeTxn: pusheeTxn.TxnMeta, }, - redactable: "PushTxn(‹PUSH_TIMESTAMP›,00fbff58->00fbff59)", - redacted: "PushTxn(‹×›,00fbff58->00fbff59)", + redactable: "PushTxn(PUSH_TIMESTAMP,00fbff58->00fbff59)", + redacted: "PushTxn(PUSH_TIMESTAMP,00fbff58->00fbff59)", }, } for _, c := range testCases { diff --git a/pkg/testutils/lint/passes/redactcheck/redactcheck.go b/pkg/testutils/lint/passes/redactcheck/redactcheck.go index cf0e8a76c59e..122ff6b362af 100644 --- a/pkg/testutils/lint/passes/redactcheck/redactcheck.go +++ b/pkg/testutils/lint/passes/redactcheck/redactcheck.go @@ -87,6 +87,7 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) { "LeaseAppliedIndex": {}, "RaftIndex": {}, "RaftTerm": {}, + "PushTxnType": {}, }, "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb": { "SeqNum": {},