Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql,tracing: introduce crdb_internal.cluster_inflight_traces #66679

Merged
merged 1 commit into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/cli/zip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ table_name NOT IN (
'cluster_contended_keys',
'cluster_contended_indexes',
'cluster_contended_tables',
'cluster_inflight_traces',
'create_statements',
'create_type_statements',
'cross_db_references',
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/collector",
"//pkg/util/tracing/service",
"//pkg/util/tracing/tracingpb",
"//pkg/util/tracing/tracingservicepb:tracingservicepb_go_proto",
Expand Down
15 changes: 13 additions & 2 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/collector"
"github.com/cockroachdb/cockroach/pkg/util/tracing/service"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingservicepb"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -491,8 +492,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}

var isAvailable func(roachpb.NodeID) bool
nodeLiveness, ok := cfg.nodeLiveness.Optional(47900)
if ok {
nodeLiveness, hasNodeLiveness := cfg.nodeLiveness.Optional(47900)
if hasNodeLiveness {
// TODO(erikgrinaker): We may want to use IsAvailableNotDraining instead, to
// avoid scheduling long-running flows (e.g. rangefeeds or backups) on nodes
// that are being drained/decommissioned. However, these nodes can still be
Expand All @@ -507,6 +508,15 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}
}

// Set up the trace collector that is used to fetch inflight trace spans from
// all nodes in the cluster.
// The collector requires nodeliveness to get a list of all the nodes in the
// cluster.
var traceCollector *collector.TraceCollector
if hasNodeLiveness {
traceCollector = collector.New(cfg.nodeDialer, nodeLiveness, cfg.Settings.Tracer)
}

*execCfg = sql.ExecutorConfig{
Settings: cfg.Settings,
NodeInfo: nodeInfo,
Expand Down Expand Up @@ -537,6 +547,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
RootMemoryMonitor: rootSQLMemoryMonitor,
TestingKnobs: sqlExecutorTestingKnobs,
CompactEngineSpanFunc: compactEngineSpanFunc,
TraceCollector: traceCollector,

DistSQLPlanner: sql.NewDistSQLPlanner(
ctx,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/collector",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uint128",
"//pkg/util/uuid",
Expand All @@ -393,7 +394,6 @@ go_library(
"@com_github_cockroachdb_errors//hintdetail",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_lib_pq//:pq",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/catconstants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ const (
CrdbInternalInterleaved
CrdbInternalCrossDbRefrences
CrdbInternalLostTableDescriptors
CrdbInternalClusterInflightTracesTable
InformationSchemaID
InformationSchemaAdministrableRoleAuthorizationsID
InformationSchemaApplicableRolesID
Expand Down
77 changes: 77 additions & 0 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ var crdbInternal = virtualSchema{
catconstants.CrdbInternalInterleaved: crdbInternalInterleaved,
catconstants.CrdbInternalCrossDbRefrences: crdbInternalCrossDbReferences,
catconstants.CrdbInternalLostTableDescriptors: crdbLostTableDescriptors,
catconstants.CrdbInternalClusterInflightTracesTable: crdbInternalClusterInflightTracesTable,
},
validWithNoDatabaseContext: true,
}
Expand Down Expand Up @@ -1197,6 +1198,82 @@ CREATE TABLE crdb_internal.session_trace (
},
}

// crdbInternalClusterInflightTracesTable exposes cluster-wide inflight spans
// for a trace_id.
//
// crdbInternalClusterInflightTracesTable is an indexed, virtual table that only
// returns rows when accessed with an index constraint specifying the trace_id
// for which inflight spans need to be aggregated from all nodes in the cluster.
//
// Each row in the virtual table corresponds to a single `tracing.Recording` on
// a particular node. A `tracing.Recording` is the trace of a single operation
// rooted at a root span on that node. Under the hood, the virtual table
// contacts all "live" nodes in the cluster via the trace collector which
// streams back a recording at a time.
//
// The underlying trace collector only buffers recordings one node at a time.
// The virtual table also produces rows lazily, i.e. as and when they are
// consumed by the consumer. Therefore, the memory overhead of querying this
// table will be the size of all the `tracing.Recordings` of a particular
// `trace_id` on a single node in the cluster. Each `tracing.Recording` has its
// own memory protections via ring buffers, and so we do not expect this
// overhead to grow in an unbounded manner.
//
// If the executor config carries no trace collector, a constrained lookup
// returns an error rather than attempting to iterate.
var crdbInternalClusterInflightTracesTable = virtualSchemaTable{
	comment: `traces for in-flight spans across all nodes in the cluster (cluster RPC; expensive!)`,
	schema: `
CREATE TABLE crdb_internal.cluster_inflight_traces (
trace_id INT NOT NULL, -- The trace's ID.
node_id INT NOT NULL, -- The node's ID.
trace_json STRING NULL, -- JSON representation of the traced remote operation.
trace_str STRING NULL, -- human readable representation of the traced remote operation.
jaeger_json STRING NULL, -- Jaeger JSON representation of the traced remote operation.
INDEX(trace_id)
)`,
	// The single virtual index is keyed on trace_id. Its populate function
	// receives the constrained trace_id datum and emits one row per
	// (node, recording) pair yielded by the trace collector's iterator.
	indexes: []virtualIndex{{populate: func(ctx context.Context, constraint tree.Datum, p *planner,
		db catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
		var traceID uint64
		d := tree.UnwrapDatum(p.EvalContext(), constraint)
		if d == tree.DNull {
			// A NULL constraint cannot match the NOT NULL trace_id column.
			return false, nil
		}
		switch t := d.(type) {
		case *tree.DInt:
			traceID = uint64(*t)
		default:
			return false, errors.AssertionFailedf(
				"unexpected type %T for trace_id column in virtual table crdb_internal.cluster_inflight_traces", d)
		}

		traceCollector := p.ExecCfg().TraceCollector
		if traceCollector == nil {
			// The collector is optional in the executor config (it is only
			// constructed when node liveness is available), so guard against
			// invoking a method on a nil collector.
			return false, errors.New(
				"trace collector is not available on this server; cannot collect cluster-wide inflight traces")
		}

		for iter := traceCollector.StartIter(ctx, traceID); iter.Valid(); iter.Next() {
			nodeID, recording := iter.Value()

			// Render the recording in the three formats exposed by the table.
			traceJSON, err := tracing.TraceToJSON(recording)
			if err != nil {
				return false, err
			}
			traceString := recording.String()
			traceJaegerJSON, err := recording.ToJaegerJSON("", "", fmt.Sprintf("node %d", nodeID))
			if err != nil {
				return false, err
			}

			if err := addRow(tree.NewDInt(tree.DInt(traceID)), tree.NewDInt(tree.DInt(nodeID)),
				tree.NewDString(traceJSON), tree.NewDString(traceString),
				tree.NewDString(traceJaegerJSON)); err != nil {
				return false, err
			}
		}

		return true, nil
	}}},
	populate: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor,
		addRow func(...tree.Datum) error) error {
		// We only want to generate rows when an index constraint is provided on the
		// query accessing this vtable. This index constraint will provide the
		// trace_id for which we will collect inflight trace spans from the cluster.
		return nil
	},
}

// crdbInternalInflightTraceSpanTable exposes the node-local registry of in-flight spans.
var crdbInternalInflightTraceSpanTable = virtualSchemaTable{
comment: `in-flight spans (RAM; local node only)`,
Expand Down
120 changes: 120 additions & 0 deletions pkg/sql/crdb_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/pgtype"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -808,3 +809,122 @@ func TestDistSQLFlowsVirtualTables(t *testing.T) {
})
}
}

