Skip to content

Commit

Permalink
Merge pull request #116355 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.2-114481

release-23.2: kv: include locking strength and durability in Get/Scan/RevScan SafeFormat
  • Loading branch information
michae2 authored Jan 19, 2024
2 parents ea5823f + 7e830da commit d8e4adc
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ ok
exec-privileged-op-tenant
ALTER TABLE t SPLIT AT VALUES (0)
----
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)

# Check the index as well.
exec-privileged-op-tenant
ALTER INDEX t@idx SPLIT AT VALUES (1)
----
pq: ba: AdminSplit [/Tenant/10/Table/104/2/1,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)
pq: ba: AdminSplit [/Tenant/10/Table/104/2/1] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)

# Grant the capability without providing an explicit value.
update-capabilities
Expand All @@ -64,7 +64,7 @@ ok
exec-privileged-op-tenant
ALTER TABLE t SPLIT AT VALUES (0)
----
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0,/Min) RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)
pq: ba: AdminSplit [/Tenant/10/Table/104/1/0] RPC error: rpc error: code = Unauthenticated desc = client tenant does not have capability "can_admin_split" (*kvpb.AdminSplitRequest)

# Lastly, use the explicitly set to true syntax.
update-capabilities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ ok
exec-privileged-op-tenant
ALTER TABLE t UNSPLIT AT VALUES (0)
----
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0,/Min) RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0] RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]

