From cd3b0ec0e7269636ace41d3d0d331b1d6108f0ab Mon Sep 17 00:00:00 2001 From: Keiko Oda Date: Fri, 17 Nov 2023 16:22:21 +0900 Subject: [PATCH 1/4] Support pganalyze tracestate to set start time of the span --- logs/querysample/tracing.go | 41 +++++++++++++-- logs/querysample/tracing_test.go | 86 ++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 3 deletions(-) create mode 100644 logs/querysample/tracing_test.go diff --git a/logs/querysample/tracing.go b/logs/querysample/tracing.go index 00374f9e7..cbce3f777 100644 --- a/logs/querysample/tracing.go +++ b/logs/querysample/tracing.go @@ -5,6 +5,8 @@ import ( "encoding/binary" "encoding/hex" "fmt" + "strconv" + "strings" "time" "github.com/pganalyze/collector/state" @@ -32,6 +34,41 @@ func urlToSample(server *state.Server, grant state.GrantLogs, sample state.Postg ) } +func startAndEndTime(traceState trace.TraceState, sample state.PostgresQuerySample) (startTime time.Time, endTime time.Time) { + if pganalyzeState := traceState.Get("pganalyze"); pganalyzeState != "" { + // A pganalyze traceState allows the client to pass the query start time (sent time) + // on the client side, in nano second precision, like pganalyze=t:1697666938.6297212 + // If there are multiple values in a pganalzye traceState, they are separated by semicolon + // like pganalyze=t:1697666938.6297212;x=123 + for _, part := range strings.Split(strings.TrimSpace(pganalyzeState), ";") { + if strings.Contains(part, ":") { + keyAndValue := strings.SplitN(part, ":", 2) + if strings.TrimSpace(keyAndValue[0]) == "t" { + if startInSec, err := strconv.ParseFloat(keyAndValue[1], 64); err == nil { + startSec := int64(startInSec) + startNanoSec := int64(startInSec*1e9) - (startSec * 1e9) + startTime = time.Unix(startSec, startNanoSec).UTC() + // With this, we're adding the query duration to the start time. + // This could result creating inaccurate spans, as the start time passed + // from the client side using tracestate is the time of the query is sent + // from the client to the server. + // This means, we will ignore the network time between the client and the + // server, as well as the machine clock different between them. + endTime = startTime.Add(time.Duration(sample.RuntimeMs) * time.Millisecond) + return + } + } + } + } + } + // Calculate start and end time based on sample data + duration := time.Duration(sample.RuntimeMs) * time.Millisecond + startTime = sample.OccurredAt.Add(-1 * duration) + endTime = sample.OccurredAt + + return +} + func ExportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, logger *util.Logger, grant state.GrantLogs, samples []state.PostgresQuerySample) { exportCount := 0 for _, sample := range samples { @@ -49,9 +86,7 @@ func ExportQuerySamplesAsTraceSpans(ctx context.Context, server *state.Server, l trace.WithInstrumentationVersion(util.CollectorVersion), trace.WithSchemaURL(semconv.SchemaURL), ) - duration := -1 * time.Duration(sample.RuntimeMs) * time.Millisecond - startTime := sample.OccurredAt.Add(duration) - endTime := sample.OccurredAt + startTime, endTime := startAndEndTime(trace.SpanContextFromContext(ctx).TraceState(), sample) _, span := tracer.Start(ctx, otelSpanName, trace.WithTimestamp(startTime)) // See https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/database/ // however note that "db.postgresql.plan" is non-standard. diff --git a/logs/querysample/tracing_test.go b/logs/querysample/tracing_test.go new file mode 100644 index 000000000..0f081a005 --- /dev/null +++ b/logs/querysample/tracing_test.go @@ -0,0 +1,86 @@ +package querysample + +import ( + "testing" + "time" + + "github.com/pganalyze/collector/state" + "go.opentelemetry.io/otel/trace" +) + +type startAndEndTimeTestPair struct { + testName string + traceState trace.TraceState + sample state.PostgresQuerySample + startTime time.Time + endTime time.Time +} + +func TestStartAndEndTime(t *testing.T) { + currentTime, err := time.Parse("2006-01-02", "2023-01-01") + if err != nil { + t.Fatalf("Failed to initialize object: %v", err) + } + traceState := trace.TraceState{} + otelTraceState, err := traceState.Insert("ot", "p:8;r:62") + if err != nil { + t.Fatalf("Failed to initialize object: %v", err) + } + pganalyzeTraceStateWithoutT, err := traceState.Insert("pganalyze", "x:foo;y:bar") + if err != nil { + t.Fatalf("Failed to initialize object: %v", err) + } + // inserting the same key will update the value + pganalyzeTraceState, err := traceState.Insert("pganalyze", "t:1697666938.6297212") + if err != nil { + t.Fatalf("Failed to initialize object: %v", err) + } + // 1697666938.6297212 = 2023-10-18 22:08:58.6297212 + pganalyzeTime, err := time.Parse("2006-01-02T15:04:05.999999999", "2023-10-18T22:08:58.6297212") + if err != nil { + t.Fatalf("Failed to initialize object: %v", err) + } + // due to the limitation of the floating point, the result won't exactly like above, so tweaking to pass the test + pganalyzeTime = pganalyzeTime.Add(-1 * 112) + + var startAndEndTimeTests = []startAndEndTimeTestPair{ + { + "No trace state", + trace.TraceState{}, + state.PostgresQuerySample{RuntimeMs: 1000, OccurredAt: currentTime}, + currentTime.Add(-1 * 1000 * time.Millisecond), + currentTime, + }, + { + "No pganalyze trace state", + otelTraceState, + state.PostgresQuerySample{RuntimeMs: 1000, OccurredAt: currentTime}, + currentTime.Add(-1 * 1000 * time.Millisecond), + currentTime, + }, + { + "pganalyze trace state without t", + pganalyzeTraceStateWithoutT, + state.PostgresQuerySample{RuntimeMs: 1000, OccurredAt: currentTime}, + currentTime.Add(-1 * 1000 * time.Millisecond), + currentTime, + }, + { + "pganalyze trace state", + pganalyzeTraceState, + state.PostgresQuerySample{RuntimeMs: 1000, OccurredAt: currentTime}, + pganalyzeTime, + pganalyzeTime.Add(1000 * time.Millisecond), + }, + } + + for _, pair := range startAndEndTimeTests { + startTime, endTime := startAndEndTime(pair.traceState, pair.sample) + if pair.startTime != startTime { + t.Errorf("For %s: expected startTime to be %v, but was %v\n", pair.testName, pair.startTime, startTime) + } + if pair.endTime != endTime { + t.Errorf("For %s: expected endTime to be %v, but was %v\n", pair.testName, pair.endTime, endTime) + } + } +} From e3f9fd8671c39f5a6c0039d67b72d0d96da64dc7 Mon Sep 17 00:00:00 2001 From: Keiko Oda Date: Fri, 17 Nov 2023 16:59:22 +0900 Subject: [PATCH 2/4] Use time.Date to create a test time --- logs/querysample/tracing_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/logs/querysample/tracing_test.go b/logs/querysample/tracing_test.go index 0f081a005..cb4d5d8b4 100644 --- a/logs/querysample/tracing_test.go +++ b/logs/querysample/tracing_test.go @@ -17,10 +17,7 @@ type startAndEndTimeTestPair struct { } func TestStartAndEndTime(t *testing.T) { - currentTime, err := time.Parse("2006-01-02", "2023-01-01") - if err != nil { - t.Fatalf("Failed to initialize object: %v", err) - } + currentTime := time.Date(2023, time.January, 1, 1, 2, 3, 456*1000*1000, time.UTC) traceState := trace.TraceState{} otelTraceState, err := traceState.Insert("ot", "p:8;r:62") if err != nil { From b574d8f9e0990c03f5b3b6f9272154f1704353e3 Mon Sep 17 00:00:00 2001 From: Keiko Oda Date: Tue, 21 Nov 2023 15:34:07 +0900 Subject: [PATCH 3/4] Create util.TimeFromStr to convert string to time --- logs/querysample/tracing.go | 9 ++--- logs/querysample/tracing_test.go | 7 +--- util/tracing.go | 45 +++++++++++++++++++++++ util/tracing_test.go | 63 ++++++++++++++++++++++++++++++++ 4 files changed, 113 insertions(+), 11 deletions(-) create mode 100644 util/tracing.go create mode 100644 util/tracing_test.go diff --git a/logs/querysample/tracing.go b/logs/querysample/tracing.go index cbce3f777..36b3ab9cd 100644 --- a/logs/querysample/tracing.go +++ b/logs/querysample/tracing.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "encoding/hex" "fmt" - "strconv" "strings" "time" @@ -44,10 +43,8 @@ func startAndEndTime(traceState trace.TraceState, sample state.PostgresQuerySamp if strings.Contains(part, ":") { keyAndValue := strings.SplitN(part, ":", 2) if strings.TrimSpace(keyAndValue[0]) == "t" { - if startInSec, err := strconv.ParseFloat(keyAndValue[1], 64); err == nil { - startSec := int64(startInSec) - startNanoSec := int64(startInSec*1e9) - (startSec * 1e9) - startTime = time.Unix(startSec, startNanoSec).UTC() + if start, err := util.TimeFromStr(keyAndValue[1]); err == nil { + startTime = start // With this, we're adding the query duration to the start time. // This could result creating inaccurate spans, as the start time passed // from the client side using tracestate is the time of the query is sent @@ -61,7 +58,7 @@ func startAndEndTime(traceState trace.TraceState, sample state.PostgresQuerySamp } } } - // Calculate start and end time based on sample data + // If no start time was found in the tracestate, calculate start and end time based on sample data duration := time.Duration(sample.RuntimeMs) * time.Millisecond startTime = sample.OccurredAt.Add(-1 * duration) endTime = sample.OccurredAt diff --git a/logs/querysample/tracing_test.go b/logs/querysample/tracing_test.go index cb4d5d8b4..90b320ac6 100644 --- a/logs/querysample/tracing_test.go +++ b/logs/querysample/tracing_test.go @@ -23,12 +23,11 @@ func TestStartAndEndTime(t *testing.T) { if err != nil { t.Fatalf("Failed to initialize object: %v", err) } - pganalyzeTraceStateWithoutT, err := traceState.Insert("pganalyze", "x:foo;y:bar") + pganalyzeTraceStateWithoutT, err := otelTraceState.Insert("pganalyze", "x:foo;y:bar") if err != nil { t.Fatalf("Failed to initialize object: %v", err) } - // inserting the same key will update the value - pganalyzeTraceState, err := traceState.Insert("pganalyze", "t:1697666938.6297212") + pganalyzeTraceState, err := otelTraceState.Insert("pganalyze", "t:1697666938.6297212") if err != nil { t.Fatalf("Failed to initialize object: %v", err) } @@ -37,8 +36,6 @@ func TestStartAndEndTime(t *testing.T) { if err != nil { t.Fatalf("Failed to initialize object: %v", err) } - // due to the limitation of the floating point, the result won't exactly like above, so tweaking to pass the test - pganalyzeTime = pganalyzeTime.Add(-1 * 112) var startAndEndTimeTests = []startAndEndTimeTestPair{ { diff --git a/util/tracing.go b/util/tracing.go new file mode 100644 index 000000000..0e46eb077 --- /dev/null +++ b/util/tracing.go @@ -0,0 +1,45 @@ +package util + +import ( + "errors" + "strconv" + "strings" + "time" +) + +// TimeFromStr returns a time of the given value in string. +// The value is the value of time either as a floating point number of seconds since the Epoch, or it can also be a integer number of seconds. +func TimeFromStr(value string) (time.Time, error) { + IntegerAndDecimal := strings.SplitN(value, ".", 2) + secInStr := IntegerAndDecimal[0] + sec, err := strconv.ParseInt(secInStr, 10, 64) + if err != nil { + return time.Time{}, err + } + if len(IntegerAndDecimal) == 1 { + return time.Unix(sec, 0).UTC(), nil + } + decimalInStr := IntegerAndDecimal[1] + if decimalInStr == "" { + decimalInStr = "0" + } + if len(decimalInStr) > 9 { + // decimal length shouldn't be more than nanoseconds (9) + return time.Time{}, errors.New("decimal length is longer than nanoseconds (9)") + } + nsecInStr := rightPad(decimalInStr, 9, "0") + nsec, err := strconv.ParseInt(nsecInStr, 10, 64) + if err != nil { + return time.Time{}, err + } + return time.Unix(sec, nsec).UTC(), nil +} + +// rightPad returns the string that is right padded with the given pad string. +func rightPad(str string, length int, pad string) string { + if len(str) >= length { + return str + } + padding := strings.Repeat(pad, length-len(str)) + return str + padding +} diff --git a/util/tracing_test.go b/util/tracing_test.go new file mode 100644 index 000000000..1fbcff952 --- /dev/null +++ b/util/tracing_test.go @@ -0,0 +1,63 @@ +package util_test + +import ( + "reflect" + "testing" + "time" + + "github.com/pganalyze/collector/util" +) + +var timeFromStrTests = []struct { + input string + expected time.Time + expectErr bool +}{ + { + "1697666938.629721234", + time.Unix(1697666938, 629721234).UTC(), + false, + }, + { + "1697666938.629", + time.Unix(1697666938, 629000000).UTC(), + false, + }, + { + "1697666938", + time.Unix(1697666938, 0).UTC(), + false, + }, + { + "", + time.Time{}, + true, + }, + { + "not a time", + time.Time{}, + true, + }, + { + "1697666938.baddecimal", + time.Time{}, + true, + }, + { + "1697666938.6297212340000", // nsec too long + time.Time{}, + true, + }, +} + +func TestTimeFromStr(t *testing.T) { + for _, test := range timeFromStrTests { + actual, err := util.TimeFromStr(test.input) + if (err != nil) != test.expectErr { + t.Errorf("TimeFromStr(%s): expected err: %t; actual: %s", test.input, test.expectErr, err) + } + if !reflect.DeepEqual(actual, test.expected) { + t.Errorf("TimeFromStr(%s): expected %v; actual %v", test.input, test.expected, actual) + } + } +} From 2c76f7a0d425708c846feadc6615b3b354868a1e Mon Sep 17 00:00:00 2001 From: Keiko Oda Date: Tue, 21 Nov 2023 17:17:21 +0900 Subject: [PATCH 4/4] Fix typo --- logs/querysample/tracing.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logs/querysample/tracing.go b/logs/querysample/tracing.go index 36b3ab9cd..794568f60 100644 --- a/logs/querysample/tracing.go +++ b/logs/querysample/tracing.go @@ -50,7 +50,7 @@ func startAndEndTime(traceState trace.TraceState, sample state.PostgresQuerySamp // from the client side using tracestate is the time of the query is sent // from the client to the server. // This means, we will ignore the network time between the client and the - // server, as well as the machine clock different between them. + // server, as well as the machine clock difference between them. endTime = startTime.Add(time.Duration(sample.RuntimeMs) * time.Millisecond) return }