From dc0598e31d992668e4eaa1579e23207c4094b721 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Fri, 10 May 2024 11:31:17 +0200 Subject: [PATCH 1/3] chore(logging): Add entry's timestamp when rejected with `too far behind` Ingester uses `max_chunk_age/2` to reject out-of-order ingestion in Loki. More details about how it works are here and here. But when an entry is rejected, we currently print a log line like this on the ingester. ``` entry too far behind, oldest acceptable timestamp is: 2024-05-07T10:25:30Z ``` The problem is that it doesn't print the original timestamp of the log entry itself, so it is sometimes hard to interpret. This PR fixes it. Now it looks something like this. ``` entry too far behind, entry timestamp is: 2024-05-05T10:25:30Z, oldest acceptable timestamp is: 2024-05-07T10:25:30Z ``` Also, it is now consistent with the error we return to the client when we reject an entry. ``` entry with timestamp 2024-05-06 15:58:50 +0000 UTC ignored, reason: 'entry too far behind, oldest acceptable timestamp is: 2024-05-07T10:25:30Z', user 'fake', total ignored: 1 out of 2 for stream: {app="webserver1", host="localhost", service_name="webserver1"} ``` Signed-off-by: Kaviraj --- pkg/chunkenc/interface.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index b96d9f705d092..8d6f5e1e8dd60 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -24,6 +24,10 @@ var ( ) type errTooFarBehind struct { + // original timestamp of the entry itself. + entryTs time.Time + + // cutoff is the oldest acceptable timestamp of the `stream` that entry belongs to. 
cutoff time.Time } @@ -32,12 +36,12 @@ func IsErrTooFarBehind(err error) bool { return ok } -func ErrTooFarBehind(cutoff time.Time) error { - return &errTooFarBehind{cutoff: cutoff} +func ErrTooFarBehind(entryTs, cutoff time.Time) error { + return &errTooFarBehind{entryTs: entryTs, cutoff: cutoff} } func (m *errTooFarBehind) Error() string { - return "entry too far behind, oldest acceptable timestamp is: " + m.cutoff.Format(time.RFC3339) + return fmt.Sprintf("entry too far behind, entry timestamp is: %s, oldest acceptable timestamp is: %s", m.entryTs.Format(time.RFC3339), m.cutoff.Format(time.RFC3339)) } func IsOutOfOrderErr(err error) bool { From ff0dee9687368ccbcd0f553f0b3fdc0b0a3b90f4 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Fri, 10 May 2024 11:44:54 +0200 Subject: [PATCH 2/3] use it on stream.go Signed-off-by: Kaviraj --- pkg/ingester/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index d7a29b73e802d..6bf75dfa1ac54 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -394,7 +394,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWh // The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age. 
cutoff := highestTs.Add(-s.cfg.MaxChunkAge / 2) if !isReplay && s.unorderedWrites && !highestTs.IsZero() && cutoff.After(entries[i].Timestamp) { - failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(cutoff)}) + failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(entries[i].Timestamp, cutoff)}) s.writeFailures.Log(s.tenant, fmt.Errorf("%w for stream %s", failedEntriesWithError[len(failedEntriesWithError)-1].e, s.labels)) outOfOrderSamples++ outOfOrderBytes += lineBytes From 147a30ba1bbe030c94443331cb74d015f44c9b1c Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Fri, 10 May 2024 14:10:29 +0200 Subject: [PATCH 3/3] update tests Signed-off-by: Kaviraj --- pkg/chunkenc/interface_test.go | 4 +++- pkg/ingester/stream_test.go | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/chunkenc/interface_test.go b/pkg/chunkenc/interface_test.go index daea36cb38e72..ed81c4d3604e4 100644 --- a/pkg/chunkenc/interface_test.go +++ b/pkg/chunkenc/interface_test.go @@ -31,7 +31,9 @@ func TestParseEncoding(t *testing.T) { } func TestIsOutOfOrderErr(t *testing.T) { - for _, err := range []error{ErrOutOfOrder, ErrTooFarBehind(time.Now())} { + now := time.Now() + + for _, err := range []error{ErrOutOfOrder, ErrTooFarBehind(now, now)} { require.Equal(t, true, IsOutOfOrderErr(err)) } } diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 26eef4e3a7936..af877bf88da9e 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -84,8 +84,9 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { var expected bytes.Buffer for i := 0; i < tc.expectErrs; i++ { fmt.Fprintf(&expected, - "entry with timestamp %s ignored, reason: 'entry too far behind, oldest acceptable timestamp is: %s',\n", + "entry with timestamp %s ignored, reason: 'entry too far behind, entry timestamp is: %s, oldest acceptable timestamp is: %s',\n", 
time.Unix(int64(i), 0).String(), + newLines[i].Timestamp.Format(time.RFC3339), time.Unix(int64(numLogs), 0).Format(time.RFC3339), ) }