Skip to content

Commit

Permalink
tracing: use byte-limits for recorded logs per span
Browse files Browse the repository at this point in the history
Touches cockroachdb#59188. We can introduce byte-limits for verbose logging
messages in a similar manner to what we've done for structured events.

This commit also:
- adds a _dropped tag to recordings with dropped logs/structured events.
- squashes a bug where reset spans (as used in SessionTracing) still
  held onto earlier structured events
- moves away from the internal usage of the opentracing.LogRecord type,
  which is unnecessary

Release justification: low risk, high benefit changes to existing
functionality

Release note: None
  • Loading branch information
irfansharif committed Mar 3, 2021
1 parent 9902059 commit c86ce89
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 54 deletions.
92 changes: 60 additions & 32 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/logtags"
"github.com/gogo/protobuf/types"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
)

// crdbSpan is a span for internal crdb usage. This is used to power SQL session
Expand Down Expand Up @@ -56,13 +55,21 @@ type crdbSpanMu struct {
// duration is initialized to -1 and set on Finish().
duration time.Duration

// recording maintains state once StartRecording() is called.
recording struct {
// recordingType is the recording type of the ongoing recording, if any.
// Its 'load' method may be called without holding the surrounding mutex,
// but its 'swap' method requires the mutex.
recordingType atomicRecordingType
recordedLogs []opentracing.LogRecord

logs ring.Buffer // of *tracingpb.LogRecords
structured ring.Buffer // of Structured events
bytes int64 // total memory utilization of logs and structured events

// dropped is true if the span has capped out its memory limits for
// logs and structured events, and has had to drop some. It's used to
// annotate recordings with the _dropped tag, when applicable.
dropped bool

// children contains the list of child spans started after this Span
// started recording.
children []*crdbSpan
Expand All @@ -79,9 +86,6 @@ type crdbSpanMu struct {
// those that were set before recording started)?
tags opentracing.Tags

bytesStructured int64
structured ring.Buffer // of Structured events

// The Span's associated baggage.
baggage map[string]string
}
Expand Down Expand Up @@ -123,7 +127,11 @@ func (s *crdbSpan) resetRecording() {
s.mu.Lock()
defer s.mu.Unlock()

s.mu.recording.recordedLogs = nil
s.mu.recording.logs.Reset()
s.mu.recording.structured.Reset()
s.mu.recording.bytes = 0
s.mu.recording.dropped = false

s.mu.recording.children = nil
s.mu.recording.remoteSpans = nil
}
Expand Down Expand Up @@ -210,30 +218,51 @@ func (s *crdbSpan) record(msg string) {
return
}

logRecord := &tracingpb.LogRecord{
Time: time.Now(),
Fields: []tracingpb.LogRecord_Field{
{Key: tracingpb.LogMessageField, Value: msg},
},
}

s.mu.Lock()
defer s.mu.Unlock()
if len(s.mu.recording.recordedLogs) < maxLogsPerSpan {
s.mu.recording.recordedLogs = append(s.mu.recording.recordedLogs, opentracing.LogRecord{
Timestamp: time.Now(),
Fields: []otlog.Field{
otlog.String(tracingpb.LogMessageField, msg),
},
})
}

s.recordInternalLocked(logRecord, &s.mu.recording.logs)
}

func (s *crdbSpan) recordStructured(item Structured) {
s.mu.Lock()
defer s.mu.Unlock()

s.mu.bytesStructured += int64(item.Size())
for s.mu.bytesStructured > maxStructuredBytesPerSpan {
last := s.mu.structured.GetLast().(Structured)
s.mu.structured.RemoveLast()
s.mu.bytesStructured -= int64(last.Size())
s.recordInternalLocked(item, &s.mu.recording.structured)
}

// sizable is a subset of protoutil.Message, for payloads (log records and
// structured events) that can be recorded. recordInternalLocked uses Size()
// to charge each payload against the span's recording byte limit.
type sizable interface {
// Size returns the payload's size, as counted against the byte limit.
Size() int
}

// recordInternalLocked adds payload to the given buffer (the span's logs or
// its structured events), charging its size against the
// maxRecordedBytesPerSpan budget tracked in s.mu.recording.bytes. That budget
// is shared by both buffers. When it is exceeded, the oldest entries of buffer
// are evicted to make room and the recording is marked as dropped, which
// surfaces as the _dropped tag. The caller must hold s.mu.
func (s *crdbSpan) recordInternalLocked(payload sizable, buffer *ring.Buffer) {
	size := int64(payload.Size())
	if size > maxRecordedBytesPerSpan {
		// The incoming payload alone blows past the memory limit. Let's just
		// drop it.
		s.mu.recording.dropped = true
		return
	}

	s.mu.recording.bytes += size
	if s.mu.recording.bytes > maxRecordedBytesPerSpan {
		// Something will have to go: either older entries from this buffer
		// or, failing that, the incoming payload itself.
		s.mu.recording.dropped = true
	}
	// Evict the oldest entries from this buffer until we're back under the
	// limit. Because the byte count is shared between logs and structured
	// events, this buffer can run empty while we're still over budget; stop
	// there rather than reading from an empty buffer.
	for s.mu.recording.bytes > maxRecordedBytesPerSpan && buffer.Len() > 0 {
		first := buffer.GetFirst().(sizable)
		buffer.RemoveFirst()
		s.mu.recording.bytes -= int64(first.Size())
	}
	if s.mu.recording.bytes > maxRecordedBytesPerSpan {
		// The other buffer is holding the rest of the budget. Drop the
		// incoming payload and undo its accounting.
		s.mu.recording.bytes -= size
		return
	}
	buffer.AddLast(payload)
}

func (s *crdbSpan) setBaggageItemAndTag(restrictedKey, value string) {
Expand Down Expand Up @@ -299,12 +328,15 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan {
if s.mu.recording.recordingType.load() == RecordingVerbose {
addTag("_verbose", "1")
}
if s.mu.recording.dropped {
addTag("_dropped", "1")
}
}

if numEvents := s.mu.structured.Len(); numEvents != 0 {
if numEvents := s.mu.recording.structured.Len(); numEvents != 0 {
rs.InternalStructured = make([]*types.Any, 0, numEvents)
for i := 0; i < numEvents; i++ {
event := s.mu.structured.Get(i).(Structured)
event := s.mu.recording.structured.Get(i).(Structured)
item, err := types.MarshalAny(event)
if err != nil {
// An error here is an error from Marshal; these
Expand Down Expand Up @@ -335,15 +367,11 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan {
}
}

rs.Logs = make([]tracingpb.LogRecord, len(s.mu.recording.recordedLogs))
for i, r := range s.mu.recording.recordedLogs {
rs.Logs[i].Time = r.Timestamp
rs.Logs[i].Fields = make([]tracingpb.LogRecord_Field, len(r.Fields))
for j, f := range r.Fields {
rs.Logs[i].Fields[j] = tracingpb.LogRecord_Field{
Key: f.Key(),
Value: fmt.Sprint(f.Value()),
}
if numLogs := s.mu.recording.logs.Len(); numLogs != 0 {
rs.Logs = make([]tracingpb.LogRecord, numLogs)
for i := 0; i < numLogs; i++ {
lr := s.mu.recording.logs.Get(i).(*tracingpb.LogRecord)
rs.Logs[i] = *lr
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/tracing/shadow.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func makeShadowSpan(
func createLightStepTracer(token string) (shadowTracerManager, opentracing.Tracer) {
return lightStepManager{}, lightstep.NewTracer(lightstep.Options{
AccessToken: token,
MaxLogsPerSpan: maxLogsPerSpan,
MaxLogsPerSpan: maxLogsPerSpanExternal,
MaxBufferedSpans: 10000,
UseGRPC: true,
})
Expand Down
12 changes: 10 additions & 2 deletions pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func (sp *Span) Finish() {
// As a performance optimization, GetRecording does not return tags when the
// underlying Span is not verbose. Returning tags requires expensive
// stringification.
//
// A few internal tags are added to denote span properties:
//
// "_unfinished" The span was never Finish()ed
// "_verbose" The span is a verbose one
// "_dropped" The span dropped recordings due to sizing constraints
func (sp *Span) GetRecording() Recording {
// It's always valid to get the recording, even for a finished span.
return sp.i.GetRecording()
Expand Down Expand Up @@ -149,7 +155,8 @@ func (sp *Span) IsVerbose() bool {
return sp.i.IsVerbose()
}

// Record provides a way to record free-form text into verbose spans.
// Record provides a way to record free-form text into verbose spans. Recordings
// may be dropped due to sizing constraints.
//
// TODO(irfansharif): We don't currently have redactability with trace
// recordings (both here, and using RecordStructured above). We'll want to do this
Expand All @@ -171,7 +178,8 @@ func (sp *Span) Recordf(format string, args ...interface{}) {

// RecordStructured adds a Structured payload to the Span. It will be added to
// the recording even if the Span is not verbose; however it will be discarded
// if the underlying Span has been optimized out (i.e. is a noop span).
// if the underlying Span has been optimized out (i.e. is a noop span). Payloads
// may also be dropped due to sizing constraints.
//
// The caller must not mutate the item once RecordStructured has been called.
func (sp *Span) RecordStructured(item Structured) {
Expand Down
154 changes: 142 additions & 12 deletions pkg/util/tracing/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/gogo/protobuf/types"
Expand All @@ -34,8 +35,14 @@ func TestRecordingString(t *testing.T) {
root := tr.StartSpan("root", WithForceRealSpan())
root.SetVerbose(true)
root.Record("root 1")
// Hackily fix the timing on the first log message, so that we can check it later.
root.i.crdb.mu.recording.recordedLogs[0].Timestamp = root.i.crdb.startTime.Add(time.Millisecond)
{
// Hackily fix the timing on the first log message, so that we can check it later.
r := root.i.crdb.mu.recording.logs.GetFirst().(*tracingpb.LogRecord)
r.Time = root.i.crdb.startTime.Add(time.Millisecond)
root.i.crdb.mu.recording.logs.RemoveFirst()
root.i.crdb.mu.recording.logs.AddFirst(r)
}

// Sleep a bit so that everything that comes afterwards has higher timestamps
// than the one we just assigned. Otherwise the sorting will be screwed up.
time.Sleep(10 * time.Millisecond)
Expand Down Expand Up @@ -214,36 +221,159 @@ func TestSpanRecordStructured(t *testing.T) {
`))
}

// TestSpanRecordStructuredLimit tests recording behavior when the size of
// structured data recorded into the span exceeds the configured limit.
func TestSpanRecordStructuredLimit(t *testing.T) {
tr := NewTracer()
sp := tr.StartSpan("root", WithForceRealSpan())
defer sp.Finish()

offset := 1000 // we start at a high enough integer to not have to worry about variable payload sizes
payload := func(i int) Structured { return &types.Int32Value{Value: int32(i)} }
pad := func(i int) string { return fmt.Sprintf("%06d", i) }
payload := func(i int) Structured { return &types.StringValue{Value: pad(i)} }

numPayloads := maxStructuredBytesPerSpan / payload(offset).Size()
numPayloads := maxRecordedBytesPerSpan / payload(42).Size()
const extra = 10
for i := offset + 1; i <= offset+numPayloads+extra; i++ {
for i := 1; i <= numPayloads+extra; i++ {
sp.RecordStructured(payload(i))
}

sp.SetVerbose(true)
rec := sp.GetRecording()
require.Len(t, rec, 1)
require.Len(t, rec[0].InternalStructured, numPayloads)
require.Equal(t, "1", rec[0].Tags["_dropped"])

first := rec[0].InternalStructured[0]
last := rec[0].InternalStructured[len(rec[0].InternalStructured)-1]
var d1 types.DynamicAny
require.NoError(t, types.UnmarshalAny(first, &d1))
require.IsType(t, (*types.Int32Value)(nil), d1.Message)
require.IsType(t, (*types.StringValue)(nil), d1.Message)

var res string
require.NoError(t, types.StdStringUnmarshal(&res, first.Value))
require.Equal(t, pad(extra+1), res)

require.NoError(t, types.StdStringUnmarshal(&res, last.Value))
require.Equal(t, pad(numPayloads+extra), res)
}

// TestSpanRecordLimit tests recording behavior when the amount of data logged
// into the span exceeds the configured limit. It expects the oldest log
// messages to be evicted first and the recording to carry the _dropped tag.
func TestSpanRecordLimit(t *testing.T) {
	tr := NewTracer()
	sp := tr.StartSpan("root", WithForceRealSpan())
	defer sp.Finish()
	sp.SetVerbose(true)

	// Messages are zero-padded to a fixed width so the message text has the
	// same length for every index used below.
	msg := func(i int) string { return fmt.Sprintf("%06d", i) }

	// Determine the size of a log record by actually recording once.
	sp.Record(msg(42))
	logSize := sp.GetRecording()[0].Logs[0].Size()
	sp.ResetRecording()

	// Record `extra` more logs than fit in the budget.
	numLogs := maxRecordedBytesPerSpan / logSize
	const extra = 10
	for i := 1; i <= numLogs+extra; i++ {
		sp.Record(msg(i))
	}

	rec := sp.GetRecording()
	require.Len(t, rec, 1)
	require.Len(t, rec[0].Logs, numLogs)
	// require.Equal takes (expected, actual); keep that order so failure
	// output labels the values correctly.
	require.Equal(t, "1", rec[0].Tags["_dropped"])

	first := rec[0].Logs[0]
	last := rec[0].Logs[len(rec[0].Logs)-1]

	// The first `extra` messages should have been evicted.
	require.Equal(t, msg(extra+1), first.Fields[0].Value)
	require.Equal(t, msg(numLogs+extra), last.Fields[0].Value)
}

// testStructuredImpl is a testing implementation of Structured event. It
// embeds a *types.Int32Value and defines String() to render the wrapped value
// as "structured=<n>".
type testStructuredImpl struct {
*types.Int32Value
}

var _ Structured = &testStructuredImpl{}

// String renders the event as "structured=<value>", where <value> is the
// Value field of the embedded Int32Value.
func (t *testStructuredImpl) String() string {
	const format = "structured=%d"
	return fmt.Sprintf(format, t.Value)
}

// newTestStructured returns a testStructuredImpl wrapping i, converted to an
// int32.
func newTestStructured(i int) *testStructuredImpl {
	wrapped := &types.Int32Value{Value: int32(i)}
	return &testStructuredImpl{Int32Value: wrapped}
}

var res int32
require.NoError(t, types.StdInt32Unmarshal(&res, first.Value))
require.Equal(t, res, int32(offset+numPayloads+extra))
// TestSpanReset checks that resetting a span clears out existing recordings.
func TestSpanReset(t *testing.T) {
tr := NewTracer()
sp := tr.StartSpan("root", WithForceRealSpan())
defer sp.Finish()
sp.SetVerbose(true)

for i := 1; i <= 10; i++ {
if i%2 == 0 {
sp.RecordStructured(newTestStructured(i))
} else {
sp.Record(fmt.Sprintf("%d", i))
}
}

require.NoError(t, TestingCheckRecordedSpans(sp.GetRecording(), `
span: root
tags: _unfinished=1 _verbose=1
event: 1
event: structured=2
event: 3
event: structured=4
event: 5
event: structured=6
event: 7
event: structured=8
event: 9
event: structured=10
`))
require.NoError(t, TestingCheckRecording(sp.GetRecording(), `
=== operation:root _unfinished:1 _verbose:1
event:1
event:structured=2
event:3
event:structured=4
event:5
event:structured=6
event:7
event:structured=8
event:9
event:structured=10
`))

sp.ResetRecording()

require.NoError(t, TestingCheckRecordedSpans(sp.GetRecording(), `
span: root
tags: _unfinished=1 _verbose=1
`))
require.NoError(t, TestingCheckRecording(sp.GetRecording(), `
=== operation:root _unfinished:1 _verbose:1
`))

msg := func(i int) string { return fmt.Sprintf("%06d", i) }
sp.Record(msg(0))
logSize := sp.GetRecording()[0].Logs[0].Size()
numLogs := maxRecordedBytesPerSpan / logSize
const extra = 10

for i := 1; i <= numLogs+extra; i++ {
sp.Record(msg(i))
}

require.NoError(t, types.StdInt32Unmarshal(&res, last.Value))
require.Equal(t, res, int32(offset+extra+1))
require.Equal(t, sp.GetRecording()[0].Tags["_dropped"], "1")
sp.ResetRecording()
_, found := sp.GetRecording()[0].Tags["_dropped"]
require.False(t, found)
}

func TestNonVerboseChildSpanRegisteredWithParent(t *testing.T) {
Expand Down
Loading

0 comments on commit c86ce89

Please sign in to comment.