// setupTraces takes two tracers (potentially on different nodes) and opens the
// spans depicted below. It returns the trace ID of the first hierarchy (the one
// rooted at "root"), along with a cleanup function that calls Finish() on every
// span that is still open when setupTraces returns.
//
// Spans started on t1 ("node 1"):
// -------------
// root                            <-- traceID1
//   root.child                    <-- traceID1
//     root.child.remotechild      <-- traceID1
//
// Spans started on t2 ("node 2"):
// -------------
//     root.child.remotechild2     <-- traceID1
//     root.child.remotechilddone  <-- traceID1 (finished before returning)
// root2                           <-- traceID2
//   root2.child                   <-- traceID2
func setupTraces(t1, t2 *tracing.Tracer) (uint64, func()) {
	// Verbose root span on "node 1".
	rootSp := t1.StartSpan("root", tracing.WithForceRealSpan())
	rootSp.SetVerbose(true)

	time.Sleep(10 * time.Millisecond)

	// Child of the root span, also on "node 1".
	childSp := t1.StartSpan("root.child", tracing.WithParentAndAutoCollection(rootSp))

	// Sleep a bit so that everything that comes afterwards has higher timestamps
	// than the one we just assigned. Otherwise the sorting is not deterministic.
	time.Sleep(10 * time.Millisecond)

	// Forked child of childSp on "node 1", using manual collection.
	forkedSp := t1.StartSpan("root.child.remotechild", tracing.WithParentAndManualCollection(childSp.Meta()))

	// Remote child of childSp, started with the "node 2" tracer.
	remoteSp := t2.StartSpan("root.child.remotechild2", tracing.WithParentAndManualCollection(childSp.Meta()))

	time.Sleep(10 * time.Millisecond)

	// A second remote child on "node 2". This one is finished right away and its
	// recording is imported into childSp.
	remoteDoneSp := t2.StartSpan("root.child.remotechilddone", tracing.WithParentAndManualCollection(childSp.Meta()))
	remoteDoneSp.Finish()
	childSp.ImportRemoteSpans(remoteDoneSp.GetRecording())

	// A second verbose root span, started with the "node 2" tracer. It has no
	// parent among the spans above, so it belongs to a separate hierarchy
	// (traceID2 in the diagram).
	secondRootSp := t2.StartSpan("root2", tracing.WithForceRealSpan())
	secondRootSp.SetVerbose(true)

	// Child of the second root on "node 2".
	secondChildSp := t2.StartSpan("root2.child", tracing.WithParentAndAutoCollection(secondRootSp))

	// finishOpenSpans finishes, in creation order, every span that was left
	// open above (remoteDoneSp has already been finished).
	finishOpenSpans := func() {
		rootSp.Finish()
		childSp.Finish()
		forkedSp.Finish()
		remoteSp.Finish()
		secondRootSp.Finish()
		secondChildSp.Finish()
	}
	return rootSp.TraceID(), finishOpenSpans
}

