Skip to content

Commit

Permalink
tracing: use byte-limits for recorded logs per span
Browse files Browse the repository at this point in the history
Touches #59188. We can introduce byte-limits for verbose logging
messages in a similar manner to what we've done for structured events.

This commit also:
- adds a _dropped tag to recordings with dropped logs/structured events.
- squashes a bug where reset spans (as used in SessionTracing) still
  held onto earlier structured events
- moves away from the internal usage of the opentracing.LogRecord type,
  it's unnecessary

Release justification: low risk, high benefit changes to existing
functionality

Release note: None
  • Loading branch information
irfansharif committed Mar 5, 2021
1 parent 7e1316f commit 2b71afb
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 56 deletions.
1 change: 1 addition & 0 deletions pkg/util/tracing/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ go_test(
"//pkg/util/iterutil",
"//pkg/util/leaktest",
"//pkg/util/stop",
"//pkg/util/timeutil",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
Expand Down
119 changes: 85 additions & 34 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/logtags"
"github.com/gogo/protobuf/types"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
)

// crdbSpan is a span for internal crdb usage. This is used to power SQL session
Expand All @@ -48,21 +47,33 @@ type crdbSpan struct {
// tag's key to a user.
logTags *logtags.Buffer

mu crdbSpanMu
mu crdbSpanMu
testing *testingKnob
}

type testingKnob struct {
clock timeutil.TimeSource
}

