Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: include locking strength and durability in Get/Scan/RevScan SafeFormat #114481

Merged
merged 2 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ ok
exec-privileged-op-tenant
ALTER TABLE t SPLIT AT VALUES (0)
----
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)

# Check the index as well.
exec-privileged-op-tenant
ALTER INDEX t@idx SPLIT AT VALUES (1)
----
pq: ba: AdminSplit [/Tenant/10/Table/104/2/1,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)
pq: ba: AdminSplit [/Tenant/10/Table/104/2/1] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)

# Grant the capability without providing an explicit value.
update-capabilities
Expand All @@ -64,7 +64,7 @@ ok
exec-privileged-op-tenant
ALTER TABLE t SPLIT AT VALUES (0)
----
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)

# Lastly, use the explicitly set to true syntax.
update-capabilities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ ok
exec-privileged-op-tenant
ALTER TABLE t UNSPLIT AT VALUES (0)
----
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0,/Min) RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0] RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]

update-capabilities
ALTER TENANT [10] GRANT CAPABILITY can_admin_unsplit=true
Expand All @@ -31,4 +31,4 @@ ok
exec-privileged-op-tenant
ALTER TABLE t UNSPLIT AT VALUES (0)
----
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0,/Min) RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0] RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]
5 changes: 4 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -4527,12 +4528,14 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
ba := &kvpb.BatchRequest{}
get := &kvpb.GetRequest{}
get.Key = roachpb.Key("a")
get.KeyLockingStrength = lock.Shared
get.KeyLockingDurability = lock.Unreplicated
ba.Add(get)
br := &kvpb.BatchResponse{}
br.Error = kvpb.NewError(errors.New("boom"))
desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x"), EndKey: roachpb.RKey("z")}
{
exp := `have been waiting 8.16s (120 attempts) for RPC Get [‹"a"›,/Min) to` +
exp := `have been waiting 8.16s (120 attempts) for RPC Get(Shared,Unreplicated) [‹"a"›] to` +
` r9:‹{x-z}› [<no replicas>, next=0, gen=0]; resp: ‹(err: boom)›`
var s redact.StringBuilder
slowRangeRPCWarningStr(&s, ba, dur, attempts, desc, nil /* err */, br)
Expand Down
83 changes: 83 additions & 0 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,86 @@ type Request interface {
flags() flag
}

// SafeFormatterRequest is an optional extension interface used to allow request to do custom formatting.
type SafeFormatterRequest interface {
Request
redact.SafeFormatter
}

var _ SafeFormatterRequest = (*GetRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (gr *GetRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Print(gr.Method())
if gr.KeyLockingStrength == lock.None {
return
}
s.Printf("(%s,%s)", gr.KeyLockingStrength, gr.KeyLockingDurability)
}

var _ SafeFormatterRequest = (*ScanRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (sr *ScanRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Print(sr.Method())
if sr.KeyLockingStrength == lock.None {
return
}
s.Printf("(%s,%s)", sr.KeyLockingStrength, sr.KeyLockingDurability)
}

var _ SafeFormatterRequest = (*ReverseScanRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (rsr *ReverseScanRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Print(rsr.Method())
if rsr.KeyLockingStrength == lock.None {
return
}
s.Printf("(%s,%s)", rsr.KeyLockingStrength, rsr.KeyLockingDurability)
}

var _ SafeFormatterRequest = (*EndTxnRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (etr *EndTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Printf("%s(", etr.Method())
if etr.Commit {
if etr.IsParallelCommit() {
s.Printf("parallel commit")
} else {
s.Printf("commit")
}
} else {
s.Printf("abort")
}
if etr.InternalCommitTrigger != nil {
s.Printf(" %s", etr.InternalCommitTrigger.Kind())
}
s.Printf(")")
}

var _ SafeFormatterRequest = (*RecoverTxnRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (rtr *RecoverTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Printf("%s(%s, ", rtr.Method(), rtr.Txn.Short())
if rtr.ImplicitlyCommitted {
s.Printf("commit")
} else {
s.Printf("abort")
}
s.Printf(")")
}

var _ SafeFormatterRequest = (*PushTxnRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (ptr *PushTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Printf("PushTxn(%s,%s->%s)",
ptr.PushType, ptr.PusherTxn.Short(), ptr.PusheeTxn.Short())
}

// LockingReadRequest is an interface used to expose the key-level locking
// strength of a read-only request.
type LockingReadRequest interface {
Expand Down Expand Up @@ -2072,3 +2152,6 @@ type RangeFeedEventProducer interface {
// range needs to be restarted.
Recv() (*RangeFeedEvent, error)
}

// SafeValue implements the redact.SafeValue interface.
func (PushTxnType) SafeValue() {}
43 changes: 9 additions & 34 deletions pkg/kv/kvpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ func (ba BatchRequest) Split(canSplitET bool) [][]RequestUnion {

// SafeFormat implements redact.SafeFormatter.
// It gives a brief summary of the contained requests and keys in the batch.
func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) {
func (ba BatchRequest) SafeFormat(s redact.SafePrinter, verb rune) {
for count, arg := range ba.Requests {
// Limit the strings to provide just a summary. Without this limit
// a log message with a BatchRequest can be very long.
Expand All @@ -810,41 +810,16 @@ func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) {
}

req := arg.GetInner()
if et, ok := req.(*EndTxnRequest); ok {
h := req.Header()
s.Printf("%s(", req.Method())
if et.Commit {
if et.IsParallelCommit() {
s.Printf("parallel commit")
} else {
s.Printf("commit")
}
} else {
s.Printf("abort")
}
if et.InternalCommitTrigger != nil {
s.Printf(" %s", et.InternalCommitTrigger.Kind())
}
s.Printf(") [%s]", h.Key)
} else if rt, ok := req.(*RecoverTxnRequest); ok {
h := req.Header()
s.Printf("%s(%s, ", req.Method(), rt.Txn.Short())
if rt.ImplicitlyCommitted {
s.Printf("commit")
} else {
s.Printf("abort")
}
s.Printf(") [%s]", h.Key)
if safeFormatterReq, ok := req.(SafeFormatterRequest); ok {
safeFormatterReq.SafeFormat(s, verb)
} else {
h := req.Header()
if req.Method() == PushTxn {
pushReq := req.(*PushTxnRequest)
s.Printf("PushTxn(%s,%s->%s)",
pushReq.PushType, pushReq.PusherTxn.Short(), pushReq.PusheeTxn.Short())
} else {
s.Print(req.Method())
}
s.Print(req.Method())
}
h := req.Header()
if len(h.EndKey) > 0 {
s.Printf(" [%s,%s)", h.Key, h.EndKey)
} else {
s.Printf(" [%s]", h.Key)
}
}
{
Expand Down
95 changes: 94 additions & 1 deletion pkg/kv/kvpb/string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestBatchRequestString(t *testing.T) {
ba.Requests = append(ba.Requests, ru)

{
exp := `Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min),... 76 skipped ..., Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]`
exp := `Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min],... 76 skipped ..., Get [/Min], Get [/Min], Get [/Min], Get [/Min], EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]`
act := ba.String()
require.Equal(t, exp, act)
}
Expand Down Expand Up @@ -98,3 +98,96 @@ func TestAmbiguousResultError(t *testing.T) {
s := fmt.Sprintf("%s\n%s", err, redact.Sprint(err))
echotest.Require(t, s, filepath.Join("testdata", "ambiguous_result_error.txt"))
}

// Unit test the requests that implemented SafeFormatterRequest interface.
func TestRequestSafeFormat(t *testing.T) {
txn := roachpb.MakeTransaction("txn1", []byte("abc"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 6, 0, false)
fixedUuid, _ := uuid.FromString("00fbff57-c1ee-48ce-966c-da568d50e425")
txn.ID = fixedUuid
pusherTxn := roachpb.MakeTransaction("txn2", []byte("123"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0, false)
pusheeTxn := roachpb.MakeTransaction("txn3", []byte("1234"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0, false)
fixedUuid2, _ := uuid.FromString("00fbff58-c1ee-48ce-966c-da568d50e425")
fixedUuid3, _ := uuid.FromString("00fbff59-c1ee-48ce-966c-da568d50e425")
pusherTxn.ID = fixedUuid2
pusheeTxn.ID = fixedUuid3

testCases := []struct {
req kvpb.Request
redactable string
redacted string
}{
{
req: &kvpb.GetRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("a"),
},
KeyLockingStrength: lock.Shared,
KeyLockingDurability: lock.Unreplicated,
},
redactable: "Get(Shared,Unreplicated)",
redacted: "Get(Shared,Unreplicated)",
},
{
req: &kvpb.ScanRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("b"),
EndKey: roachpb.Key("d"),
},
KeyLockingStrength: lock.Shared,
KeyLockingDurability: lock.Unreplicated,
},
redactable: "Scan(Shared,Unreplicated)",
redacted: "Scan(Shared,Unreplicated)",
},
{
req: &kvpb.ReverseScanRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("c"),
EndKey: roachpb.Key("d"),
},
KeyLockingStrength: lock.Shared,
KeyLockingDurability: lock.Unreplicated,
},
redactable: "ReverseScan(Shared,Unreplicated)",
redacted: "ReverseScan(Shared,Unreplicated)",
},
{
req: &kvpb.EndTxnRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("ab"),
},
Commit: true,
},
redactable: "EndTxn(commit)",
redacted: "EndTxn(commit)",
},
{
req: &kvpb.RecoverTxnRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("abc"),
},
Txn: txn.TxnMeta,
ImplicitlyCommitted: false,
},
redactable: "RecoverTxn(00fbff57, abort)",
redacted: "RecoverTxn(00fbff57, abort)",
},
{
req: &kvpb.PushTxnRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("123"),
},
PusherTxn: pusherTxn,
PusheeTxn: pusheeTxn.TxnMeta,
},
redactable: "PushTxn(PUSH_TIMESTAMP,00fbff58->00fbff59)",
redacted: "PushTxn(PUSH_TIMESTAMP,00fbff58->00fbff59)",
},
}
for _, c := range testCases {
t.Run(c.req.Method().String(), func(t *testing.T) {
require.EqualValues(t, c.redactable, redact.Sprint(c.req))
require.EqualValues(t, c.redacted, redact.Sprint(c.req).Redact())
})
}
}
1 change: 1 addition & 0 deletions pkg/testutils/lint/passes/redactcheck/redactcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) {
"LeaseAppliedIndex": {},
"RaftIndex": {},
"RaftTerm": {},
"PushTxnType": {},
},
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb": {
"SeqNum": {},
Expand Down