Skip to content

Commit

Permalink
colfetcher: track KV CPU time in the direct columnar scan
Browse files Browse the repository at this point in the history
This commit addresses a minor TODO to track the KV CPU time when direct
columnar scans are used. In the regular columnar scan this time is
tracked by the cFetcher, but with the KV projection pushdown the
cFetcher is used on the KV server side, so we need to augment the
ColBatchDirectScan to track it. Notably, this means that the decoding
done on the KV server side is included. Additionally, this commit
clarifies how the KV CPU time is obtained from the cFetcher (we don't
need to use a helper (unlike in the case of `bytesRead` and
`batchRequestsIssued` fields which are written to on `cFetcher.Close`),
and we don't need the mutex protection there either).

Release note: None
  • Loading branch information
yuzefovich committed Mar 15, 2023
1 parent 25c00d4 commit e956157
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 18 deletions.
10 changes: 1 addition & 9 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
Expand Down Expand Up @@ -247,8 +246,7 @@ type cFetcher struct {
bytesRead int64
batchRequestsIssued int64
// cpuStopWatch tracks the CPU time spent by this cFetcher while fulfilling KV
// requests *in the current goroutine*. It should only be accessed through
// getKVCPUTime().
// requests *in the current goroutine*.
cpuStopWatch *timeutil.CPUStopWatch

// machine contains fields that get updated during the run of the fetcher.
Expand Down Expand Up @@ -1361,12 +1359,6 @@ func (cf *cFetcher) getBytesRead() int64 {
return cf.bytesRead
}

// getKVCPUTime returns the amount of CPU time spent in the current goroutine
// while fulfilling KV requests.
func (cf *cFetcher) getKVCPUTime() time.Duration {
return cf.cpuStopWatch.Elapsed()
}

// getBatchRequestsIssued returns the number of BatchRequests issued by the
// cFetcher throughout its lifetime so far.
func (cf *cFetcher) getBatchRequestsIssued() int64 {
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/colfetcher/cfetcher_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ func newCFetcherWrapper(
// MaxSpanRequestKeys limits of the BatchRequest), so we just have a
// reasonable default here.
const memoryLimit = execinfra.DefaultMemoryLimit
// Since we're using the cFetcher on the KV server side, we don't collect
// any statistics on it (these stats are about the SQL layer).
const collectStats = false
// We cannot reuse batches if we're not serializing the response.
alwaysReallocate := !mustSerialize
// TODO(yuzefovich, 23.1): think through estimatedRowCount (#94850) and
Expand All @@ -240,7 +243,7 @@ func newCFetcherWrapper(
0, /* estimatedRowCount */
false, /* traceKV */
true, /* singleUse */
false, /* collectStats */
collectStats,
alwaysReallocate,
}

Expand Down
19 changes: 17 additions & 2 deletions pkg/sql/colfetcher/colbatch_direct_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

Expand All @@ -40,6 +42,10 @@ type ColBatchDirectScan struct {
resultTypes []*types.T
hasDatumVec bool

// cpuStopWatch tracks the CPU time spent by this ColBatchDirectScan while
// fulfilling KV requests *in the current goroutine*.
cpuStopWatch *timeutil.CPUStopWatch

deserializer colexecutils.Deserializer
deserializerInitialized bool
}
Expand Down Expand Up @@ -70,7 +76,9 @@ func (s *ColBatchDirectScan) Next() (ret coldata.Batch) {
var res row.KVBatchFetcherResponse
var err error
for {
s.cpuStopWatch.Start()
res, err = s.fetcher.NextBatch(s.Ctx)
s.cpuStopWatch.Stop()
if err != nil {
colexecerror.InternalError(convertFetchError(s.spec, err))
}
Expand Down Expand Up @@ -140,9 +148,11 @@ func (s *ColBatchDirectScan) GetBatchRequestsIssued() int64 {
}

// GetKVCPUTime is part of the colexecop.KVReader interface.
//
// Note that this KV CPU time, unlike for the ColBatchScan, includes the
// decoding time done by the cFetcherWrapper.
func (s *ColBatchDirectScan) GetKVCPUTime() time.Duration {
// TODO(yuzefovich, 23.1): implement this.
return 0
return s.cpuStopWatch.Elapsed()
}

// Release implements the execreleasable.Releasable interface.
Expand Down Expand Up @@ -208,12 +218,17 @@ func NewColBatchDirectScan(
break
}
}
var cpuStopWatch *timeutil.CPUStopWatch
if execstats.ShouldCollectStats(ctx, flowCtx.CollectStats) {
cpuStopWatch = timeutil.NewCPUStopWatch()
}
return &ColBatchDirectScan{
colBatchScanBase: base,
fetcher: fetcher,
allocator: allocator,
spec: &fetchSpec,
resultTypes: tableArgs.typs,
hasDatumVec: hasDatumVec,
cpuStopWatch: cpuStopWatch,
}, tableArgs.typs, nil
}
4 changes: 1 addition & 3 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,7 @@ func (s *ColBatchScan) GetBatchRequestsIssued() int64 {

// GetKVCPUTime is part of the colexecop.KVReader interface.
func (s *ColBatchScan) GetKVCPUTime() time.Duration {
s.mu.Lock()
defer s.mu.Unlock()
return s.cf.getKVCPUTime()
return s.cf.cpuStopWatch.Elapsed()
}

// Release implements the execreleasable.Releasable interface.
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,7 @@ func (s *ColIndexJoin) GetBatchRequestsIssued() int64 {

// GetKVCPUTime is part of the colexecop.KVReader interface.
func (s *ColIndexJoin) GetKVCPUTime() time.Duration {
s.mu.Lock()
defer s.mu.Unlock()
return s.cf.getKVCPUTime()
return s.cf.cpuStopWatch.Elapsed()
}

// GetContentionInfo is part of the colexecop.KVReader interface.
Expand Down

0 comments on commit e956157

Please sign in to comment.