type crdbSpanMu struct {
syncutil.Mutex
// duration is initialized to -1 and set on Finish().
duration time.Duration

// recording maintains state once StartRecording() is called.
recording struct {
// recordingType is the recording type of the ongoing recording, if any.
// Its 'load' method may be called without holding the surrounding mutex,
// but its 'swap' method requires the mutex.
recordingType atomicRecordingType
recordedLogs []opentracing.LogRecord

logs sizeLimitedBuffer // of *tracingpb.LogRecords
structured sizeLimitedBuffer // of Structured events

// dropped is true if the span has capped out it's memory limits for
// logs and structured events, and has had to drop some. It's used to
// annotate recordings with the _dropped tag, when applicable.
dropped bool

// children contains the list of child spans started after this Span
// started recording.
children []*crdbSpan
Expand All @@ -79,13 +90,27 @@ type crdbSpanMu struct {
// those that were set before recording started)?
tags opentracing.Tags

bytesStructured int64
structured ring.Buffer // of Structured events

// The Span's associated baggage.
baggage map[string]string
}

func newSizeLimitedBuffer(limit int64) sizeLimitedBuffer {
return sizeLimitedBuffer{
limit: limit,
}
}

type sizeLimitedBuffer struct {
ring.Buffer
size int64 // in bytes
limit int64 // in bytes
}

func (b *sizeLimitedBuffer) Reset() {
b.Buffer.Reset()
b.size = 0
}

func (s *crdbSpan) recordingType() RecordingType {
if s == nil {
return RecordingOff
Expand Down Expand Up @@ -123,7 +148,10 @@ func (s *crdbSpan) resetRecording() {
s.mu.Lock()
defer s.mu.Unlock()

s.mu.recording.recordedLogs = nil
s.mu.recording.logs.Reset()
s.mu.recording.structured.Reset()
s.mu.recording.dropped = false

s.mu.recording.children = nil
s.mu.recording.remoteSpans = nil
}
Expand Down Expand Up @@ -210,30 +238,54 @@ func (s *crdbSpan) record(msg string) {
return
}

s.mu.Lock()
defer s.mu.Unlock()
if len(s.mu.recording.recordedLogs) < maxLogsPerSpan {
s.mu.recording.recordedLogs = append(s.mu.recording.recordedLogs, opentracing.LogRecord{
Timestamp: time.Now(),
Fields: []otlog.Field{
otlog.String(tracingpb.LogMessageField, msg),
},
})
var now time.Time
if s.testing != nil {
now = s.testing.clock.Now()
} else {
now = time.Now()
}
logRecord := &tracingpb.LogRecord{
Time: now,
Fields: []tracingpb.LogRecord_Field{
{Key: tracingpb.LogMessageField, Value: msg},
},
}

s.recordInternal(logRecord, &s.mu.recording.logs)
}

func (s *crdbSpan) recordStructured(item Structured) {
s.recordInternal(item, &s.mu.recording.structured)
}

// sizable is a subset for protoutil.Message, for payloads (log records and
// structured events) that can be recorded.
type sizable interface {
Size() int
}

func (s *crdbSpan) recordInternal(payload sizable, buffer *sizeLimitedBuffer) {
s.mu.Lock()
defer s.mu.Unlock()

s.mu.bytesStructured += int64(item.Size())
for s.mu.bytesStructured > maxStructuredBytesPerSpan {
last := s.mu.structured.GetLast().(Structured)
s.mu.structured.RemoveLast()
s.mu.bytesStructured -= int64(last.Size())
size := int64(payload.Size())
if size > buffer.limit {
// The incoming payload alone blows past the memory limit. Let's just
// drop it.
s.mu.recording.dropped = true
return
}

s.mu.structured.AddFirst(item)
buffer.size += size
if buffer.size > buffer.limit {
s.mu.recording.dropped = true
}
for buffer.size > buffer.limit {
first := buffer.GetFirst().(sizable)
buffer.RemoveFirst()
buffer.size -= int64(first.Size())
}
buffer.AddLast(payload)
}

func (s *crdbSpan) setBaggageItemAndTag(restrictedKey, value string) {
Expand Down Expand Up @@ -299,12 +351,15 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan {
if s.mu.recording.recordingType.load() == RecordingVerbose {
addTag("_verbose", "1")
}
if s.mu.recording.dropped {
addTag("_dropped", "1")
}
}

if numEvents := s.mu.structured.Len(); numEvents != 0 {
if numEvents := s.mu.recording.structured.Len(); numEvents != 0 {
rs.InternalStructured = make([]*types.Any, 0, numEvents)
for i := 0; i < numEvents; i++ {
event := s.mu.structured.Get(i).(Structured)
event := s.mu.recording.structured.Get(i).(Structured)
item, err := types.MarshalAny(event)
if err != nil {
// An error here is an error from Marshal; these
Expand Down Expand Up @@ -335,15 +390,11 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan {
}
}

rs.Logs = make([]tracingpb.LogRecord, len(s.mu.recording.recordedLogs))
for i, r := range s.mu.recording.recordedLogs {
rs.Logs[i].Time = r.Timestamp
rs.Logs[i].Fields = make([]tracingpb.LogRecord_Field, len(r.Fields))
for j, f := range r.Fields {
rs.Logs[i].Fields[j] = tracingpb.LogRecord_Field{
Key: f.Key(),
Value: fmt.Sprint(f.Value()),
}
if numLogs := s.mu.recording.logs.Len(); numLogs != 0 {
rs.Logs = make([]tracingpb.LogRecord, numLogs)
for i := 0; i < numLogs; i++ {
lr := s.mu.recording.logs.Get(i).(*tracingpb.LogRecord)
rs.Logs[i] = *lr
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/tracing/shadow.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func makeShadowSpan(
func createLightStepTracer(token string) (shadowTracerManager, opentracing.Tracer) {
return lightStepManager{}, lightstep.NewTracer(lightstep.Options{
AccessToken: token,
MaxLogsPerSpan: maxLogsPerSpan,
MaxLogsPerSpan: maxLogsPerSpanExternal,
MaxBufferedSpans: 10000,
UseGRPC: true,
})
Expand Down
12 changes: 10 additions & 2 deletions pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func (sp *Span) Finish() {
// As a performance optimization, GetRecording does not return tags when the
// underlying Span is not verbose. Returning tags requires expensive
// stringification.
//
// A few internal tags are added to denote span properties:
//
// "_unfinished" The span was never Finish()ed
// "_verbose" The span is a verbose one
// "_dropped" The span dropped recordings due to sizing constraints
func (sp *Span) GetRecording() Recording {
// It's always valid to get the recording, even for a finished span.
return sp.i.GetRecording()
Expand Down Expand Up @@ -149,7 +155,8 @@ func (sp *Span) IsVerbose() bool {
return sp.i.IsVerbose()
}

// Record provides a way to record free-form text into verbose spans.
// Record provides a way to record free-form text into verbose spans. Recordings
// may be dropped due to sizing constraints.
//
// TODO(irfansharif): We don't currently have redactability with trace
// recordings (both here, and using RecordStructured above). We'll want to do this
Expand All @@ -171,7 +178,8 @@ func (sp *Span) Recordf(format string, args ...interface{}) {

// RecordStructured adds a Structured payload to the Span. It will be added to
// the recording even if the Span is not verbose; however it will be discarded
// if the underlying Span has been optimized out (i.e. is a noop span).
// if the underlying Span has been optimized out (i.e. is a noop span). Payloads
// may also be dropped due to sizing constraints.
//
// The caller must not mutate the item once RecordStructured has been called.
func (sp *Span) RecordStructured(item Structured) {
Expand Down
Loading

0 comments on commit 2b71afb

Please sign in to comment.