// TestClusterInflightTracesVirtualTable starts a two-node test cluster, opens
// spans on both nodes' tracers via setupTraces, and checks what
// crdb_internal.cluster_inflight_traces returns:
//   - without a trace_id constraint the table yields no rows;
//   - with a trace_id constraint it yields one row per expected recording,
//     each with non-empty JSON, string and Jaeger JSON renderings.
func TestClusterInflightTracesVirtualTable(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	args := base.TestClusterArgs{}
	tc := testcluster.StartTestCluster(t, 2 /* nodes */, args)
	defer tc.Stopper().Stop(ctx)
	sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))

	node1Tracer := tc.Server(0).Tracer().(*tracing.Tracer)
	node2Tracer := tc.Server(1).Tracer().(*tracing.Tracer)

	traceID, cleanup := setupTraces(node1Tracer, node2Tracer)
	defer cleanup()

	t.Run("no-index-constraint", func(t *testing.T) {
		sqlDB.CheckQueryResults(t, `SELECT * from crdb_internal.cluster_inflight_traces`, [][]string{})
	})

	t.Run("with-index-constraint", func(t *testing.T) {
		// We expect there to be 3 tracing.Recordings rooted at
		// root, root.child.remotechild, root.child.remotechild2.
		expectedRows := []struct {
			traceID int
			nodeID  int
		}{
			{
				traceID: int(traceID),
				nodeID:  1,
			},
			{
				traceID: int(traceID),
				nodeID:  1,
			},
			{
				traceID: int(traceID),
				nodeID:  2,
			},
		}
		var rowIdx int
		rows := sqlDB.Query(t, `SELECT trace_id, node_id, trace_json, trace_str, jaeger_json from crdb_internal.cluster_inflight_traces WHERE trace_id=$1`, traceID)
		defer rows.Close()
		for rows.Next() {
			var gotTraceID, gotNodeID int
			var traceJSON, traceStr, jaegerJSON string
			require.NoError(t, rows.Scan(&gotTraceID, &gotNodeID, &traceJSON, &traceStr, &jaegerJSON))
			// Guard the index below: more rows than expected is a failure.
			require.Less(t, rowIdx, len(expectedRows))
			expected := expectedRows[rowIdx]
			require.Equal(t, expected.nodeID, gotNodeID)
			require.Equal(t, expected.traceID, gotTraceID)
			require.NotEmpty(t, traceJSON)
			require.NotEmpty(t, traceStr)
			require.NotEmpty(t, jaegerJSON)
			rowIdx++
		}
		// rows.Next() returning false may hide an iteration error; surface it.
		require.NoError(t, rows.Err())
		// Without this check the subtest would pass even if the query returned
		// fewer rows than expected (including none at all).
		require.Equal(t, len(expectedRows), rowIdx)
	})
}
5 changes: 5 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/collector"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -918,6 +919,10 @@ type ExecutorConfig struct {
// CompactEngineSpanFunc is used to inform a storage engine of the need to
// perform compaction over a key span.
CompactEngineSpanFunc tree.CompactEngineSpanFunc