update-capabilities
ALTER TENANT [10] GRANT CAPABILITY can_admin_unsplit=true
Expand All @@ -31,4 +31,4 @@ ok
exec-privileged-op-tenant
ALTER TABLE t UNSPLIT AT VALUES (0)
----
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0,/Min) RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]
pq: could not UNSPLIT AT (0): ba: AdminUnsplit [/Tenant/10/Table/104/1/0] RPC error: grpc: client tenant does not have capability "can_admin_unsplit" (*kvpb.AdminUnsplitRequest) [code 16/Unauthenticated]
5 changes: 4 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -4531,12 +4532,14 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
ba := &kvpb.BatchRequest{}
get := &kvpb.GetRequest{}
get.Key = roachpb.Key("a")
get.KeyLockingStrength = lock.Shared
get.KeyLockingDurability = lock.Unreplicated
ba.Add(get)
br := &kvpb.BatchResponse{}
br.Error = kvpb.NewError(errors.New("boom"))
desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x"), EndKey: roachpb.RKey("z")}
{
exp := `have been waiting 8.16s (120 attempts) for RPC Get [‹"a"›,/Min) to` +
exp := `have been waiting 8.16s (120 attempts) for RPC Get(Shared,Unreplicated) [‹"a"›] to` +
` r9:‹{x-z}› [<no replicas>, next=0, gen=0]; resp: ‹(err: boom)›`
var s redact.StringBuilder
slowRangeRPCWarningStr(&s, ba, dur, attempts, desc, nil /* err */, br)
Expand Down
83 changes: 83 additions & 0 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,86 @@ type Request interface {
flags() flag
}

// SafeFormatterRequest is an optional extension interface used to allow request to do custom formatting.
type SafeFormatterRequest interface {
Request
redact.SafeFormatter
}

var _ SafeFormatterRequest = (*GetRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (gr *GetRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Print(gr.Method())
if gr.KeyLockingStrength == lock.None {
return
}
s.Printf("(%s,%s)", gr.KeyLockingStrength, gr.KeyLockingDurability)
}

var _ SafeFormatterRequest = (*ScanRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (sr *ScanRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Print(sr.Method())
if sr.KeyLockingStrength == lock.None {
return
}
s.Printf("(%s,%s)", sr.KeyLockingStrength, sr.KeyLockingDurability)
}

var _ SafeFormatterRequest = (*ReverseScanRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (rsr *ReverseScanRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Print(rsr.Method())
if rsr.KeyLockingStrength == lock.None {
return
}
s.Printf("(%s,%s)", rsr.KeyLockingStrength, rsr.KeyLockingDurability)
}

var _ SafeFormatterRequest = (*EndTxnRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (etr *EndTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Printf("%s(", etr.Method())
if etr.Commit {
if etr.IsParallelCommit() {
s.Printf("parallel commit")
} else {
s.Printf("commit")
}
} else {
s.Printf("abort")
}
if etr.InternalCommitTrigger != nil {
s.Printf(" %s", etr.InternalCommitTrigger.Kind())
}
s.Printf(")")
}

var _ SafeFormatterRequest = (*RecoverTxnRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (rtr *RecoverTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Printf("%s(%s, ", rtr.Method(), rtr.Txn.Short())
if rtr.ImplicitlyCommitted {
s.Printf("commit")
} else {
s.Printf("abort")
}
s.Printf(")")
}

var _ SafeFormatterRequest = (*PushTxnRequest)(nil)

// SafeFormat implements the redact.SafeFormatter interface.
func (ptr *PushTxnRequest) SafeFormat(s redact.SafePrinter, _ rune) {
s.Printf("PushTxn(%s,%s->%s)",
ptr.PushType, ptr.PusherTxn.Short(), ptr.PusheeTxn.Short())
}

// LockingReadRequest is an interface used to expose the key-level locking
// strength of a read-only request.
type LockingReadRequest interface {
Expand Down Expand Up @@ -2073,3 +2153,6 @@ type RangeFeedEventProducer interface {
// range needs to be restarted.
Recv() (*RangeFeedEvent, error)
}

// SafeValue implements the redact.SafeValue interface.
func (PushTxnType) SafeValue() {}
43 changes: 9 additions & 34 deletions pkg/kv/kvpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ func (ba BatchRequest) Split(canSplitET bool) [][]RequestUnion {

// SafeFormat implements redact.SafeFormatter.
// It gives a brief summary of the contained requests and keys in the batch.
func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) {
func (ba BatchRequest) SafeFormat(s redact.SafePrinter, verb rune) {
for count, arg := range ba.Requests {
// Limit the strings to provide just a summary. Without this limit
// a log message with a BatchRequest can be very long.
Expand All @@ -810,41 +810,16 @@ func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) {
}

req := arg.GetInner()
if et, ok := req.(*EndTxnRequest); ok {
h := req.Header()
s.Printf("%s(", req.Method())
if et.Commit {
if et.IsParallelCommit() {
s.Printf("parallel commit")
} else {
s.Printf("commit")
}
} else {
s.Printf("abort")
}
if et.InternalCommitTrigger != nil {
s.Printf(" %s", et.InternalCommitTrigger.Kind())
}
s.Printf(") [%s]", h.Key)
} else if rt, ok := req.(*RecoverTxnRequest); ok {
h := req.Header()
s.Printf("%s(%s, ", req.Method(), rt.Txn.Short())
if rt.ImplicitlyCommitted {
s.Printf("commit")
} else {
s.Printf("abort")
}
s.Printf(") [%s]", h.Key)
if safeFormatterReq, ok := req.(SafeFormatterRequest); ok {
safeFormatterReq.SafeFormat(s, verb)
} else {
h := req.Header()
if req.Method() == PushTxn {
pushReq := req.(*PushTxnRequest)
s.Printf("PushTxn(%s,%s->%s)",
pushReq.PushType, pushReq.PusherTxn.Short(), pushReq.PusheeTxn.Short())
} else {
s.Print(req.Method())
}
s.Print(req.Method())
}
h := req.Header()
if len(h.EndKey) > 0 {
s.Printf(" [%s,%s)", h.Key, h.EndKey)
} else {
s.Printf(" [%s]", h.Key)
}
}
{
Expand Down
95 changes: 94 additions & 1 deletion pkg/kv/kvpb/string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestBatchRequestString(t *testing.T) {
ba.Requests = append(ba.Requests, ru)

{
exp := `Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min),... 76 skipped ..., Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), Get [/Min,/Min), EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]`
exp := `Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min], Get [/Min],... 76 skipped ..., Get [/Min], Get [/Min], Get [/Min], Get [/Min], EndTxn(abort) [/Min], [txn: 6ba7b810], [wait-policy: Error], [protect-ambiguous-replay], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]`
act := ba.String()
require.Equal(t, exp, act)
}
Expand Down Expand Up @@ -98,3 +98,96 @@ func TestAmbiguousResultError(t *testing.T) {
s := fmt.Sprintf("%s\n%s", err, redact.Sprint(err))
echotest.Require(t, s, filepath.Join("testdata", "ambiguous_result_error.txt"))
}

// Unit test the requests that implemented SafeFormatterRequest interface.
func TestRequestSafeFormat(t *testing.T) {
txn := roachpb.MakeTransaction("txn1", []byte("abc"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 6, 0, false)
fixedUuid, _ := uuid.FromString("00fbff57-c1ee-48ce-966c-da568d50e425")
txn.ID = fixedUuid
pusherTxn := roachpb.MakeTransaction("txn2", []byte("123"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0, false)
pusheeTxn := roachpb.MakeTransaction("txn3", []byte("1234"), 0, 0, hlc.Timestamp{WallTime: 10}, 0, 1, 0, false)
fixedUuid2, _ := uuid.FromString("00fbff58-c1ee-48ce-966c-da568d50e425")
fixedUuid3, _ := uuid.FromString("00fbff59-c1ee-48ce-966c-da568d50e425")
pusherTxn.ID = fixedUuid2
pusheeTxn.ID = fixedUuid3

testCases := []struct {
req kvpb.Request
redactable string
redacted string
}{
{
req: &kvpb.GetRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("a"),
},
KeyLockingStrength: lock.Shared,
KeyLockingDurability: lock.Unreplicated,
},
redactable: "Get(Shared,Unreplicated)",
redacted: "Get(Shared,Unreplicated)",
},
{
req: &kvpb.ScanRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("b"),
EndKey: roachpb.Key("d"),
},
KeyLockingStrength: lock.Shared,
KeyLockingDurability: lock.Unreplicated,
},
redactable: "Scan(Shared,Unreplicated)",
redacted: "Scan(Shared,Unreplicated)",
},
{
req: &kvpb.ReverseScanRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("c"),
EndKey: roachpb.Key("d"),
},
KeyLockingStrength: lock.Shared,
KeyLockingDurability: lock.Unreplicated,
},
redactable: "ReverseScan(Shared,Unreplicated)",
redacted: "ReverseScan(Shared,Unreplicated)",
},
{
req: &kvpb.EndTxnRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("ab"),
},
Commit: true,
},
redactable: "EndTxn(commit)",
redacted: "EndTxn(commit)",
},
{
req: &kvpb.RecoverTxnRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("abc"),
},
Txn: txn.TxnMeta,
ImplicitlyCommitted: false,
},
redactable: "RecoverTxn(00fbff57, abort)",
redacted: "RecoverTxn(00fbff57, abort)",
},
{
req: &kvpb.PushTxnRequest{
RequestHeader: kvpb.RequestHeader{
Key: roachpb.Key("123"),
},
PusherTxn: pusherTxn,
PusheeTxn: pusheeTxn.TxnMeta,
},
redactable: "PushTxn(PUSH_TIMESTAMP,00fbff58->00fbff59)",
redacted: "PushTxn(PUSH_TIMESTAMP,00fbff58->00fbff59)",
},
}
for _, c := range testCases {
t.Run(c.req.Method().String(), func(t *testing.T) {
require.EqualValues(t, c.redactable, redact.Sprint(c.req))
require.EqualValues(t, c.redacted, redact.Sprint(c.req).Redact())
})
}
}
1 change: 1 addition & 0 deletions pkg/testutils/lint/passes/redactcheck/redactcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func runAnalyzer(pass *analysis.Pass) (interface{}, error) {
"LeaseAppliedIndex": {},
"RaftIndex": {},
"RaftTerm": {},
"PushTxnType": {},
},
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb": {
"SeqNum": {},
Expand Down

0 comments on commit d8e4adc

Please sign in to comment.