Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pganalyze tracestate to set start time of the span #475

Merged
merged 4 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions logs/querysample/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"encoding/hex"
"fmt"
"strings"
"time"

"github.com/pganalyze/collector/state"
Expand Down Expand Up @@ -32,6 +33,39 @@ func urlToSample(server *state.Server, grant state.GrantLogs, sample state.Postg
)
}

func startAndEndTime(traceState trace.TraceState, sample state.PostgresQuerySample) (startTime time.Time, endTime time.Time) {
if pganalyzeState := traceState.Get("pganalyze"); pganalyzeState != "" {
// A pganalyze traceState allows the client to pass the query start time (sent time)
// on the client side, in nano second precision, like pganalyze=t:1697666938.6297212
// If there are multiple values in a pganalzye traceState, they are separated by semicolon
// like pganalyze=t:1697666938.6297212;x=123
for _, part := range strings.Split(strings.TrimSpace(pganalyzeState), ";") {
if strings.Contains(part, ":") {
keyAndValue := strings.SplitN(part, ":", 2)
if strings.TrimSpace(keyAndValue[0]) == "t" {
if start, err := util.TimeFromStr(keyAndValue[1]); err == nil {
startTime = start
// With this, we're adding the query duration to the start time.
// This could result creating inaccurate spans, as the start time passed
// from the client side using tracestate is the time of the query is sent
// from the client to the server.
// This means, we will ignore the network time between the client and the
// server, as well as the machine clock difference between them.
endTime = startTime.Add(time.Duration(sample.RuntimeMs) * time.Millisecond)
return
}
}
}
}
}
// If no start time was found in the tracestate, calculate start and end time based on sample data
duration := time.Duration(sample.RuntimeMs) * time.Millisecond
startTime = sample.OccurredAt.Add(-1 * duration)
keiko713 marked this conversation as resolved.
Show resolved Hide resolved
endTime = sample.OccurredAt

return
}

func ExportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, logger *util.Logger, grant state.GrantLogs, samples []state.PostgresQuerySample) {
exportCount := 0
for _, sample := range samples {
Expand All @@ -49,9 +83,7 @@ func ExportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, l
trace.WithInstrumentationVersion(util.CollectorVersion),
trace.WithSchemaURL(semconv.SchemaURL),
)
duration := -1 * time.Duration(sample.RuntimeMs) * time.Millisecond
startTime := sample.OccurredAt.Add(duration)
endTime := sample.OccurredAt
startTime, endTime := startAndEndTime(trace.SpanContextFromContext(ctx).TraceState(), sample)
_, span := tracer.Start(ctx, otelSpanName, trace.WithTimestamp(startTime))
// See https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/database/
// however note that "db.postgresql.plan" is non-standard.
Expand Down
80 changes: 80 additions & 0 deletions logs/querysample/tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package querysample

import (
"testing"
"time"

"github.com/pganalyze/collector/state"
"go.opentelemetry.io/otel/trace"
)

// startAndEndTimeTestPair is a single table entry for TestStartAndEndTime:
// the inputs handed to startAndEndTime together with the expected results.
// The field order is relied upon by positional literals in the test table.
type startAndEndTimeTestPair struct {
	// testName identifies the case in test output.
	testName string

	// traceState and sample are the arguments passed to startAndEndTime.
	traceState trace.TraceState
	sample     state.PostgresQuerySample

	// startTime and endTime are the values startAndEndTime is expected to return.
	startTime, endTime time.Time
}

// TestStartAndEndTime checks that startAndEndTime derives the span times from
// the sample when no usable pganalyze tracestate is present, and from the
// tracestate's "t" entry when it is.
func TestStartAndEndTime(t *testing.T) {
	currentTime := time.Date(2023, time.January, 1, 1, 2, 3, 456*1000*1000, time.UTC)
	traceState := trace.TraceState{}
	otelTraceState, err := traceState.Insert("ot", "p:8;r:62")
	if err != nil {
		t.Fatalf("Failed to initialize object: %v", err)
	}
	pganalyzeTraceStateWithoutT, err := otelTraceState.Insert("pganalyze", "x:foo;y:bar")
	if err != nil {
		t.Fatalf("Failed to initialize object: %v", err)
	}
	pganalyzeTraceState, err := otelTraceState.Insert("pganalyze", "t:1697666938.6297212")
	if err != nil {
		t.Fatalf("Failed to initialize object: %v", err)
	}
	// 1697666938.6297212 = 2023-10-18 22:08:58.6297212
	pganalyzeTime, err := time.Parse("2006-01-02T15:04:05.999999999", "2023-10-18T22:08:58.6297212")
	if err != nil {
		t.Fatalf("Failed to initialize object: %v", err)
	}

	startAndEndTimeTests := []startAndEndTimeTestPair{
		{
			"No trace state",
			trace.TraceState{},
			state.PostgresQuerySample{RuntimeMs: 1000, OccurredAt: currentTime},
			currentTime.Add(-1 * 1000 * time.Millisecond),
			currentTime,
		},
		{
			"No pganalyze trace state",
			otelTraceState,
			state.PostgresQuerySample{RuntimeMs: 1000, OccurredAt: currentTime},
			currentTime.Add(-1 * 1000 * time.Millisecond),
			currentTime,
		},
		{
			"pganalyze trace state without t",
			pganalyzeTraceStateWithoutT,
			state.PostgresQuerySample{RuntimeMs: 1000, OccurredAt: currentTime},
			currentTime.Add(-1 * 1000 * time.Millisecond),
			currentTime,
		},
		{
			"pganalyze trace state",
			pganalyzeTraceState,
			state.PostgresQuerySample{RuntimeMs: 1000, OccurredAt: currentTime},
			pganalyzeTime,
			pganalyzeTime.Add(1000 * time.Millisecond),
		},
	}

	for _, pair := range startAndEndTimeTests {
		pair := pair // per-iteration copy for the closure (needed before Go 1.22)
		t.Run(pair.testName, func(t *testing.T) {
			startTime, endTime := startAndEndTime(pair.traceState, pair.sample)
			// Compare instants with Equal: == on time.Time also compares the
			// Location and monotonic clock reading, which is not what is
			// being tested here.
			if !startTime.Equal(pair.startTime) {
				t.Errorf("For %s: expected startTime to be %v, but was %v", pair.testName, pair.startTime, startTime)
			}
			if !endTime.Equal(pair.endTime) {
				t.Errorf("For %s: expected endTime to be %v, but was %v", pair.testName, pair.endTime, endTime)
			}
		})
	}
}
45 changes: 45 additions & 0 deletions util/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package util

import (
"errors"
"strconv"
"strings"
"time"
)

// TimeFromStr parses value as a Unix timestamp and returns the corresponding
// time in UTC.
//
// value is either an integer number of seconds since the Epoch
// ("1697666938") or a decimal number of seconds with at most nine fractional
// digits, i.e. nanosecond precision ("1697666938.6297212"). A trailing dot
// with no digits ("1697666938.") is treated as a zero fraction.
//
// An error is returned, together with the zero time.Time, when the seconds
// part is not a valid base-10 integer, when the fractional part contains
// anything other than the digits 0-9 (including a sign), or when it is longer
// than nine digits.
func TimeFromStr(value string) (time.Time, error) {
	integerAndDecimal := strings.SplitN(value, ".", 2)
	secInStr := integerAndDecimal[0]
	sec, err := strconv.ParseInt(secInStr, 10, 64)
	if err != nil {
		return time.Time{}, err
	}
	if len(integerAndDecimal) == 1 {
		return time.Unix(sec, 0).UTC(), nil
	}

	decimalInStr := integerAndDecimal[1]
	if len(decimalInStr) > 9 {
		// decimal length shouldn't be more than nanoseconds (9)
		return time.Time{}, errors.New("decimal length is longer than nanoseconds (9)")
	}
	// strconv.ParseInt accepts a leading "+" or "-", so the fraction has to be
	// validated explicitly; otherwise "1.-5" or "1.+5" would be accepted and
	// produce a bogus nanosecond value.
	for i := 0; i < len(decimalInStr); i++ {
		if c := decimalInStr[i]; c < '0' || c > '9' {
			return time.Time{}, errors.New("decimal part must contain only digits")
		}
	}

	var nsec int64
	if decimalInStr != "" {
		// Right-pad to nine digits so that e.g. ".629" means 629000000ns.
		nsec, err = strconv.ParseInt(rightPad(decimalInStr, 9, "0"), 10, 64)
		if err != nil {
			return time.Time{}, err
		}
	}
	// For a negative timestamp the fraction moves the instant further into the
	// past. Check the sign on the string rather than on sec so that "-0.5" is
	// handled as well. time.Unix accepts nsec outside [0, 999999999].
	if strings.HasPrefix(secInStr, "-") {
		nsec = -nsec
	}
	return time.Unix(sec, nsec).UTC(), nil
}

// rightPad returns str extended on the right with repetitions of pad, one per
// missing byte, until it is length bytes long (for a single-byte pad). A str
// that is already length bytes or longer is returned unchanged.
func rightPad(str string, length int, pad string) string {
	if len(str) >= length {
		return str
	}
	padding := strings.Repeat(pad, length-len(str))
	return str + padding
}
63 changes: 63 additions & 0 deletions util/tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package util_test

import (
"reflect"
"testing"
"time"

"github.com/pganalyze/collector/util"
)

// timeFromStrTests is the case table for TestTimeFromStr. Each entry gives an
// input string, the time expected back (the zero time.Time for failing
// inputs), and whether an error is expected.
var timeFromStrTests = []struct {
	input     string
	expected  time.Time
	expectErr bool
}{
	// Valid inputs: full nanosecond precision, a shorter fraction, no fraction.
	{input: "1697666938.629721234", expected: time.Unix(1697666938, 629721234).UTC()},
	{input: "1697666938.629", expected: time.Unix(1697666938, 629000000).UTC()},
	{input: "1697666938", expected: time.Unix(1697666938, 0).UTC()},

	// Invalid inputs: the zero time is expected together with an error.
	{input: "", expectErr: true},
	{input: "not a time", expectErr: true},
	{input: "1697666938.baddecimal", expectErr: true},
	{input: "1697666938.6297212340000", expectErr: true}, // nsec too long
}

// TestTimeFromStr runs util.TimeFromStr over every entry of timeFromStrTests
// and checks both whether an error was returned and the returned time.
func TestTimeFromStr(t *testing.T) {
	for _, test := range timeFromStrTests {
		actual, err := util.TimeFromStr(test.input)
		if gotErr := err != nil; gotErr != test.expectErr {
			// %v rather than %s: err may be nil here, and %s would render it as
			// "%!s(<nil>)". %q makes empty or whitespace inputs visible.
			t.Errorf("TimeFromStr(%q): expected err: %t; actual: %v", test.input, test.expectErr, err)
		}
		// DeepEqual (rather than Time.Equal) also verifies that the result is
		// expressed in UTC, like the expected values in the table.
		if !reflect.DeepEqual(actual, test.expected) {
			t.Errorf("TimeFromStr(%q): expected %v; actual %v", test.input, test.expected, actual)
		}
	}
}