Skip to content

Commit

Permalink
Merge #85959
Browse files Browse the repository at this point in the history
85959: sql: add contention time to execution_insights table r=j82w a=j82w

This contention time is based on tracing, so
it will only be available for the small percent
of queries that gets traced.

part of [81024](#81024)

Release justification: category 2

Release note (sql change): Adds contention 
 time to execution_insights

Co-authored-by: j82w <[email protected]>
  • Loading branch information
craig[bot] and j82w committed Aug 17, 2022
2 parents f292c7a + f11927e commit 45a49ef
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 27 deletions.
3 changes: 2 additions & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1226,7 +1226,8 @@ func populateQueryLevelStats(ctx context.Context, p *planner) {
var err error
queryLevelStats, err := execstats.GetQueryLevelStats(
trace, p.execCfg.TestingKnobs.DeterministicExplain, flowsMetadata)
ih.queryLevelStatsWithErr = execstats.MakeQueryLevelStatsWithErr(queryLevelStats, err)
queryLevelStatsWithErr := execstats.MakeQueryLevelStatsWithErr(queryLevelStats, err)
ih.queryLevelStatsWithErr = &queryLevelStatsWithErr
if err != nil {
const msg = "error getting query level stats for statement: %s: %+v"
if buildutil.CrdbTestBuild {
Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -6352,7 +6352,8 @@ CREATE TABLE crdb_internal.%s (
priority FLOAT NOT NULL,
retries INT8 NOT NULL,
last_retry_reason STRING,
exec_node_ids INT[] NOT NULL
exec_node_ids INT[] NOT NULL,
contention INTERVAL
)`

var crdbInternalClusterExecutionInsightsTable = virtualSchemaTable{
Expand Down Expand Up @@ -6419,6 +6420,14 @@ func populateExecutionInsights(
autoRetryReason = tree.NewDString(insight.Statement.AutoRetryReason)
}

contentionTime := tree.DNull
if insight.Statement.Contention != nil {
contentionTime = tree.NewDInterval(
duration.MakeDuration(insight.Statement.Contention.Nanoseconds(), 0, 0),
types.DefaultIntervalTypeMetadata,
)
}

err = errors.CombineErrors(err, addRow(
tree.NewDString(hex.EncodeToString(insight.Session.ID.GetBytes())),
tree.NewDUuid(tree.DUuid{UUID: insight.Transaction.ID}),
Expand All @@ -6440,6 +6449,7 @@ func populateExecutionInsights(
tree.NewDInt(tree.DInt(insight.Statement.Retries)),
autoRetryReason,
execNodeIDs,
contentionTime,
))
}
return
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,13 @@ func (p *planner) maybeLogStatementInternal(
requiredTimeElapsed = 0
}
if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) {
contentionNanos := telemetryMetrics.getContentionTime(p.instrumentation.queryLevelStatsWithErr.Stats.ContentionTime.Nanoseconds())
var contentionNanos int64
if queryLevelStats, ok := p.instrumentation.GetQueryLevelStats(); ok {
contentionNanos = queryLevelStats.ContentionTime.Nanoseconds()
}

contentionNanos = telemetryMetrics.getContentionTime(contentionNanos)

skippedQueries := telemetryMetrics.resetSkippedQueryCount()
sampledQuery := eventpb.SampledQuery{
CommonSQLExecDetails: execDetails,
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/executor_statement_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ func (ex *connExecutor) recordStatementSummary(
}

idxRecommendations := idxrecommendations.FormatIdxRecommendations(planner.instrumentation.indexRecommendations)
queryLevelStats, queryLevelStatsOk := planner.instrumentation.GetQueryLevelStats()

recordedStmtStats := sqlstats.RecordedStmtStats{
SessionID: ex.sessionID,
StatementID: planner.stmt.QueryID,
Expand All @@ -196,6 +198,7 @@ func (ex *connExecutor) recordStatementSummary(
EndTime: phaseTimes.GetSessionPhaseTime(sessionphase.PlannerEndExecStmt),
FullScan: fullScan,
SessionData: planner.SessionData(),
ExecStats: queryLevelStats,
}

stmtFingerprintID, err :=
Expand All @@ -210,8 +213,8 @@ func (ex *connExecutor) recordStatementSummary(

// Record statement execution statistics if span is recorded and no error was
// encountered while collecting query-level statistics.
if _, ok := planner.instrumentation.Tracing(); ok && planner.instrumentation.queryLevelStatsWithErr.Err == nil {
err = ex.statsCollector.RecordStatementExecStats(recordedStmtStatsKey, planner.instrumentation.queryLevelStatsWithErr.Stats)
if queryLevelStatsOk {
err = ex.statsCollector.RecordStatementExecStats(recordedStmtStatsKey, *queryLevelStats)
if err != nil {
if log.V(2 /* level */) {
log.Warningf(ctx, "unable to record statement exec stats: %s", err)
Expand Down
50 changes: 32 additions & 18 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type instrumentationHelper struct {
origCtx context.Context
evalCtx *eval.Context

queryLevelStatsWithErr execstats.QueryLevelStatsWithErr
queryLevelStatsWithErr *execstats.QueryLevelStatsWithErr

// If savePlanForStats is true, the explainPlan will be collected and returned
// via PlanForStats().
Expand Down Expand Up @@ -185,6 +185,18 @@ const (
explainAnalyzeDistSQLOutput
)

// GetQueryLevelStats gets the QueryLevelStats if they are available.
// The query level stats are only available if tracing is enabled.
func (ih *instrumentationHelper) GetQueryLevelStats() (stats *execstats.QueryLevelStats, ok bool) {
statsWithErr := ih.queryLevelStatsWithErr

if statsWithErr == nil || statsWithErr.Err != nil {
return nil, false
}

return &statsWithErr.Stats, true
}

// Tracing returns the current value of the instrumentation helper's span,
// along with a boolean that determines whether the span is populated.
func (ih *instrumentationHelper) Tracing() (sp *tracing.Span, ok bool) {
Expand Down Expand Up @@ -313,7 +325,6 @@ func (ih *instrumentationHelper) Finish(
// Record the statement information that we've collected.
// Note that in case of implicit transactions, the trace contains the auto-commit too.
var trace tracingpb.Recording
queryLevelStatsWithErr := ih.queryLevelStatsWithErr

if ih.shouldFinishSpan {
trace = ih.sp.FinishAndGetConfiguredRecording()
Expand All @@ -334,11 +345,12 @@ func (ih *instrumentationHelper) Finish(
)
}

queryLevelStats, ok := ih.GetQueryLevelStats()
// Accumulate txn stats if no error was encountered while collecting
// query-level statistics.
if queryLevelStatsWithErr.Err == nil {
if ok {
if collectExecStats || ih.implicitTxn {
txnStats.Accumulate(queryLevelStatsWithErr.Stats)
txnStats.Accumulate(*queryLevelStats)
}
}

Expand All @@ -355,7 +367,7 @@ func (ih *instrumentationHelper) Finish(
ob := ih.emitExplainAnalyzePlanToOutputBuilder(
explain.Flags{Verbose: true, ShowTypes: true},
phaseTimes,
&queryLevelStatsWithErr.Stats,
queryLevelStats,
)
bundle = buildStatementBundle(
ih.origCtx, cfg.DB, ie.(*InternalExecutor), &p.curPlan, ob.BuildString(), trace, placeholders,
Expand All @@ -381,7 +393,7 @@ func (ih *instrumentationHelper) Finish(
if ih.outputMode == explainAnalyzeDistSQLOutput {
flows = p.curPlan.distSQLFlowInfos
}
return ih.setExplainAnalyzeResult(ctx, res, statsCollector.PhaseTimes(), &queryLevelStatsWithErr.Stats, flows, trace)
return ih.setExplainAnalyzeResult(ctx, res, statsCollector.PhaseTimes(), queryLevelStats, flows, trace)

default:
return nil
Expand Down Expand Up @@ -488,19 +500,21 @@ func (ih *instrumentationHelper) emitExplainAnalyzePlanToOutputBuilder(
ob.AddDistribution(ih.distribution.String())
ob.AddVectorized(ih.vectorized)

if queryStats.KVRowsRead != 0 {
ob.AddKVReadStats(queryStats.KVRowsRead, queryStats.KVBytesRead, queryStats.KVBatchRequestsIssued)
}
if queryStats.KVTime != 0 {
ob.AddKVTime(queryStats.KVTime)
}
if queryStats.ContentionTime != 0 {
ob.AddContentionTime(queryStats.ContentionTime)
}
if queryStats != nil {
if queryStats.KVRowsRead != 0 {
ob.AddKVReadStats(queryStats.KVRowsRead, queryStats.KVBytesRead, queryStats.KVBatchRequestsIssued)
}
if queryStats.KVTime != 0 {
ob.AddKVTime(queryStats.KVTime)
}
if queryStats.ContentionTime != 0 {
ob.AddContentionTime(queryStats.ContentionTime)
}

ob.AddMaxMemUsage(queryStats.MaxMemUsage)
ob.AddNetworkStats(queryStats.NetworkMessages, queryStats.NetworkBytesSent)
ob.AddMaxDiskUsage(queryStats.MaxDiskUsage)
ob.AddMaxMemUsage(queryStats.MaxMemUsage)
ob.AddNetworkStats(queryStats.NetworkMessages, queryStats.NetworkBytesSent)
ob.AddMaxDiskUsage(queryStats.MaxDiskUsage)
}

if len(ih.regions) > 0 {
ob.AddRegionsStats(ih.regions)
Expand Down
12 changes: 8 additions & 4 deletions pkg/sql/logictest/testdata/logic_test/create_statements
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ CREATE TABLE crdb_internal.cluster_execution_insights (
priority FLOAT8 NOT NULL,
retries INT8 NOT NULL,
last_retry_reason STRING NULL,
exec_node_ids INT8[] NOT NULL
exec_node_ids INT8[] NOT NULL,
contention INTERVAL NULL
) CREATE TABLE crdb_internal.cluster_execution_insights (
session_id STRING NOT NULL,
txn_id UUID NOT NULL,
Expand All @@ -284,7 +285,8 @@ CREATE TABLE crdb_internal.cluster_execution_insights (
priority FLOAT8 NOT NULL,
retries INT8 NOT NULL,
last_retry_reason STRING NULL,
exec_node_ids INT8[] NOT NULL
exec_node_ids INT8[] NOT NULL,
contention INTERVAL NULL
) {} {}
CREATE TABLE crdb_internal.cluster_inflight_traces (
trace_id INT8 NOT NULL,
Expand Down Expand Up @@ -973,7 +975,8 @@ CREATE TABLE crdb_internal.node_execution_insights (
priority FLOAT8 NOT NULL,
retries INT8 NOT NULL,
last_retry_reason STRING NULL,
exec_node_ids INT8[] NOT NULL
exec_node_ids INT8[] NOT NULL,
contention INTERVAL NULL
) CREATE TABLE crdb_internal.node_execution_insights (
session_id STRING NOT NULL,
txn_id UUID NOT NULL,
Expand All @@ -994,7 +997,8 @@ CREATE TABLE crdb_internal.node_execution_insights (
priority FLOAT8 NOT NULL,
retries INT8 NOT NULL,
last_retry_reason STRING NULL,
exec_node_ids INT8[] NOT NULL
exec_node_ids INT8[] NOT NULL,
contention INTERVAL NULL
) {} {}
CREATE TABLE crdb_internal.node_inflight_trace_spans (
trace_id INT8 NOT NULL,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/insights/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ proto_library(
visibility = ["//visibility:public"],
deps = [
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
"@com_google_protobuf//:duration_proto",
"@com_google_protobuf//:timestamp_proto",
],
)
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sqlstats/insights/insights.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ option go_package = "insights";

import "gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";

enum Concern {
// SlowExecution is for statement executions that either take longer than a predetermined
Expand Down Expand Up @@ -63,6 +64,7 @@ message Statement {
string auto_retry_reason = 16;
// Nodes is the ordered list of nodes ids on which the statement was executed.
repeated int64 nodes = 17;
google.protobuf.Duration contention = 18 [(gogoproto.stdduration) = true];
}

message Insight {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/insights/integration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//require",
],
)
Expand Down
99 changes: 99 additions & 0 deletions pkg/sql/sqlstats/insights/integration/insights_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ package integration

import (
"context"
gosql "database/sql"
"fmt"
"os"
"sync"
"testing"
"time"

Expand All @@ -28,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -100,3 +103,99 @@ func TestInsightsIntegration(t *testing.T) {
return nil
}, 1*time.Second)
}

func TestInsightsIntegrationForContention(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Start the cluster. (One node is sufficient; the outliers system is currently in-memory only.)
ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()
args := base.TestClusterArgs{ServerArgs: base.TestServerArgs{Settings: settings}}
tc := testcluster.StartTestCluster(t, 1, args)
defer tc.Stopper().Stop(ctx)
conn := tc.ServerConn(0)

_, err := conn.Exec("SET tracing = true;")
require.NoError(t, err)
_, err = conn.Exec("SET cluster setting sql.txn_stats.sample_rate = 1;")
require.NoError(t, err)
_, err = conn.Exec("CREATE TABLE t (id string, s string);")
require.NoError(t, err)

// Enable detection by setting a latencyThreshold > 0.
latencyThreshold := 100 * time.Millisecond
insights.LatencyThreshold.Override(ctx, &settings.SV, latencyThreshold)

// Create a new connection, and then in a go routine have it start a transaction, update a row,
// sleep for a time, and then complete the transaction.
// With original connection attempt to update the same row being updated concurrently
// in the separate go routine, this will be blocked until the original transaction completes.
var wgTxnStarted sync.WaitGroup
wgTxnStarted.Add(1)

// Lock to wait for the txn to complete to avoid the test finishing before the txn is committed
var wgTxnDone sync.WaitGroup
wgTxnDone.Add(1)

go func() {
tx, errTxn := conn.BeginTx(ctx, &gosql.TxOptions{})
require.NoError(t, errTxn)
_, errTxn = tx.ExecContext(ctx, "INSERT INTO t (id, s) VALUES ('test', 'originalValue');")
require.NoError(t, errTxn)
wgTxnStarted.Done()
_, errTxn = tx.ExecContext(ctx, "select pg_sleep(.5);")
require.NoError(t, errTxn)
errTxn = tx.Commit()
require.NoError(t, errTxn)
wgTxnDone.Done()
}()

start := timeutil.Now()

// Need to wait for the txn to start to ensure lock contention
wgTxnStarted.Wait()
// This will be blocked until the updateRowWithDelay finishes.
_, err = conn.ExecContext(ctx, "UPDATE t SET s = 'mainThread' where id = 'test';")
require.NoError(t, err)
end := timeutil.Now()
require.GreaterOrEqual(t, end.Sub(start), 500*time.Millisecond)

wgTxnDone.Wait()

// Verify the table content is valid.
testutils.SucceedsWithin(t, func() error {
rows, err := conn.QueryContext(ctx, "SELECT "+
"query, "+
"contention::FLOAT "+
"FROM crdb_internal.node_execution_insights where query like 'UPDATE t SET s =%'")
if err != nil {
return err
}

rowCount := 0
for rows.Next() {
rowCount++
if err != nil {
return err
}

var contentionFromQuery float64
var queryText string
err = rows.Scan(&queryText, &contentionFromQuery)
if err != nil {
return err
}

if contentionFromQuery < .2 {
return fmt.Errorf("contention time is %f should be greater than .2 since block is delayed by .5 seconds", contentionFromQuery)
}
}

if rowCount < 1 {
return fmt.Errorf("node_execution_insights did not return any rows")
}

return nil
}, 1*time.Second)
}
Loading

0 comments on commit 45a49ef

Please sign in to comment.