sql,tracing: introduce crdb_internal.cluster_inflight_traces
This change adds a new indexed, virtual table
`crdb_internal.cluster_inflight_traces`. This table surfaces
cluster-wide inflight traces for the trace_id specified via
an index constraint.
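
For illustration, a query of this shape is the intended access
pattern (the trace ID value here is hypothetical); without the
index constraint on trace_id, the table returns no rows:

SELECT trace_id, node_id, trace_str
FROM crdb_internal.cluster_inflight_traces
WHERE trace_id = 42;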

Each row in the virtual table corresponds to a
`tracing.Recording` on a particular node for the given
trace ID. A `tracing.Recording` is the trace of a single
operation rooted at a root span on that node. Under the hood,
the virtual table contacts all "live" nodes in the cluster
via the trace collector, which streams back one recording at
a time.

The table has three additional columns that surface the JSON,
string, and Jaeger JSON representations of the recording. These
are the same formats we dump in a stmt bundle, and they have
proven to be the best ways to consume traces. This table is not
meant to be consumed directly via the SQL shell; instead, a CLI
wrapper will be built on top of it to assimilate the traces and
write them to files, similar to how we dump a stmt bundle.
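
As a rough sketch of what such a CLI wrapper might do (the
connection string, trace ID, and file names below are hypothetical
and not part of this change):

package main

import (
	"database/sql"
	"fmt"
	"os"

	_ "github.com/lib/pq" // assumption: any Postgres-wire-compatible driver
)

func main() {
	// Assumption: a cluster listening on the default local port.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	const traceID = 42 // hypothetical trace ID
	rows, err := db.Query(`SELECT node_id, trace_json
		FROM crdb_internal.cluster_inflight_traces
		WHERE trace_id = $1`, traceID)
	if err != nil {
		panic(err)
	}
	defer rows.Close()

	// One file per recording, i.e. per root span per node, much like
	// the per-file layout of a stmt bundle. NB: the columns are
	// nullable in the schema; a real tool would scan sql.NullString.
	for i := 0; rows.Next(); i++ {
		var nodeID int
		var traceJSON string
		if err := rows.Scan(&nodeID, &traceJSON); err != nil {
			panic(err)
		}
		name := fmt.Sprintf("trace-%d-node%d-%d.json", traceID, nodeID, i)
		if err := os.WriteFile(name, []byte(traceJSON), 0644); err != nil {
			panic(err)
		}
	}
	if err := rows.Err(); err != nil {
		panic(err)
	}
}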

This change also tweaks some of the recording->string methods
to include StructuredRecords.

Release note (sql change): adds a virtual table
`crdb_internal.cluster_inflight_traces` which surfaces
cluster-wide inflight traces for the trace_id specified via
an index constraint. The output of this table is not appropriate
to consume over a SQL connection; follow-up changes will add
CLI wrappers to make the interaction more user-friendly.
adityamaru committed Jun 24, 2021
1 parent 3b8b689 commit 2a9a5ee
Showing 24 changed files with 513 additions and 101 deletions.
1 change: 1 addition & 0 deletions pkg/cli/zip_test.go
@@ -69,6 +69,7 @@ table_name NOT IN (
'cluster_contended_keys',
'cluster_contended_indexes',
'cluster_contended_tables',
'cluster_inflight_traces',
'create_statements',
'create_type_statements',
'cross_db_references',
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
@@ -173,6 +173,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/collector",
"//pkg/util/tracing/service",
"//pkg/util/tracing/tracingpb",
"//pkg/util/tracing/tracingservicepb:tracingservicepb_go_proto",
15 changes: 13 additions & 2 deletions pkg/server/server_sql.go
@@ -83,6 +83,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/collector"
"github.com/cockroachdb/cockroach/pkg/util/tracing/service"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingservicepb"
"github.com/cockroachdb/errors"
@@ -491,8 +492,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}

var isLive func(roachpb.NodeID) (bool, error)
-	nodeLiveness, ok := cfg.nodeLiveness.Optional(47900)
-	if ok {
+	nodeLiveness, hasNodeLiveness := cfg.nodeLiveness.Optional(47900)
+	if hasNodeLiveness {
isLive = nodeLiveness.IsLive
} else {
// We're on a SQL tenant, so this is the only node DistSQL will ever
@@ -502,6 +503,15 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}
}

// Set up the trace collector that is used to fetch inflight trace spans
// from all nodes in the cluster.
// The collector requires node liveness to get a list of all the nodes in
// the cluster.
var traceCollector *collector.TraceCollector
if hasNodeLiveness {
traceCollector = collector.New(cfg.nodeDialer, nodeLiveness, cfg.Settings.Tracer)
}

*execCfg = sql.ExecutorConfig{
Settings: cfg.Settings,
NodeInfo: nodeInfo,
@@ -532,6 +542,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
RootMemoryMonitor: rootSQLMemoryMonitor,
TestingKnobs: sqlExecutorTestingKnobs,
CompactEngineSpanFunc: compactEngineSpanFunc,
TraceCollector: traceCollector,

DistSQLPlanner: sql.NewDistSQLPlanner(
ctx,
2 changes: 1 addition & 1 deletion pkg/sql/BUILD.bazel
@@ -385,6 +385,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/collector",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uint128",
"//pkg/util/uuid",
@@ -393,7 +394,6 @@
"@com_github_cockroachdb_errors//hintdetail",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_lib_pq//:pq",
1 change: 1 addition & 0 deletions pkg/sql/catalog/catconstants/constants.go
@@ -88,6 +88,7 @@ const (
CrdbInternalInterleaved
CrdbInternalCrossDbRefrences
CrdbInternalLostTableDescriptors
CrdbInternalClusterInflightTracesTable
InformationSchemaID
InformationSchemaAdministrableRoleAuthorizationsID
InformationSchemaApplicableRolesID
77 changes: 77 additions & 0 deletions pkg/sql/crdb_internal.go
@@ -141,6 +141,7 @@ var crdbInternal = virtualSchema{
catconstants.CrdbInternalInterleaved: crdbInternalInterleaved,
catconstants.CrdbInternalCrossDbRefrences: crdbInternalCrossDbReferences,
catconstants.CrdbInternalLostTableDescriptors: crdbLostTableDescriptors,
catconstants.CrdbInternalClusterInflightTracesTable: crdbInternalClusterInflightTracesTable,
},
validWithNoDatabaseContext: true,
}
@@ -1197,6 +1198,82 @@ CREATE TABLE crdb_internal.session_trace (
},
}

// crdbInternalClusterInflightTracesTable exposes cluster-wide inflight spans
// for a trace_id.
//
// crdbInternalClusterInflightTracesTable is an indexed, virtual table that only
// returns rows when accessed with an index constraint specifying the trace_id
// for which inflight spans need to be aggregated from all nodes in the cluster.
//
// Each row in the virtual table corresponds to a single `tracing.Recording` on
// a particular node. A `tracing.Recording` is the trace of a single operation
// rooted at a root span on that node. Under the hood, the virtual table
// contacts all "live" nodes in the cluster via the trace collector which
// streams back a recording at a time.
//
// The underlying trace collector only buffers recordings one node at a time.
// The virtual table also produces rows lazily, i.e. only as they are
// consumed by the caller. Therefore, the memory overhead of querying this
// table will be the size of all the `tracing.Recordings` of a particular
// `trace_id` on a single node in the cluster. Each `tracing.Recording` has its
// own memory protections via ring buffers, and so we do not expect this
// overhead to grow in an unbounded manner.
var crdbInternalClusterInflightTracesTable = virtualSchemaTable{
comment: `traces for in-flight spans across all nodes in the cluster (cluster RPC; expensive!)`,
schema: `
CREATE TABLE crdb_internal.cluster_inflight_traces (
trace_id INT NOT NULL, -- The trace's ID.
node_id INT NOT NULL, -- The node's ID.
trace_json STRING NULL, -- JSON representation of the traced remote operation.
trace_str STRING NULL, -- human-readable representation of the traced remote operation.
jaeger_json STRING NULL, -- Jaeger JSON representation of the traced remote operation.
INDEX(trace_id)
)`,
indexes: []virtualIndex{{populate: func(ctx context.Context, constraint tree.Datum, p *planner,
db catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
var traceID uint64
d := tree.UnwrapDatum(p.EvalContext(), constraint)
if d == tree.DNull {
return false, nil
}
switch t := d.(type) {
case *tree.DInt:
traceID = uint64(*t)
default:
return false, errors.AssertionFailedf(
"unexpected type %T for trace_id column in virtual table crdb_internal.cluster_inflight_traces", d)
}

traceCollector := p.ExecCfg().TraceCollector
for iter := traceCollector.StartIter(ctx, traceID); iter.Valid(); iter.Next() {
nodeID, recording := iter.Value()
traceJSON, err := tracing.TraceToJSON(recording)
if err != nil {
return false, err
}
traceString := recording.String()
traceJaegerJSON, err := recording.ToJaegerJSON("", "", fmt.Sprintf("node %d", nodeID))
if err != nil {
return false, err
}
if err := addRow(tree.NewDInt(tree.DInt(traceID)), tree.NewDInt(tree.DInt(nodeID)),
tree.NewDString(traceJSON), tree.NewDString(traceString),
tree.NewDString(traceJaegerJSON)); err != nil {
return false, err
}
}

return true, nil
}}},
populate: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor,
addRow func(...tree.Datum) error) error {
// We only want to generate rows when an index constraint is provided on the
// query accessing this vtable. This index constraint will provide the
// trace_id for which we will collect inflight trace spans from the cluster.
return nil
},
}

// crdbInternalInflightTraceSpanTable exposes the node-local registry of in-flight spans.
var crdbInternalInflightTraceSpanTable = virtualSchemaTable{
comment: `in-flight spans (RAM; local node only)`,
120 changes: 120 additions & 0 deletions pkg/sql/crdb_internal_test.go
@@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/pgtype"
"github.com/stretchr/testify/assert"
@@ -808,3 +809,122 @@ func TestDistSQLFlowsVirtualTables(t *testing.T) {
})
}
}

// setupTraces takes two tracers (potentially on different nodes), and creates
// two span hierarchies as depicted below. The method returns the traceID of
// the first hierarchy, along with a cleanup method to Finish() all the opened
// spans.
//
// Traces on node1:
// -------------
// root <-- traceID1
// root.child <-- traceID1
// root.child.remotechild <-- traceID1
//
// Traces on node2:
// -------------
// root.child.remotechild2 <-- traceID1
// root.child.remotechilddone <-- traceID1
// root2 <-- traceID2
// root2.child <-- traceID2
func setupTraces(t1, t2 *tracing.Tracer) (uint64, func()) {
// Start a root span on "node 1".
root := t1.StartSpan("root", tracing.WithForceRealSpan())
root.SetVerbose(true)

time.Sleep(10 * time.Millisecond)

// Start a child span on "node 1".
child := t1.StartSpan("root.child", tracing.WithParentAndAutoCollection(root))

// Sleep a bit so that everything that comes afterwards has higher timestamps
// than the one we just assigned. Otherwise the sorting is not deterministic.
time.Sleep(10 * time.Millisecond)

// Start a forked child span on "node 1".
childRemoteChild := t1.StartSpan("root.child.remotechild", tracing.WithParentAndManualCollection(child.Meta()))

// Start a remote child span on "node 2".
childRemoteChild2 := t2.StartSpan("root.child.remotechild2", tracing.WithParentAndManualCollection(child.Meta()))

time.Sleep(10 * time.Millisecond)

// Start another remote child span on "node 2" that we finish.
childRemoteChildFinished := t2.StartSpan("root.child.remotechilddone", tracing.WithParentAndManualCollection(child.Meta()))
childRemoteChildFinished.Finish()
child.ImportRemoteSpans(childRemoteChildFinished.GetRecording())

// Start a second root span on "node 2". This will have a different trace_id
// from the spans created above.
root2 := t2.StartSpan("root2", tracing.WithForceRealSpan())
root2.SetVerbose(true)

// Start a child span on "node 2".
child2 := t2.StartSpan("root2.child", tracing.WithParentAndAutoCollection(root2))
return root.TraceID(), func() {
for _, span := range []*tracing.Span{root, child, childRemoteChild,
childRemoteChild2, root2, child2} {
span.Finish()
}
}
}

func TestClusterInflightTracesVirtualTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
args := base.TestClusterArgs{}
tc := testcluster.StartTestCluster(t, 2 /* nodes */, args)
defer tc.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))

node1Tracer := tc.Server(0).Tracer().(*tracing.Tracer)
node2Tracer := tc.Server(1).Tracer().(*tracing.Tracer)

traceID, cleanup := setupTraces(node1Tracer, node2Tracer)
defer cleanup()

t.Run("no-index-constraint", func(t *testing.T) {
sqlDB.CheckQueryResults(t, `SELECT * from crdb_internal.cluster_inflight_traces`, [][]string{})
})

t.Run("with-index-constraint", func(t *testing.T) {
// We expect there to be 3 tracing.Recordings rooted at
// root, root.child.remotechild, root.child.remotechild2.
expectedRows := []struct {
traceID int
nodeID int
}{
{
traceID: int(traceID),
nodeID: 1,
},
{
traceID: int(traceID),
nodeID: 1,
},
{
traceID: int(traceID),
nodeID: 2,
},
}
var rowIdx int
rows := sqlDB.Query(t, `SELECT trace_id, node_id, trace_json, trace_str, jaeger_json from crdb_internal.cluster_inflight_traces WHERE trace_id=$1`, traceID)
defer rows.Close()
for rows.Next() {
var traceID, nodeID int
var traceJSON, traceStr, jaegerJSON string
require.NoError(t, rows.Scan(&traceID, &nodeID, &traceJSON, &traceStr, &jaegerJSON))
require.Less(t, rowIdx, len(expectedRows))
expected := expectedRows[rowIdx]
require.Equal(t, expected.nodeID, nodeID)
require.Equal(t, expected.traceID, traceID)
require.NotEmpty(t, traceJSON)
require.NotEmpty(t, traceStr)
require.NotEmpty(t, jaegerJSON)
rowIdx++
}
})
}
5 changes: 5 additions & 0 deletions pkg/sql/exec_util.go
@@ -84,6 +84,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/collector"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
@@ -918,6 +919,10 @@ type ExecutorConfig struct {
// CompactEngineSpanFunc is used to inform a storage engine of the need to
// perform compaction over a key span.
CompactEngineSpanFunc tree.CompactEngineSpanFunc

// TraceCollector is used to contact all live nodes in the cluster, and
// collect trace spans from their inflight node registries.
TraceCollector *collector.TraceCollector
}

// VersionUpgradeHook is used to run migrations starting in v21.1.
43 changes: 6 additions & 37 deletions pkg/sql/explain_bundle.go
@@ -31,9 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
)

// setExplainBundleResult sets the result of an EXPLAIN ANALYZE (DEBUG)
@@ -88,39 +86,6 @@ func setExplainBundleResult(
return nil
}

-	// traceToJSON returns the string representation of the trace in JSON format.
-	//
-	// traceToJSON assumes that the first span in the recording contains all the
-	// other spans.
-	func traceToJSON(trace tracing.Recording) (string, error) {
-		root := normalizeSpan(trace[0], trace)
-		marshaller := jsonpb.Marshaler{
-			Indent: "\t",
-		}
-		str, err := marshaller.MarshalToString(&root)
-		if err != nil {
-			return "", err
-		}
-		return str, nil
-	}
-
-	func normalizeSpan(s tracingpb.RecordedSpan, trace tracing.Recording) tracingpb.NormalizedSpan {
-		var n tracingpb.NormalizedSpan
-		n.Operation = s.Operation
-		n.StartTime = s.StartTime
-		n.Duration = s.Duration
-		n.Tags = s.Tags
-		n.Logs = s.Logs
-
-		for _, ss := range trace {
-			if ss.ParentSpanID != s.SpanID {
-				continue
-			}
-			n.Children = append(n.Children, normalizeSpan(ss, trace))
-		}
-		return n
-	}

// diagnosticsBundle contains diagnostics information collected for a statement.
type diagnosticsBundle struct {
// Zip file binary data.
@@ -322,7 +287,7 @@ func (b *stmtBundleBuilder) addExplainVec() {
// trace (the default and the jaeger formats), the third one is a human-readable
// representation.
func (b *stmtBundleBuilder) addTrace() {
-	traceJSONStr, err := traceToJSON(b.trace)
+	traceJSONStr, err := tracing.TraceToJSON(b.trace)
if err != nil {
b.z.AddFile("trace.json", err.Error())
} else {
@@ -343,7 +308,11 @@

// Note that we're going to include the non-anonymized statement in the trace.
// But then again, nothing in the trace is anonymized.
-	jaegerJSON, err := b.trace.ToJaegerJSON(stmt)
+	comment := fmt.Sprintf(`This is a trace for SQL statement: %s
+This trace can be imported into Jaeger for visualization. From the Jaeger Search screen, select the JSON File.
+Jaeger can be started using docker with: docker run -d --name jaeger -p 16686:16686 jaegertracing/all-in-one:1.17
+The UI can then be accessed at http://localhost:16686/search`, stmt)
+	jaegerJSON, err := b.trace.ToJaegerJSON(stmt, comment, "")
if err != nil {
b.z.AddFile("trace-jaeger.txt", err.Error())
} else {