// TraceCollector is used to contact all live nodes in the cluster, and
// collect trace spans from their inflight node registries.
TraceCollector *collector.TraceCollector
}

// VersionUpgradeHook is used to run migrations starting in v21.1.
Expand Down
43 changes: 6 additions & 37 deletions pkg/sql/explain_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
)

// setExplainBundleResult sets the result of an EXPLAIN ANALYZE (DEBUG)
Expand Down Expand Up @@ -88,39 +86,6 @@ func setExplainBundleResult(
return nil
}

// traceToJSON returns the string representation of the trace in JSON format.
//
// traceToJSON assumes that the first span in the recording contains all the
// other spans: the output is the tree built by normalizeSpan starting from
// trace[0], marshalled with tab indentation.
//
// An error is returned if the recording is empty (there is no first span to
// root the tree at) or if marshalling fails.
func traceToJSON(trace tracing.Recording) (string, error) {
	// Indexing trace[0] on an empty recording would panic; report it instead.
	if len(trace) == 0 {
		return "", errors.New("cannot convert an empty trace recording to JSON")
	}
	root := normalizeSpan(trace[0], trace)
	marshaller := jsonpb.Marshaler{
		Indent: "\t",
	}
	str, err := marshaller.MarshalToString(&root)
	if err != nil {
		return "", err
	}
	return str, nil
}

// normalizeSpan converts the recorded span s into a tracingpb.NormalizedSpan.
// The operation, start time, duration, tags and logs are copied from s, and
// every span in trace whose ParentSpanID equals s.SpanID is normalized
// recursively and appended to the result's Children, in recording order.
func normalizeSpan(s tracingpb.RecordedSpan, trace tracing.Recording) tracingpb.NormalizedSpan {
	normalized := tracingpb.NormalizedSpan{
		Operation: s.Operation,
		StartTime: s.StartTime,
		Duration:  s.Duration,
		Tags:      s.Tags,
		Logs:      s.Logs,
	}

	// Walk the whole recording looking for the direct children of s.
	for i := range trace {
		if candidate := trace[i]; candidate.ParentSpanID == s.SpanID {
			normalized.Children = append(normalized.Children, normalizeSpan(candidate, trace))
		}
	}
	return normalized
}

// diagnosticsBundle contains diagnostics information collected for a statement.
type diagnosticsBundle struct {
// Zip file binary data.
Expand Down Expand Up @@ -322,7 +287,7 @@ func (b *stmtBundleBuilder) addExplainVec() {
// trace (the default and the jaeger formats), the third one is a human-readable
// representation.
func (b *stmtBundleBuilder) addTrace() {
traceJSONStr, err := traceToJSON(b.trace)
traceJSONStr, err := tracing.TraceToJSON(b.trace)
if err != nil {
b.z.AddFile("trace.json", err.Error())
} else {
Expand All @@ -343,7 +308,11 @@ func (b *stmtBundleBuilder) addTrace() {

// Note that we're going to include the non-anonymized statement in the trace.
// But then again, nothing in the trace is anonymized.
jaegerJSON, err := b.trace.ToJaegerJSON(stmt)
comment := fmt.Sprintf(`This is a trace for SQL statement: %s
This trace can be imported into Jaeger for visualization. From the Jaeger Search screen, select the JSON File.
Jaeger can be started using docker with: docker run -d --name jaeger -p 16686:16686 jaegertracing/all-in-one:1.17
The UI can then be accessed at http://localhost:16686/search`, stmt)
jaegerJSON, err := b.trace.ToJaegerJSON(stmt, comment, "")
if err != nil {
b.z.AddFile("trace-jaeger.txt", err.Error())
} else {
Expand Down
Loading