From 774f2bd565a3299aadba1ab0047eb0f8bcf4e428 Mon Sep 17 00:00:00 2001
From: Paul Rogers
Date: Wed, 15 May 2024 16:02:11 -0400
Subject: [PATCH 01/15] First pass at getting duplicate log line metrics to be
 outputted via a runtime config

---
 pkg/chunkenc/interface.go                 | 26 +++++++
 pkg/chunkenc/interface_test.go            | 10 +++
 pkg/chunkenc/unordered.go                 |  2 +-
 pkg/chunkenc/unordered_test.go            | 16 ++++-
 pkg/distributor/writefailures/manager.go  |  3 +-
 pkg/ingester/instance.go                  |  4 +-
 pkg/ingester/instance_test.go             |  4 +-
 pkg/ingester/stream.go                    | 40 +++++++++--
 pkg/ingester/stream_test.go               | 87 +++++++++++++++++++++++-
 pkg/ingester/streams_map_test.go          |  2 +
 pkg/loki/runtime_config_test.go           | 10 +++
 pkg/runtime/config.go                     | 18 ++++-
 pkg/validation/validate.go                | 11 +++
 13 files changed, 214 insertions(+), 19 deletions(-)

diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go
index 3825a6520af5f..d28c4f94e9a8b 100644
--- a/pkg/chunkenc/interface.go
+++ b/pkg/chunkenc/interface.go
@@ -21,6 +21,7 @@ var (
 	ErrInvalidSize     = errors.New("invalid size")
 	ErrInvalidFlag     = errors.New("invalid flag")
 	ErrInvalidChecksum = errors.New("invalid chunk checksum")
+	ErrDuplicateEntry  = errors.New("duplicate entry")
 )
 
 type errTooFarBehind struct {
@@ -48,6 +49,31 @@ func IsOutOfOrderErr(err error) bool {
 	return err == ErrOutOfOrder || IsErrTooFarBehind(err)
 }
 
+type errDuplicateEntry struct {
+	// original timestamp of the entry itself.
+	entryTs time.Time
+
+	// string representation of the stream for the entry
+	stream string
+}
+
+func IsErrDuplicateEntry(err error) bool {
+	_, ok := err.(*errDuplicateEntry)
+	return ok
+}
+
+func ErrDuplicateLogEntry(entryTs time.Time, stream string) error {
+	return &errDuplicateEntry{entryTs: entryTs, stream: stream}
+}
+
+func (m *errDuplicateEntry) Error() string {
+	return fmt.Sprintf("entry is a duplicate, entry timestamp is: %s, stream information is: %s", m.entryTs.Format(time.RFC3339), m.stream)
+}
+
+func IsDuplicateEntryErr(err error) bool {
+	return err == ErrDuplicateEntry || IsErrDuplicateEntry(err)
+}
+
 // Encoding is the identifier for a chunk encoding.
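The error helpers above are consumed by the storeEntries handling added later in this patch. As a rough, self-contained sketch of that call-site pattern (illustrative only: `chk` and the branch bodies are placeholders, not code from this patch):

    if err := chk.Append(entry); err != nil {
    	switch {
    	case chunkenc.IsDuplicateEntryErr(err):
    		// duplicate line dropped by the head block: count the discarded
    		// bytes and, when enabled per tenant, log the stream info
    	case chunkenc.IsOutOfOrderErr(err):
    		// existing out-of-order accounting is unchanged
    	default:
    		// any other append failure
    	}
    }
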
type Encoding byte diff --git a/pkg/chunkenc/interface_test.go b/pkg/chunkenc/interface_test.go index ed81c4d3604e4..1879133006ed7 100644 --- a/pkg/chunkenc/interface_test.go +++ b/pkg/chunkenc/interface_test.go @@ -37,3 +37,13 @@ func TestIsOutOfOrderErr(t *testing.T) { require.Equal(t, true, IsOutOfOrderErr(err)) } } + +func TestIsDuplicateEntryErr(t *testing.T) { + if !IsDuplicateEntryErr(ErrDuplicateEntry) { + t.Errorf("IsDuplicateEntryErr() = false, want true for ErrDuplicateEntry") + } + + if IsDuplicateEntryErr(ErrOutOfOrder) { + t.Errorf("IsDuplicateEntryErr() = true, want false for errors other than ErrDuplicateEntry") + } +} diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 788f9c0a7c45b..15b3644e6b395 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -135,7 +135,7 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata l for _, et := range displaced[0].(*nsEntries).entries { if et.line == line { e.entries = displaced[0].(*nsEntries).entries - return nil + return ErrDuplicateEntry } } e.entries = append(displaced[0].(*nsEntries).entries, nsEntry{line, hb.symbolizer.Add(structuredMetadata)}) diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index f4930952660fc..4000b0e8de3e3 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -86,6 +86,7 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { desc string input, exp []entry dir logproto.Direction + hasDup bool }{ { desc: "simple forward", @@ -152,7 +153,8 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { exp: []entry{ {0, "a", nil}, {0, "b", nil}, {1, "c", nil}, }, - dir: logproto.FORWARD, + dir: logproto.FORWARD, + hasDup: true, }, { desc: "ts remove exact dupe backward", @@ -162,7 +164,8 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { exp: []entry{ {1, "c", nil}, {0, "b", nil}, {0, "a", nil}, }, - dir: logproto.BACKWARD, + dir: logproto.BACKWARD, + hasDup: true, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -173,7 +176,14 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { t.Run(format.String(), func(t *testing.T) { hb := newUnorderedHeadBlock(format, newSymbolizer()) for _, e := range tc.input { - require.Nil(t, hb.Append(e.t, e.s, e.structuredMetadata)) + err := hb.Append(e.t, e.s, e.structuredMetadata) + if tc.hasDup { + if err != nil && err != ErrDuplicateEntry { + require.Equal(t, err, ErrDuplicateEntry) + } + } else { + require.Nil(t, err) + } } itr := hb.Iterator( diff --git a/pkg/distributor/writefailures/manager.go b/pkg/distributor/writefailures/manager.go index f02ab2e57d76f..9c6e2818dca9d 100644 --- a/pkg/distributor/writefailures/manager.go +++ b/pkg/distributor/writefailures/manager.go @@ -39,7 +39,8 @@ func (m *Manager) Log(tenantID string, err error) { return } - if !m.tenantCfgs.LimitedLogPushErrors(tenantID) { + if !m.tenantCfgs.LimitedLogPushErrors(tenantID) && + !m.tenantCfgs.LogDuplicateStreamInfo(tenantID) { return } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index a4436b9d41915..961bcbd17db37 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -320,7 +320,7 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre return nil, fmt.Errorf("failed to create stream: %w", err) } - s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures) + s := newStream(chunkfmt, headfmt, i.cfg, 
i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs) // record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them). if record != nil { @@ -358,7 +358,7 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*st return nil, fmt.Errorf("failed to create stream for fingerprint: %w", err) } - s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures) + s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs) i.streamsCreatedTotal.Inc() memoryStreams.WithLabelValues(i.instanceID).Inc() diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 7f7dc30361d6a..ed7dcc2cdadca 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -308,7 +308,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) { require.NoError(t, err) chunkfmt, headfmt, err := instance.chunkFormatAt(minTs(&testStream)) require.NoError(t, err) - chunk := newStream(chunkfmt, headfmt, cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil).NewChunk() + chunk := newStream(chunkfmt, headfmt, cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil, nil).NewChunk() for _, entry := range testStream.Entries { err = chunk.Append(&entry) require.NoError(t, err) @@ -567,7 +567,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { b.Run("addTailersToNewStream", func(b *testing.B) { for n := 0; n < b.N; n++ { - inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil)) + inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil, nil)) } }) } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 0aa3c41ea619b..c255c22efbfc1 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/grafana/loki/v3/pkg/runtime" + "github.com/go-kit/log/level" "github.com/grafana/dskit/httpgrpc" "github.com/opentracing/opentracing-go" @@ -78,6 +80,8 @@ type stream struct { chunkFormat byte chunkHeadBlockFormat chunkenc.HeadBlockFmt + + configs *runtime.TenantConfigs } type chunkDesc struct { @@ -107,6 +111,7 @@ func newStream( streamRateCalculator *StreamRateCalculator, metrics *ingesterMetrics, writeFailures *writefailures.Manager, + configs *runtime.TenantConfigs, ) *stream { hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName) return &stream{ @@ -126,6 +131,8 @@ func newStream( writeFailures: writeFailures, chunkFormat: chunkFormat, chunkHeadBlockFormat: headBlockFmt, + + configs: configs, } } @@ -334,13 +341,23 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa chunk.lastUpdated = time.Now() if err := chunk.chunk.Append(&entries[i]); err != nil { - invalid = append(invalid, entryWithError{&entries[i], err}) - if chunkenc.IsOutOfOrderErr(err) { - s.writeFailures.Log(s.tenant, err) - outOfOrderSamples++ - outOfOrderBytes += len(entries[i].Line) + if chunkenc.IsDuplicateEntryErr(err) { + if s.configs.LogDuplicateMetrics(s.tenant) { + 
s.reportDuplicateMetrics(len(entries[i].Line)) + } + if s.configs.LogDuplicateStreamInfo(s.tenant) { + err = chunkenc.ErrDuplicateLogEntry(entries[i].Timestamp, s.labelsString) + s.writeFailures.Log(s.tenant, err) + } + } else { + invalid = append(invalid, entryWithError{&entries[i], err}) + if chunkenc.IsOutOfOrderErr(err) { + s.writeFailures.Log(s.tenant, err) + outOfOrderSamples++ + outOfOrderBytes += len(entries[i].Line) + } + continue } - continue } s.entryCt++ @@ -380,6 +397,13 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, // NOTE: it's still possible for duplicates to be appended if a stream is // deleted from inactivity. if entries[i].Timestamp.Equal(lastLine.ts) && entries[i].Line == lastLine.content { + if s.configs.LogDuplicateMetrics(s.tenant) { + s.reportDuplicateMetrics(len(entries[i].Line)) + } + if s.configs.LogDuplicateStreamInfo(s.tenant) { + err := chunkenc.ErrDuplicateLogEntry(entries[i].Timestamp, s.labelsString) + s.writeFailures.Log(s.tenant, err) + } continue } @@ -456,6 +480,10 @@ func (s *stream) reportMetrics(ctx context.Context, outOfOrderSamples, outOfOrde } } +func (s *stream) reportDuplicateMetrics(duplicateLogLineBytes int) { + validation.DuplicateLogEntries.WithLabelValues(validation.DiscardedBytesTotal, s.tenant).Add(float64(duplicateLogLineBytes)) +} + func (s *stream) cutChunk(ctx context.Context) *chunkDesc { if sp := opentracing.SpanFromContext(ctx); sp != nil { sp.LogKV("event", "stream started to cut chunk") diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index e4dd4a37ab355..9f44fbbd9f329 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -4,17 +4,23 @@ import ( "bytes" "context" "fmt" + gokitlog "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "math/rand" "net/http" "testing" "time" + "github.com/grafana/loki/v3/pkg/runtime" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/grafana/dskit/httpgrpc" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" @@ -69,6 +75,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) _, err := s.Push(context.Background(), []logproto.Entry{ @@ -122,6 +129,7 @@ func TestPushDeduplication(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) written, err := s.Push(context.Background(), []logproto.Entry{ @@ -136,6 +144,69 @@ func TestPushDeduplication(t *testing.T) { require.Equal(t, len("test"+"newer, better test"), written) } +func TestPushDeduplicationExtraMetrics(t *testing.T) { + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) + + chunkfmt, headfmt := defaultChunkFormat(t) + + buf := bytes.NewBuffer(nil) + logger := gokitlog.NewLogfmtLogger(buf) + + provider := &providerMock{ + tenantConfig: func(tenantID string) *runtime.Config { + if tenantID == "fake" { + return &runtime.Config{ + LogDuplicateMetrics: true, + LogDuplicateStreamInfo: true, + } + } + + return &runtime.Config{} + }, + } + + runtimeCfg, err := runtime.NewTenantConfigs(provider) + + manager := writefailures.NewManager(logger, 
prometheus.NewRegistry(), writefailures.Cfg{LogRate: flagext.ByteSize(1000), AddInsightsLabel: true}, runtimeCfg, "ingester") + + require.NoError(t, err) + + s := newStream( + chunkfmt, + headfmt, + defaultConfig(), + limiter, + "fake", + model.Fingerprint(0), + labels.Labels{ + {Name: "foo", Value: "bar"}, + }, + true, + NewStreamRateCalculator(), + NilMetrics, + manager, + runtimeCfg, + ) + + _, err = s.Push(context.Background(), []logproto.Entry{ + {Timestamp: time.Unix(1, 0), Line: "test"}, + }, recordPool.GetRecord(), 0, true, false) + require.NoError(t, err) + _, err = s.Push(context.Background(), []logproto.Entry{ + {Timestamp: time.Unix(1, 0), Line: "test"}, + }, recordPool.GetRecord(), 0, true, false) + require.Len(t, s.chunks, 1) + require.Equal(t, s.chunks[0].chunk.Size(), 1, "expected exact duplicate to be dropped and newer content with same timestamp to be appended") + require.Equal(t, float64(4), testutil.ToFloat64(validation.DuplicateLogEntries.WithLabelValues(validation.DiscardedBytesTotal, "fake"))) + + content := buf.String() + require.NotEmpty(t, content) + require.Contains(t, content, "insight") + require.Contains(t, content, "duplicate") +} + func TestPushRejectOldCounter(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) @@ -157,6 +228,7 @@ func TestPushRejectOldCounter(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) // counter should be 2 now since the first line will be deduped @@ -263,6 +335,7 @@ func TestEntryErrorCorrectlyReported(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) s.highestTs = time.Now() @@ -301,6 +374,7 @@ func TestUnorderedPush(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) for _, x := range []struct { @@ -403,6 +477,7 @@ func TestPushRateLimit(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) entries := []logproto.Entry{ @@ -443,6 +518,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) entries := []logproto.Entry{ @@ -482,6 +558,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ) base := time.Now() @@ -532,7 +609,7 @@ func Benchmark_PushStream(b *testing.B) { limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1) chunkfmt, headfmt := defaultChunkFormat(b) - s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil) + s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil, nil) expr, err := syntax.ParseLogSelector(`{namespace="loki-dev"}`, true) require.NoError(b, err) t, err := newTailer("foo", expr, &fakeTailServer{}, 10) @@ -566,3 +643,11 @@ func defaultChunkFormat(t testing.TB) (byte, chunkenc.HeadBlockFmt) { return chunkfmt, headfmt } + +type providerMock struct { + tenantConfig func(string) *runtime.Config +} + +func (m *providerMock) TenantConfig(userID string) *runtime.Config { + return m.tenantConfig(userID) +} diff --git a/pkg/ingester/streams_map_test.go b/pkg/ingester/streams_map_test.go index d98369ff152a9..b14b3e07e497f 100644 --- a/pkg/ingester/streams_map_test.go +++ b/pkg/ingester/streams_map_test.go @@ -31,6 +31,7 @@ func TestStreamsMap(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ), newStream( chunkfmt, @@ -46,6 +47,7 
@@ func TestStreamsMap(t *testing.T) { NewStreamRateCalculator(), NilMetrics, nil, + nil, ), } var s *stream diff --git a/pkg/loki/runtime_config_test.go b/pkg/loki/runtime_config_test.go index 36126841dc47b..81081a856ca2b 100644 --- a/pkg/loki/runtime_config_test.go +++ b/pkg/loki/runtime_config_test.go @@ -91,8 +91,12 @@ configs: "1": log_push_request: false limited_log_push_errors: false + log_duplicate_metrics: false + log_duplicate_stream_info: false "2": log_push_request: true + log_duplicate_metrics: true + log_duplicate_stream_info: true `) tenantConfigs, err := runtime.NewTenantConfigs(runtimeGetter) @@ -104,6 +108,12 @@ configs: require.Equal(t, true, tenantConfigs.LogPushRequest("2")) require.Equal(t, true, tenantConfigs.LimitedLogPushErrors("3")) require.Equal(t, false, tenantConfigs.LogPushRequest("3")) + require.Equal(t, false, tenantConfigs.LogDuplicateMetrics("1")) + require.Equal(t, true, tenantConfigs.LogDuplicateMetrics("2")) + require.Equal(t, false, tenantConfigs.LogDuplicateMetrics("3")) + require.Equal(t, false, tenantConfigs.LogDuplicateStreamInfo("1")) + require.Equal(t, true, tenantConfigs.LogDuplicateStreamInfo("2")) + require.Equal(t, false, tenantConfigs.LogDuplicateStreamInfo("3")) } func newTestRuntimeconfig(t *testing.T, yaml string) runtime.TenantConfigProvider { diff --git a/pkg/runtime/config.go b/pkg/runtime/config.go index 85f8dc3d81b4a..1655789dae71d 100644 --- a/pkg/runtime/config.go +++ b/pkg/runtime/config.go @@ -5,9 +5,11 @@ import ( ) type Config struct { - LogStreamCreation bool `yaml:"log_stream_creation"` - LogPushRequest bool `yaml:"log_push_request"` - LogPushRequestStreams bool `yaml:"log_push_request_streams"` + LogStreamCreation bool `yaml:"log_stream_creation"` + LogPushRequest bool `yaml:"log_push_request"` + LogPushRequestStreams bool `yaml:"log_push_request_streams"` + LogDuplicateMetrics bool `yaml:"log_duplicate_metrics"` + LogDuplicateStreamInfo bool `yaml:"log_duplicate_stream_info"` // LimitedLogPushErrors is to be implemented and will allow logging push failures at a controlled pace. 
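Taken together with the test fixture above, a per-tenant runtime overrides file enabling the new behavior might look like the following sketch (the tenant ID "29" is illustrative); the equivalent process-wide defaults come from the -operation-config.log-duplicate-metrics and -operation-config.log-duplicate-stream-info flags registered below:

    configs:
      "29":
        log_duplicate_metrics: true
        log_duplicate_stream_info: true
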
LimitedLogPushErrors bool `yaml:"limited_log_push_errors"` @@ -18,6 +20,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.LogStreamCreation, "operation-config.log-stream-creation", false, "Log every new stream created by a push request (very verbose, recommend to enable via runtime config only).") f.BoolVar(&cfg.LogPushRequest, "operation-config.log-push-request", false, "Log every push request (very verbose, recommend to enable via runtime config only).") f.BoolVar(&cfg.LogPushRequestStreams, "operation-config.log-push-request-streams", false, "Log every stream in a push request (very verbose, recommend to enable via runtime config only).") + f.BoolVar(&cfg.LogDuplicateMetrics, "operation-config.log-duplicate-metrics", false, "Log metrics for duplicate lines received.") + f.BoolVar(&cfg.LogDuplicateStreamInfo, "operation-config.log-duplicate-stream-info", false, "Log stream info for duplicate lines received") f.BoolVar(&cfg.LimitedLogPushErrors, "operation-config.limited-log-push-errors", true, "Log push errors with a rate limited logger, will show client push errors without overly spamming logs.") } @@ -94,6 +98,14 @@ func (o *TenantConfigs) LogPushRequestStreams(userID string) bool { return o.getOverridesForUser(userID).LogPushRequestStreams } +func (o *TenantConfigs) LogDuplicateMetrics(userID string) bool { + return o.getOverridesForUser(userID).LogDuplicateMetrics +} + +func (o *TenantConfigs) LogDuplicateStreamInfo(userID string) bool { + return o.getOverridesForUser(userID).LogDuplicateStreamInfo +} + func (o *TenantConfigs) LimitedLogPushErrors(userID string) bool { return o.getOverridesForUser(userID).LimitedLogPushErrors } diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go index 672676ea263a2..3508278c52c4e 100644 --- a/pkg/validation/validate.go +++ b/pkg/validation/validate.go @@ -67,6 +67,7 @@ const ( StructuredMetadataTooLargeErrorMsg = "stream '%s' has structured metadata too large: '%d' bytes, limit: '%d' bytes. Please see `limits_config.max_structured_metadata_size` or contact your Loki administrator to increase it." StructuredMetadataTooMany = "structured_metadata_too_many" StructuredMetadataTooManyErrorMsg = "stream '%s' has too many structured metadata labels: '%d', limit: '%d'. Please see `limits_config.max_structured_metadata_entries_count` or contact your Loki administrator to increase it." + DiscardedBytesTotal = "discarded_bytes_total" ) type ErrStreamRateLimit struct { @@ -128,3 +129,13 @@ var LineLengthHist = promauto.NewHistogram(prometheus.HistogramOpts{ Help: "The total number of bytes per line.", Buckets: prometheus.ExponentialBuckets(1, 8, 8), // 1B -> 16MB }) + +// DuplicateLogEntries is a metric of the total discarded duplicate bytes, by tenant. 
+var DuplicateLogEntries = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: constants.Loki, + Name: "duplicate_log_entries_total", + Help: "The total number of bytes that were discarded for duplicate log lines.", + }, + []string{ReasonLabel, "tenant"}, +) From 40063abe5a0ffbdf27db7c4c49c5f74c61f125ba Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 30 May 2024 14:46:08 -0400 Subject: [PATCH 02/15] WIP: PR comments: remove err, update metric name, put metric stuff in one spot --- docs/sources/shared/configuration.md | 8 ++++++++ pkg/chunkenc/interface.go | 26 -------------------------- pkg/chunkenc/interface_test.go | 10 ---------- pkg/chunkenc/unordered.go | 15 ++++++++++++++- pkg/chunkenc/unordered_test.go | 16 +++------------- pkg/ingester/stream.go | 28 +++++++++------------------- pkg/ingester/stream_test.go | 18 ++++++++++++------ pkg/validation/validate.go | 6 +++--- 8 files changed, 49 insertions(+), 78 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index aa045ce1f9776..5e52c95899449 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3745,6 +3745,14 @@ These are values which allow you to control aspects of Loki's operation, most co # CLI flag: -operation-config.log-push-request-streams [log_push_request_streams: | default = false] +# Log metrics for duplicate lines received. +# CLI flag: -operation-config.log-duplicate-metrics +[log_duplicate_metrics: | default = false] + +# Log stream info for duplicate lines received +# CLI flag: -operation-config.log-duplicate-stream-info +[log_duplicate_stream_info: | default = false] + # Log push errors with a rate limited logger, will show client push errors # without overly spamming logs. # CLI flag: -operation-config.limited-log-push-errors diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index d28c4f94e9a8b..3825a6520af5f 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -21,7 +21,6 @@ var ( ErrInvalidSize = errors.New("invalid size") ErrInvalidFlag = errors.New("invalid flag") ErrInvalidChecksum = errors.New("invalid chunk checksum") - ErrDuplicateEntry = errors.New("duplicate entry") ) type errTooFarBehind struct { @@ -49,31 +48,6 @@ func IsOutOfOrderErr(err error) bool { return err == ErrOutOfOrder || IsErrTooFarBehind(err) } -type errDuplicateEntry struct { - // original timestamp of the entry itself. - entryTs time.Time - - // string representation of the stream for the entry - stream string -} - -func IsErrDuplicateEntry(err error) bool { - _, ok := err.(*errDuplicateEntry) - return ok -} - -func ErrDuplicateLogEntry(entryTs time.Time, stream string) error { - return &errDuplicateEntry{entryTs: entryTs, stream: stream} -} - -func (m *errDuplicateEntry) Error() string { - return fmt.Sprintf("entry is a duplicate, entry timestamp is: %s, stream information is: %s", m.entryTs.Format(time.RFC3339), m.stream) -} - -func IsDuplicateEntryErr(err error) bool { - return err == ErrDuplicateEntry || IsErrDuplicateEntry(err) -} - // Encoding is the identifier for a chunk encoding. 
type Encoding byte diff --git a/pkg/chunkenc/interface_test.go b/pkg/chunkenc/interface_test.go index 1879133006ed7..ed81c4d3604e4 100644 --- a/pkg/chunkenc/interface_test.go +++ b/pkg/chunkenc/interface_test.go @@ -37,13 +37,3 @@ func TestIsOutOfOrderErr(t *testing.T) { require.Equal(t, true, IsOutOfOrderErr(err)) } } - -func TestIsDuplicateEntryErr(t *testing.T) { - if !IsDuplicateEntryErr(ErrDuplicateEntry) { - t.Errorf("IsDuplicateEntryErr() = false, want true for ErrDuplicateEntry") - } - - if IsDuplicateEntryErr(ErrOutOfOrder) { - t.Errorf("IsDuplicateEntryErr() = true, want false for errors other than ErrDuplicateEntry") - } -} diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 15b3644e6b395..201971b3aed44 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -135,7 +135,20 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata l for _, et := range displaced[0].(*nsEntries).entries { if et.line == line { e.entries = displaced[0].(*nsEntries).entries - return ErrDuplicateEntry + return nil + /* + TODO + + if s.configs.LogDuplicateMetrics(s.tenant) { + s.reportDuplicateMetrics(len(entries[i].Line)) + } + if s.configs.LogDuplicateStreamInfo(s.tenant) { + err = chunkenc.ErrDuplicateLogEntry(entries[i].Timestamp, s.labelsString) + s.writeFailures.Log(s.tenant, err) + } + return nil + + */ } } e.entries = append(displaced[0].(*nsEntries).entries, nsEntry{line, hb.symbolizer.Add(structuredMetadata)}) diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index 4000b0e8de3e3..f4930952660fc 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -86,7 +86,6 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { desc string input, exp []entry dir logproto.Direction - hasDup bool }{ { desc: "simple forward", @@ -153,8 +152,7 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { exp: []entry{ {0, "a", nil}, {0, "b", nil}, {1, "c", nil}, }, - dir: logproto.FORWARD, - hasDup: true, + dir: logproto.FORWARD, }, { desc: "ts remove exact dupe backward", @@ -164,8 +162,7 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { exp: []entry{ {1, "c", nil}, {0, "b", nil}, {0, "a", nil}, }, - dir: logproto.BACKWARD, - hasDup: true, + dir: logproto.BACKWARD, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -176,14 +173,7 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { t.Run(format.String(), func(t *testing.T) { hb := newUnorderedHeadBlock(format, newSymbolizer()) for _, e := range tc.input { - err := hb.Append(e.t, e.s, e.structuredMetadata) - if tc.hasDup { - if err != nil && err != ErrDuplicateEntry { - require.Equal(t, err, ErrDuplicateEntry) - } - } else { - require.Nil(t, err) - } + require.Nil(t, hb.Append(e.t, e.s, e.structuredMetadata)) } itr := hb.Iterator( diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index c255c22efbfc1..9b8d82b7ba4a4 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -341,23 +341,13 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa chunk.lastUpdated = time.Now() if err := chunk.chunk.Append(&entries[i]); err != nil { - if chunkenc.IsDuplicateEntryErr(err) { - if s.configs.LogDuplicateMetrics(s.tenant) { - s.reportDuplicateMetrics(len(entries[i].Line)) - } - if s.configs.LogDuplicateStreamInfo(s.tenant) { - err = chunkenc.ErrDuplicateLogEntry(entries[i].Timestamp, s.labelsString) - s.writeFailures.Log(s.tenant, err) - } - } else { - invalid = append(invalid, entryWithError{&entries[i], err}) - 
if chunkenc.IsOutOfOrderErr(err) { - s.writeFailures.Log(s.tenant, err) - outOfOrderSamples++ - outOfOrderBytes += len(entries[i].Line) - } - continue + invalid = append(invalid, entryWithError{&entries[i], err}) + if chunkenc.IsOutOfOrderErr(err) { + s.writeFailures.Log(s.tenant, err) + outOfOrderSamples++ + outOfOrderBytes += len(entries[i].Line) } + continue } s.entryCt++ @@ -397,13 +387,13 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, // NOTE: it's still possible for duplicates to be appended if a stream is // deleted from inactivity. if entries[i].Timestamp.Equal(lastLine.ts) && entries[i].Line == lastLine.content { - if s.configs.LogDuplicateMetrics(s.tenant) { + /*if s.configs.LogDuplicateMetrics(s.tenant) { s.reportDuplicateMetrics(len(entries[i].Line)) } if s.configs.LogDuplicateStreamInfo(s.tenant) { err := chunkenc.ErrDuplicateLogEntry(entries[i].Timestamp, s.labelsString) s.writeFailures.Log(s.tenant, err) - } + }*/ continue } @@ -481,7 +471,7 @@ func (s *stream) reportMetrics(ctx context.Context, outOfOrderSamples, outOfOrde } func (s *stream) reportDuplicateMetrics(duplicateLogLineBytes int) { - validation.DuplicateLogEntries.WithLabelValues(validation.DiscardedBytesTotal, s.tenant).Add(float64(duplicateLogLineBytes)) + validation.DuplicateLogBytes.WithLabelValues(validation.DiscardedBytesTotal, s.tenant).Add(float64(duplicateLogLineBytes)) } func (s *stream) cutChunk(ctx context.Context) *chunkDesc { diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 9f44fbbd9f329..81a17264b13a3 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -4,13 +4,14 @@ import ( "bytes" "context" "fmt" - gokitlog "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" "math/rand" "net/http" "testing" "time" + gokitlog "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/loki/v3/pkg/runtime" "github.com/prometheus/client_golang/prometheus/testutil" @@ -192,14 +193,19 @@ func TestPushDeduplicationExtraMetrics(t *testing.T) { _, err = s.Push(context.Background(), []logproto.Entry{ {Timestamp: time.Unix(1, 0), Line: "test"}, - }, recordPool.GetRecord(), 0, true, false) + }, recordPool.GetRecord(), 0, true, false, nil) + require.NoError(t, err) + _, err = s.Push(context.Background(), []logproto.Entry{ + {Timestamp: time.Unix(1, 0), Line: "not a test"}, + }, recordPool.GetRecord(), 0, true, false, nil) require.NoError(t, err) _, err = s.Push(context.Background(), []logproto.Entry{ {Timestamp: time.Unix(1, 0), Line: "test"}, - }, recordPool.GetRecord(), 0, true, false) + }, recordPool.GetRecord(), 0, true, false, nil) + require.NoError(t, err) require.Len(t, s.chunks, 1) - require.Equal(t, s.chunks[0].chunk.Size(), 1, "expected exact duplicate to be dropped and newer content with same timestamp to be appended") - require.Equal(t, float64(4), testutil.ToFloat64(validation.DuplicateLogEntries.WithLabelValues(validation.DiscardedBytesTotal, "fake"))) + require.Equal(t, 2, s.chunks[0].chunk.Size(), "expected exact duplicate to be dropped and newer content with same timestamp to be appended") + require.Equal(t, float64(4), testutil.ToFloat64(validation.DuplicateLogBytes.WithLabelValues(validation.DiscardedBytesTotal, "fake"))) content := buf.String() require.NotEmpty(t, content) diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go index 3508278c52c4e..e947efdd2897c 100644 --- a/pkg/validation/validate.go +++ b/pkg/validation/validate.go @@ -130,11 
+130,11 @@ var LineLengthHist = promauto.NewHistogram(prometheus.HistogramOpts{ Buckets: prometheus.ExponentialBuckets(1, 8, 8), // 1B -> 16MB }) -// DuplicateLogEntries is a metric of the total discarded duplicate bytes, by tenant. -var DuplicateLogEntries = promauto.NewCounterVec( +// DuplicateLogBytes is a metric of the total discarded duplicate bytes, by tenant. +var DuplicateLogBytes = promauto.NewCounterVec( prometheus.CounterOpts{ Namespace: constants.Loki, - Name: "duplicate_log_entries_total", + Name: "duplicate_log_bytes_total", Help: "The total number of bytes that were discarded for duplicate log lines.", }, []string{ReasonLabel, "tenant"}, From 00b67f0f098124171fd1ca716e392bd50ae0e7fd Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 30 May 2024 16:29:11 -0400 Subject: [PATCH 03/15] Refactor so we can get tenant information and such in the unordered head block --- pkg/chunkenc/memchunk.go | 13 +++++++++- pkg/chunkenc/unordered.go | 49 ++++++++++++++++++++++++++++--------- pkg/ingester/stream.go | 6 +---- pkg/ingester/stream_test.go | 2 +- pkg/validation/validate.go | 11 --------- 5 files changed, 51 insertions(+), 30 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 107e3c71a97d5..999e5e0554b7d 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -17,10 +17,12 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" + "github.com/grafana/loki/v3/pkg/runtime" "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/util/filter" util_log "github.com/grafana/loki/v3/pkg/util/log" @@ -196,6 +198,10 @@ func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) error { return nil } +func (hb *headBlock) AppendForTenant(ts int64, line string, _ labels.Labels, _ string, _ *runtime.TenantConfigs, _ string, _ *writefailures.Manager) error { + return hb.Append(ts, line, nil) +} + func (hb *headBlock) Serialise(pool WriterPool) ([]byte, error) { inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) defer func() { @@ -835,6 +841,11 @@ func (c *MemChunk) Utilization() float64 { // Append implements Chunk. func (c *MemChunk) Append(entry *logproto.Entry) error { + return c.AppendForTenant(entry, "", nil, "", nil) +} + +// Append implements Chunk. 
+func (c *MemChunk) AppendForTenant(entry *logproto.Entry, tenant string, configs *runtime.TenantConfigs, labelsString string, manager *writefailures.Manager) error { entryTimestamp := entry.Timestamp.UnixNano() // If the head block is empty but there are cut blocks, we have to make @@ -846,7 +857,7 @@ func (c *MemChunk) Append(entry *logproto.Entry) error { if c.format < ChunkFormatV4 { entry.StructuredMetadata = nil } - if err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)); err != nil { + if err := c.head.AppendForTenant(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata), tenant, configs, labelsString, manager); err != nil { return err } diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 201971b3aed44..b48b9e093c54d 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -5,6 +5,11 @@ import ( "context" "encoding/binary" "fmt" + + "github.com/grafana/loki/v3/pkg/util/constants" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "io" "math" "time" @@ -14,10 +19,12 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" + "github.com/grafana/loki/v3/pkg/runtime" ) var noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{}) @@ -35,6 +42,7 @@ type HeadBlock interface { UncompressedSize() int Convert(HeadBlockFmt, *symbolizer) (HeadBlock, error) Append(int64, string, labels.Labels) error + AppendForTenant(int64, string, labels.Labels, string, *runtime.TenantConfigs, string, *writefailures.Manager) error Iterator( ctx context.Context, direction logproto.Direction, @@ -64,6 +72,16 @@ type unorderedHeadBlock struct { mint, maxt int64 // upper and lower bounds } +// DuplicateLogBytes is a metric of the total discarded duplicate bytes, by tenant. 
+var DuplicateLogBytes = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: constants.Loki, + Name: "duplicate_log_bytes_total", + Help: "The total number of bytes that were discarded for duplicate log lines.", + }, + []string{"reason", "tenant"}, +) + func newUnorderedHeadBlock(headBlockFmt HeadBlockFmt, symbolizer *symbolizer) *unorderedHeadBlock { return &unorderedHeadBlock{ format: headBlockFmt, @@ -111,6 +129,10 @@ func (e *nsEntries) ValueAtDimension(_ uint64) int64 { } func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata labels.Labels) error { + return hb.AppendForTenant(ts, line, structuredMetadata, "", nil, "", nil) +} + +func (hb *unorderedHeadBlock) AppendForTenant(ts int64, line string, structuredMetadata labels.Labels, tenant string, configs *runtime.TenantConfigs, labelsString string, manager *writefailures.Manager) error { if hb.format < UnorderedWithStructuredMetadataHeadBlockFmt { // structuredMetadata must be ignored for the previous head block formats structuredMetadata = nil @@ -135,20 +157,17 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata l for _, et := range displaced[0].(*nsEntries).entries { if et.line == line { e.entries = displaced[0].(*nsEntries).entries - return nil - /* - TODO - - if s.configs.LogDuplicateMetrics(s.tenant) { - s.reportDuplicateMetrics(len(entries[i].Line)) + if configs != nil { + if configs.LogDuplicateMetrics(tenant) { + reportDuplicateMetrics(len(line), tenant) } - if s.configs.LogDuplicateStreamInfo(s.tenant) { - err = chunkenc.ErrDuplicateLogEntry(entries[i].Timestamp, s.labelsString) - s.writeFailures.Log(s.tenant, err) + if configs.LogDuplicateStreamInfo(tenant) { + errMsg := fmt.Sprintf("duplicate log entry at timestamp %d for stream %s", ts, labelsString) + err := errors.New(errMsg) + manager.Log(tenant, err) } - return nil - - */ + } + return nil } } e.entries = append(displaced[0].(*nsEntries).entries, nsEntry{line, hb.symbolizer.Add(structuredMetadata)}) @@ -172,6 +191,12 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata l return nil } +var DiscardedBytesTotal = "discarded_bytes_total" + +func reportDuplicateMetrics(duplicateLogLineBytes int, tenant string) { + DuplicateLogBytes.WithLabelValues(DiscardedBytesTotal, tenant).Add(float64(duplicateLogLineBytes)) +} + func metaLabelsLen(metaLabels labels.Labels) int { length := 0 for _, label := range metaLabels { diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 9b8d82b7ba4a4..c396e4426fc1a 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -340,7 +340,7 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa } chunk.lastUpdated = time.Now() - if err := chunk.chunk.Append(&entries[i]); err != nil { + if err := chunk.chunk.AppendForTenant(&entries[i], s.tenant, s.configs, s.labelsString, s.writeFailures); err != nil { invalid = append(invalid, entryWithError{&entries[i], err}) if chunkenc.IsOutOfOrderErr(err) { s.writeFailures.Log(s.tenant, err) @@ -470,10 +470,6 @@ func (s *stream) reportMetrics(ctx context.Context, outOfOrderSamples, outOfOrde } } -func (s *stream) reportDuplicateMetrics(duplicateLogLineBytes int) { - validation.DuplicateLogBytes.WithLabelValues(validation.DiscardedBytesTotal, s.tenant).Add(float64(duplicateLogLineBytes)) -} - func (s *stream) cutChunk(ctx context.Context) *chunkDesc { if sp := opentracing.SpanFromContext(ctx); sp != nil { sp.LogKV("event", "stream started to cut chunk") diff --git 
a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 81a17264b13a3..4505f5029824b 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -205,7 +205,7 @@ func TestPushDeduplicationExtraMetrics(t *testing.T) { require.NoError(t, err) require.Len(t, s.chunks, 1) require.Equal(t, 2, s.chunks[0].chunk.Size(), "expected exact duplicate to be dropped and newer content with same timestamp to be appended") - require.Equal(t, float64(4), testutil.ToFloat64(validation.DuplicateLogBytes.WithLabelValues(validation.DiscardedBytesTotal, "fake"))) + require.Equal(t, float64(4), testutil.ToFloat64(chunkenc.DuplicateLogBytes.WithLabelValues(chunkenc.DiscardedBytesTotal, "fake"))) content := buf.String() require.NotEmpty(t, content) diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go index e947efdd2897c..672676ea263a2 100644 --- a/pkg/validation/validate.go +++ b/pkg/validation/validate.go @@ -67,7 +67,6 @@ const ( StructuredMetadataTooLargeErrorMsg = "stream '%s' has structured metadata too large: '%d' bytes, limit: '%d' bytes. Please see `limits_config.max_structured_metadata_size` or contact your Loki administrator to increase it." StructuredMetadataTooMany = "structured_metadata_too_many" StructuredMetadataTooManyErrorMsg = "stream '%s' has too many structured metadata labels: '%d', limit: '%d'. Please see `limits_config.max_structured_metadata_entries_count` or contact your Loki administrator to increase it." - DiscardedBytesTotal = "discarded_bytes_total" ) type ErrStreamRateLimit struct { @@ -129,13 +128,3 @@ var LineLengthHist = promauto.NewHistogram(prometheus.HistogramOpts{ Help: "The total number of bytes per line.", Buckets: prometheus.ExponentialBuckets(1, 8, 8), // 1B -> 16MB }) - -// DuplicateLogBytes is a metric of the total discarded duplicate bytes, by tenant. 
-var DuplicateLogBytes = promauto.NewCounterVec( - prometheus.CounterOpts{ - Namespace: constants.Loki, - Name: "duplicate_log_bytes_total", - Help: "The total number of bytes that were discarded for duplicate log lines.", - }, - []string{ReasonLabel, "tenant"}, -) From 3b40971c8006c67f66bdcc8aca1a35a9515b20b1 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 30 May 2024 16:38:29 -0400 Subject: [PATCH 04/15] make fmt --- pkg/chunkenc/unordered.go | 3 ++- pkg/ingester/stream_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index b48b9e093c54d..0a24b9f449ce6 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -6,10 +6,11 @@ import ( "encoding/binary" "fmt" - "github.com/grafana/loki/v3/pkg/util/constants" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/grafana/loki/v3/pkg/util/constants" + "io" "math" "time" diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 4505f5029824b..98c6529d391f0 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -12,9 +12,10 @@ import ( gokitlog "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/loki/v3/pkg/runtime" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/grafana/loki/v3/pkg/runtime" + "github.com/grafana/dskit/httpgrpc" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" From 471059f25fe56767cdc465e55e01f5e4cbb0f33b Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 30 May 2024 16:41:45 -0400 Subject: [PATCH 05/15] Remove commented out code from first pass --- pkg/ingester/stream.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index c396e4426fc1a..08e4b0b494737 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -387,13 +387,6 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, // NOTE: it's still possible for duplicates to be appended if a stream is // deleted from inactivity. 
if entries[i].Timestamp.Equal(lastLine.ts) && entries[i].Line == lastLine.content { - /*if s.configs.LogDuplicateMetrics(s.tenant) { - s.reportDuplicateMetrics(len(entries[i].Line)) - } - if s.configs.LogDuplicateStreamInfo(s.tenant) { - err := chunkenc.ErrDuplicateLogEntry(entries[i].Timestamp, s.labelsString) - s.writeFailures.Log(s.tenant, err) - }*/ continue } From 2070530d7d717c1f03b45615d450327be1c610c9 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Fri, 31 May 2024 13:41:09 -0400 Subject: [PATCH 06/15] Pass metrics and such into MemChunk and UnorderedHeadBlock --- pkg/chunkenc/memchunk.go | 37 +++++++++++++++++-------- pkg/chunkenc/unordered.go | 54 +++++++++++++------------------------ pkg/ingester/metrics.go | 9 +++++++ pkg/ingester/stream.go | 20 ++++++++++++-- pkg/ingester/stream_test.go | 12 +++++---- 5 files changed, 78 insertions(+), 54 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 999e5e0554b7d..6c3b0eae9c01c 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -12,6 +12,8 @@ import ( "time" "unsafe" + "github.com/prometheus/client_golang/prometheus" + "github.com/cespare/xxhash/v2" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -22,7 +24,6 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" - "github.com/grafana/loki/v3/pkg/runtime" "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/util/filter" util_log "github.com/grafana/loki/v3/pkg/util/log" @@ -139,6 +140,14 @@ type MemChunk struct { // compressed size of chunk. Set when chunk is cut or while decoding chunk from storage. compressedSize int + + // Runtime-config overrides for a tenant may allow these to be non-nil, in which case + // they will be passed to the unordered head block, subsequently used + // to output information related to duplicate log lines and bytes discarded + writeFailures *writefailures.Manager + duplicateLogBytes *prometheus.CounterVec + tenant string + labelsString string } type block struct { @@ -198,10 +207,6 @@ func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) error { return nil } -func (hb *headBlock) AppendForTenant(ts int64, line string, _ labels.Labels, _ string, _ *runtime.TenantConfigs, _ string, _ *writefailures.Manager) error { - return hb.Append(ts, line, nil) -} - func (hb *headBlock) Serialise(pool WriterPool) ([]byte, error) { inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) defer func() { @@ -841,11 +846,6 @@ func (c *MemChunk) Utilization() float64 { // Append implements Chunk. func (c *MemChunk) Append(entry *logproto.Entry) error { - return c.AppendForTenant(entry, "", nil, "", nil) -} - -// Append implements Chunk. 
-func (c *MemChunk) AppendForTenant(entry *logproto.Entry, tenant string, configs *runtime.TenantConfigs, labelsString string, manager *writefailures.Manager) error { entryTimestamp := entry.Timestamp.UnixNano() // If the head block is empty but there are cut blocks, we have to make @@ -857,7 +857,7 @@ func (c *MemChunk) AppendForTenant(entry *logproto.Entry, tenant string, configs if c.format < ChunkFormatV4 { entry.StructuredMetadata = nil } - if err := c.head.AppendForTenant(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata), tenant, configs, labelsString, manager); err != nil { + if err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)); err != nil { return err } @@ -1149,6 +1149,21 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err return newChunk, nil } +func (c *MemChunk) SetDuplicateLogLinesCapture(writeFailures *writefailures.Manager, duplicateLogBytes *prometheus.CounterVec, tenant, labelsString string) { + c.writeFailures = writeFailures + c.duplicateLogBytes = duplicateLogBytes + c.tenant = tenant + c.labelsString = labelsString + + unorderedHead, ok := c.head.(*unorderedHeadBlock) + if ok { + unorderedHead.writeFailures = c.writeFailures + unorderedHead.duplicateLogBytes = c.duplicateLogBytes + unorderedHead.tenant = c.tenant + unorderedHead.labelsString = c.labelsString + } +} + // encBlock is an internal wrapper for a block, mainly to avoid binding an encoding in a block itself. // This may seem roundabout, but the encoding is already a field on the parent MemChunk type. encBlock // then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 0a24b9f449ce6..8253eba898d1a 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -6,15 +6,12 @@ import ( "encoding/binary" "fmt" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - - "github.com/grafana/loki/v3/pkg/util/constants" - "io" "math" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/Workiva/go-datastructures/rangetree" "github.com/cespare/xxhash/v2" "github.com/pkg/errors" @@ -25,7 +22,6 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" - "github.com/grafana/loki/v3/pkg/runtime" ) var noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{}) @@ -43,7 +39,6 @@ type HeadBlock interface { UncompressedSize() int Convert(HeadBlockFmt, *symbolizer) (HeadBlock, error) Append(int64, string, labels.Labels) error - AppendForTenant(int64, string, labels.Labels, string, *runtime.TenantConfigs, string, *writefailures.Manager) error Iterator( ctx context.Context, direction logproto.Direction, @@ -71,17 +66,14 @@ type unorderedHeadBlock struct { lines int // number of entries size int // size of uncompressed bytes. mint, maxt int64 // upper and lower bounds -} -// DuplicateLogBytes is a metric of the total discarded duplicate bytes, by tenant. 
-var DuplicateLogBytes = promauto.NewCounterVec( - prometheus.CounterOpts{ - Namespace: constants.Loki, - Name: "duplicate_log_bytes_total", - Help: "The total number of bytes that were discarded for duplicate log lines.", - }, - []string{"reason", "tenant"}, -) + // Runtime-config overrides for a tenant may allow these to be non-nil, in which case + // they will be used to output information related to duplicate log lines and bytes discarded + writeFailures *writefailures.Manager + duplicateLogBytes *prometheus.CounterVec + tenant string + labelsString string +} func newUnorderedHeadBlock(headBlockFmt HeadBlockFmt, symbolizer *symbolizer) *unorderedHeadBlock { return &unorderedHeadBlock{ @@ -130,10 +122,6 @@ func (e *nsEntries) ValueAtDimension(_ uint64) int64 { } func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata labels.Labels) error { - return hb.AppendForTenant(ts, line, structuredMetadata, "", nil, "", nil) -} - -func (hb *unorderedHeadBlock) AppendForTenant(ts int64, line string, structuredMetadata labels.Labels, tenant string, configs *runtime.TenantConfigs, labelsString string, manager *writefailures.Manager) error { if hb.format < UnorderedWithStructuredMetadataHeadBlockFmt { // structuredMetadata must be ignored for the previous head block formats structuredMetadata = nil @@ -158,16 +146,16 @@ func (hb *unorderedHeadBlock) AppendForTenant(ts int64, line string, structuredM for _, et := range displaced[0].(*nsEntries).entries { if et.line == line { e.entries = displaced[0].(*nsEntries).entries - if configs != nil { - if configs.LogDuplicateMetrics(tenant) { - reportDuplicateMetrics(len(line), tenant) - } - if configs.LogDuplicateStreamInfo(tenant) { - errMsg := fmt.Sprintf("duplicate log entry at timestamp %d for stream %s", ts, labelsString) - err := errors.New(errMsg) - manager.Log(tenant, err) - } + + if hb.writeFailures != nil { + errMsg := fmt.Sprintf("duplicate log entry at timestamp %d for stream %s", ts, hb.labelsString) + err := errors.New(errMsg) + hb.writeFailures.Log(hb.tenant, err) + } + if hb.duplicateLogBytes != nil { + hb.duplicateLogBytes.WithLabelValues(hb.tenant).Add(float64(len(line))) } + return nil } } @@ -192,12 +180,6 @@ func (hb *unorderedHeadBlock) AppendForTenant(ts int64, line string, structuredM return nil } -var DiscardedBytesTotal = "discarded_bytes_total" - -func reportDuplicateMetrics(duplicateLogLineBytes int, tenant string) { - DuplicateLogBytes.WithLabelValues(DiscardedBytesTotal, tenant).Add(float64(duplicateLogLineBytes)) -} - func metaLabelsLen(metaLabels labels.Labels) int { length := 0 for _, label := range metaLabels { diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 756eba0ebea74..fd2a3e52bbb97 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -66,6 +66,8 @@ type ingesterMetrics struct { shutdownMarker prometheus.Gauge flushQueueLength prometheus.Gauge + + duplicateLogBytesTotal *prometheus.CounterVec } // setRecoveryBytesInUse bounds the bytes reports to >= 0. 
@@ -293,5 +295,12 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Name: "flush_queue_length", Help: "The total number of series pending in the flush queue.", }), + + duplicateLogBytesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "ingester", + Name: "duplicate_log_bytes_total", + Help: "The total number of bytes that were discarded for duplicate log lines.", + }, []string{"tenant"}), } } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 08e4b0b494737..ccfedba75fefb 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/loki/v3/pkg/runtime" "github.com/go-kit/log/level" @@ -170,7 +172,21 @@ func (s *stream) setChunks(chunks []Chunk) (bytesAdded, entriesAdded int, err er } func (s *stream) NewChunk() *chunkenc.MemChunk { - return chunkenc.NewMemChunk(s.chunkFormat, s.cfg.parsedEncoding, s.chunkHeadBlockFormat, s.cfg.BlockSize, s.cfg.TargetChunkSize) + chunk := chunkenc.NewMemChunk(s.chunkFormat, s.cfg.parsedEncoding, s.chunkHeadBlockFormat, s.cfg.BlockSize, s.cfg.TargetChunkSize) + + if s.configs != nil { + var manager *writefailures.Manager + var metrics *prometheus.CounterVec + if s.configs.LogDuplicateMetrics(s.tenant) { + metrics = s.metrics.duplicateLogBytesTotal + } + if s.configs.LogDuplicateStreamInfo(s.tenant) { + manager = s.writeFailures + } + chunk.SetDuplicateLogLinesCapture(manager, metrics, s.tenant, s.labelsString) + } + + return chunk } func (s *stream) Push( @@ -340,7 +356,7 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa } chunk.lastUpdated = time.Now() - if err := chunk.chunk.AppendForTenant(&entries[i], s.tenant, s.configs, s.labelsString, s.writeFailures); err != nil { + if err := chunk.chunk.Append(&entries[i]); err != nil { invalid = append(invalid, entryWithError{&entries[i], err}) if chunkenc.IsOutOfOrderErr(err) { s.writeFailures.Log(s.tenant, err) diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 98c6529d391f0..2c7498a7320de 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -9,11 +9,11 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus/testutil" + gokitlog "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/grafana/loki/v3/pkg/runtime" "github.com/grafana/dskit/httpgrpc" @@ -171,9 +171,11 @@ func TestPushDeduplicationExtraMetrics(t *testing.T) { runtimeCfg, err := runtime.NewTenantConfigs(provider) - manager := writefailures.NewManager(logger, prometheus.NewRegistry(), writefailures.Cfg{LogRate: flagext.ByteSize(1000), AddInsightsLabel: true}, runtimeCfg, "ingester") + registry := prometheus.NewRegistry() + manager := writefailures.NewManager(logger, registry, writefailures.Cfg{LogRate: flagext.ByteSize(1000), AddInsightsLabel: true}, runtimeCfg, "ingester") require.NoError(t, err) + metrics := newIngesterMetrics(registry, "loki") s := newStream( chunkfmt, @@ -187,7 +189,7 @@ func TestPushDeduplicationExtraMetrics(t *testing.T) { }, true, NewStreamRateCalculator(), - NilMetrics, + metrics, manager, runtimeCfg, ) @@ -206,7 +208,7 @@ func TestPushDeduplicationExtraMetrics(t *testing.T) { require.NoError(t, err) require.Len(t, s.chunks, 1) require.Equal(t, 2, s.chunks[0].chunk.Size(), "expected exact duplicate to be dropped and 
newer content with same timestamp to be appended") - require.Equal(t, float64(4), testutil.ToFloat64(chunkenc.DuplicateLogBytes.WithLabelValues(chunkenc.DiscardedBytesTotal, "fake"))) + require.Equal(t, float64(4), testutil.ToFloat64(metrics.duplicateLogBytesTotal.WithLabelValues("fake"))) content := buf.String() require.NotEmpty(t, content) From 074f1f9c48cd96ab4d1418011911037d0b822483 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Fri, 7 Jun 2024 13:45:19 -0400 Subject: [PATCH 07/15] Move back to a variant of version 1, using a bool instead of an err --- pkg/chunkenc/dumb_chunk.go | 8 +- pkg/chunkenc/interface.go | 2 +- pkg/chunkenc/memchunk.go | 49 ++----- pkg/chunkenc/memchunk_test.go | 126 ++++++++++++------ pkg/chunkenc/unordered.go | 33 +---- pkg/chunkenc/unordered_test.go | 63 ++++++--- pkg/chunkenc/util_test.go | 6 +- pkg/compactor/retention/retention_test.go | 6 +- pkg/ingester/chunk_test.go | 3 +- pkg/ingester/encoding_test.go | 10 +- pkg/ingester/instance_test.go | 3 +- pkg/ingester/stream.go | 33 +++-- pkg/ingester/stream_test.go | 3 +- pkg/storage/bloom/v1/bloom_tokenizer_test.go | 4 +- pkg/storage/chunk/cache/cache_test.go | 2 +- .../chunk/client/testutils/testutils.go | 2 +- pkg/storage/chunk/fetcher/fetcher_test.go | 2 +- pkg/storage/hack/main.go | 2 +- pkg/storage/store_test.go | 4 +- .../stores/series/series_store_test.go | 3 +- .../indexshipper/boltdb/compactor/util.go | 6 +- pkg/storage/util_test.go | 2 +- 22 files changed, 208 insertions(+), 164 deletions(-) diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index e2d520df6e024..6f233a2877244 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -36,17 +36,17 @@ func (c *dumbChunk) SpaceFor(_ *logproto.Entry) bool { return len(c.entries) < tmpNumEntries } -func (c *dumbChunk) Append(entry *logproto.Entry) error { +func (c *dumbChunk) Append(entry *logproto.Entry) (bool, error) { if len(c.entries) == tmpNumEntries { - return ErrChunkFull + return false, ErrChunkFull } if len(c.entries) > 0 && c.entries[len(c.entries)-1].Timestamp.After(entry.Timestamp) { - return ErrOutOfOrder + return false, ErrOutOfOrder } c.entries = append(c.entries, *entry) - return nil + return false, nil } func (c *dumbChunk) Size() int { diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 3825a6520af5f..cf9e0a498f559 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -132,7 +132,7 @@ func SupportedEncoding() string { type Chunk interface { Bounds() (time.Time, time.Time) SpaceFor(*logproto.Entry) bool - Append(*logproto.Entry) error + Append(*logproto.Entry) (bool, error) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator // Returns the list of blocks in the chunks. 
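After this interface change, every Chunk implementation reports a dropped duplicate through the returned bool rather than a sentinel error, so call sites handle the two outcomes separately. A minimal sketch of the new contract (variable names are illustrative, not taken from the patch):

    dup, err := chk.Append(entry)
    if err != nil {
    	// real failures (ErrOutOfOrder, ErrChunkFull, ...) still arrive as errors
    	return err
    }
    if dup {
    	// the entry matched an existing line and was not re-appended;
    	// duplicate-bytes accounting keys off this flag
    }
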
diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 6c3b0eae9c01c..c9f19984e8387 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -12,14 +12,11 @@ import ( "time" "unsafe" - "github.com/prometheus/client_golang/prometheus" - "github.com/cespare/xxhash/v2" "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" - "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" @@ -140,14 +137,6 @@ type MemChunk struct { // compressed size of chunk. Set when chunk is cut or while decoding chunk from storage. compressedSize int - - // Runtime-config overrides for a tenant may allow these to be non-nil, in which case - // they will be passed to the unordered head block, subsequently used - // to output information related to duplicate log lines and bytes discarded - writeFailures *writefailures.Manager - duplicateLogBytes *prometheus.CounterVec - tenant string - labelsString string } type block struct { @@ -192,9 +181,9 @@ func (hb *headBlock) Reset() { func (hb *headBlock) Bounds() (int64, int64) { return hb.mint, hb.maxt } -func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) error { +func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) (bool, error) { if !hb.IsEmpty() && hb.maxt > ts { - return ErrOutOfOrder + return false, ErrOutOfOrder } hb.entries = append(hb.entries, entry{t: ts, s: line}) @@ -204,7 +193,7 @@ func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) error { hb.maxt = ts hb.size += len(line) - return nil + return false, nil } func (hb *headBlock) Serialise(pool WriterPool) ([]byte, error) { @@ -351,7 +340,7 @@ func (hb *headBlock) Convert(version HeadBlockFmt, symbolizer *symbolizer) (Head out := version.NewBlock(symbolizer) for _, e := range hb.entries { - if err := out.Append(e.t, e.s, e.structuredMetadata); err != nil { + if _, err := out.Append(e.t, e.s, e.structuredMetadata); err != nil { return nil, err } } @@ -845,27 +834,28 @@ func (c *MemChunk) Utilization() float64 { } // Append implements Chunk. -func (c *MemChunk) Append(entry *logproto.Entry) error { +func (c *MemChunk) Append(entry *logproto.Entry) (bool, error) { entryTimestamp := entry.Timestamp.UnixNano() // If the head block is empty but there are cut blocks, we have to make // sure the new entry is not out of order compared to the previous block if c.headFmt < UnorderedHeadBlockFmt && c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp { - return ErrOutOfOrder + return false, ErrOutOfOrder } if c.format < ChunkFormatV4 { entry.StructuredMetadata = nil } - if err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)); err != nil { - return err + dup, err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)) + if err != nil { + return dup, err } if c.head.UncompressedSize() >= c.blockSize { - return c.cut() + return false, c.cut() } - return nil + return dup, nil } // Close implements Chunk. @@ -1133,7 +1123,7 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err if filter != nil && filter(entry.Timestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)...) 
{ continue } - if err := newChunk.Append(&entry); err != nil { + if _, err := newChunk.Append(&entry); err != nil { return nil, err } } @@ -1149,21 +1139,6 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err return newChunk, nil } -func (c *MemChunk) SetDuplicateLogLinesCapture(writeFailures *writefailures.Manager, duplicateLogBytes *prometheus.CounterVec, tenant, labelsString string) { - c.writeFailures = writeFailures - c.duplicateLogBytes = duplicateLogBytes - c.tenant = tenant - c.labelsString = labelsString - - unorderedHead, ok := c.head.(*unorderedHeadBlock) - if ok { - unorderedHead.writeFailures = c.writeFailures - unorderedHead.duplicateLogBytes = c.duplicateLogBytes - unorderedHead.tenant = c.tenant - unorderedHead.labelsString = c.labelsString - } -} - // encBlock is an internal wrapper for a block, mainly to avoid binding an encoding in a block itself. // This may seem roundabout, but the encoding is already a field on the parent MemChunk type. encBlock // then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 09eab22f74be4..d8782cc3e3b9b 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -88,7 +88,8 @@ func TestBlocksInclusive(t *testing.T) { for _, format := range allPossibleFormats { chunkfmt, headfmt := format.chunkFormat, format.headBlockFmt chk := NewMemChunk(chunkfmt, enc, headfmt, testBlockSize, testTargetSize) - err := chk.Append(logprotoEntry(1, "1")) + dup, err := chk.Append(logprotoEntry(1, "1")) + require.False(t, dup) require.Nil(t, err) err = chk.cut() require.Nil(t, err) @@ -178,7 +179,9 @@ func TestBlock(t *testing.T) { } for _, c := range cases { - require.NoError(t, chk.Append(logprotoEntryWithStructuredMetadata(c.ts, c.str, c.lbs))) + dup, err := chk.Append(logprotoEntryWithStructuredMetadata(c.ts, c.str, c.lbs)) + require.False(t, dup) + require.NoError(t, err) if c.cut { require.NoError(t, chk.cut()) } @@ -442,7 +445,9 @@ func TestSerialization(t *testing.T) { if appendWithStructuredMetadata { entry.StructuredMetadata = []logproto.LabelAdapter{{Name: "foo", Value: strconv.Itoa(i)}} } - require.NoError(t, chk.Append(entry)) + dup, err := chk.Append(entry) + require.False(t, dup) + require.NoError(t, err) } require.NoError(t, chk.Close()) @@ -527,7 +532,9 @@ func TestChunkFilling(t *testing.T) { i := int64(0) for ; chk.SpaceFor(entry) && i < 30; i++ { entry.Timestamp = time.Unix(0, i) - require.NoError(t, chk.Append(entry)) + dup, err := chk.Append(entry) + require.False(t, dup) + require.NoError(t, err) } require.Equal(t, int64(lines), i) @@ -576,7 +583,9 @@ func TestGZIPChunkTargetSize(t *testing.T) { Line: string(logLine), } entry.Timestamp = time.Unix(0, i) - require.NoError(t, chk.Append(entry)) + dup, err := chk.Append(entry) + require.False(t, dup) + require.NoError(t, err) } // 5000 is a limit ot make sure the test doesn't run away, we shouldn't need this many log lines to make 1MB chunk @@ -606,37 +615,58 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) { tests := map[string]tester{ "append out of order in the same block": func(t *testing.T, chk *MemChunk) { - assert.NoError(t, chk.Append(logprotoEntry(5, "test"))) - assert.NoError(t, chk.Append(logprotoEntry(6, "test"))) + dup, err := chk.Append(logprotoEntry(5, "test")) + assert.False(t, dup) + assert.NoError(t, err) + dup, err = chk.Append(logprotoEntry(6, "test")) + assert.False(t, dup) + assert.NoError(t, 
err) if chk.headFmt == OrderedHeadBlockFmt { - assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + dup, err = chk.Append(logprotoEntry(1, "test")) + assert.EqualError(t, err, ErrOutOfOrder.Error()) } else { - assert.NoError(t, chk.Append(logprotoEntry(1, "test"))) + dup, err = chk.Append(logprotoEntry(1, "test")) + assert.False(t, dup) + assert.NoError(t, err) } }, "append out of order in a new block right after cutting the previous one": func(t *testing.T, chk *MemChunk) { - assert.NoError(t, chk.Append(logprotoEntry(5, "test"))) - assert.NoError(t, chk.Append(logprotoEntry(6, "test"))) + dup, err := chk.Append(logprotoEntry(5, "test")) + assert.False(t, dup) + assert.NoError(t, err) + dup, err = chk.Append(logprotoEntry(6, "test")) + assert.False(t, dup) + assert.NoError(t, err) assert.NoError(t, chk.cut()) if chk.headFmt == OrderedHeadBlockFmt { - assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + dup, err = chk.Append(logprotoEntry(1, "test")) + assert.EqualError(t, err, ErrOutOfOrder.Error()) } else { - assert.NoError(t, chk.Append(logprotoEntry(1, "test"))) + dup, err = chk.Append(logprotoEntry(1, "test")) + assert.False(t, dup) + assert.NoError(t, err) } }, "append out of order in a new block after multiple cuts": func(t *testing.T, chk *MemChunk) { - assert.NoError(t, chk.Append(logprotoEntry(5, "test"))) + dup, err := chk.Append(logprotoEntry(5, "test")) + assert.False(t, dup) + assert.NoError(t, err) assert.NoError(t, chk.cut()) - assert.NoError(t, chk.Append(logprotoEntry(6, "test"))) + dup, err = chk.Append(logprotoEntry(6, "test")) + assert.False(t, dup) + assert.NoError(t, err) assert.NoError(t, chk.cut()) if chk.headFmt == OrderedHeadBlockFmt { - assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error()) + dup, err = chk.Append(logprotoEntry(1, "test")) + assert.EqualError(t, err, ErrOutOfOrder.Error()) } else { - assert.NoError(t, chk.Append(logprotoEntry(1, "test"))) + dup, err = chk.Append(logprotoEntry(1, "test")) + assert.False(t, dup) + assert.NoError(t, err) } }, } @@ -705,7 +735,7 @@ func TestChunkStats(t *testing.T) { if !c.SpaceFor(entry) { break } - if err := c.Append(entry); err != nil { + if _, err := c.Append(entry); err != nil { t.Fatal(err) } inserted++ @@ -826,7 +856,7 @@ func BenchmarkWrite(b *testing.B) { c := NewMemChunk(ChunkFormatV3, enc, f, testBlockSize, testTargetSize) // adds until full so we trigger cut which serialize using gzip for c.SpaceFor(entry) { - _ = c.Append(entry) + _, _ = c.Append(entry) entry.Timestamp = time.Unix(0, i) entry.Line = testdata.LogString(i) if withStructuredMetadata { @@ -977,7 +1007,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) { } for i := 0; i < j; i++ { - if err := h.Append(int64(i), "this is the append string", structuredMetadata); err != nil { + if _, err := h.Append(int64(i), "this is the append string", structuredMetadata); err != nil { b.Fatal(err) } } @@ -1009,7 +1039,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) { } for i := 0; i < j; i++ { - if err := h.Append(int64(i), "this is the append string", structuredMetadata); err != nil { + if _, err := h.Append(int64(i), "this is the append string", structuredMetadata); err != nil { b.Fatal(err) } } @@ -1034,13 +1064,13 @@ func TestMemChunk_IteratorBounds(t *testing.T) { t.Helper() c := NewMemChunk(ChunkFormatV3, EncNone, DefaultTestHeadBlockFmt, 1e6, 1e6) - if err := c.Append(&logproto.Entry{ + if _, err := c.Append(&logproto.Entry{ Timestamp: time.Unix(0, 1), 
Line: "1", }); err != nil { t.Fatal(err) } - if err := c.Append(&logproto.Entry{ + if _, err := c.Append(&logproto.Entry{ Timestamp: time.Unix(0, 2), Line: "2", }); err != nil { @@ -1099,7 +1129,9 @@ func TestMemchunkLongLine(t *testing.T) { c := NewMemChunk(ChunkFormatV3, enc, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) for i := 1; i <= 10; i++ { - require.NoError(t, c.Append(&logproto.Entry{Timestamp: time.Unix(0, int64(i)), Line: strings.Repeat("e", 200000)})) + dup, err := c.Append(&logproto.Entry{Timestamp: time.Unix(0, int64(i)), Line: strings.Repeat("e", 200000)}) + require.False(t, dup) + require.NoError(t, err) } noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline) @@ -1143,7 +1175,9 @@ func TestCheckpointEncoding(t *testing.T) { }}, } require.Equal(t, true, c.SpaceFor(entry)) - require.Nil(t, c.Append(entry)) + dup, err := c.Append(entry) + require.False(t, dup) + require.Nil(t, err) } // cut it @@ -1178,7 +1212,9 @@ func TestCheckpointEncoding(t *testing.T) { Line: fmt.Sprintf("hi there - %d", i), } require.Equal(t, true, c.SpaceFor(entry)) - require.Nil(t, c.Append(entry)) + dup, err := c.Append(entry) + require.False(t, dup) + require.Nil(t, err) } // ensure new blocks are not cut @@ -1321,7 +1357,9 @@ func Test_HeadIteratorReverse(t *testing.T) { } var i int64 for e := genEntry(i); c.SpaceFor(e); e, i = genEntry(i+1), i+1 { - require.NoError(t, c.Append(e)) + dup, err := c.Append(e) + require.False(t, dup) + require.NoError(t, err) } assertOrder := func(t *testing.T, total int64) { @@ -1427,7 +1465,7 @@ func TestMemChunk_Rebound(t *testing.T) { func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk { chk := NewMemChunk(ChunkFormatV3, EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0) for ; from.Before(through); from = from.Add(time.Second) { - err := chk.Append(&logproto.Entry{ + _, err := chk.Append(&logproto.Entry{ Line: from.String(), Timestamp: from, }) @@ -1558,7 +1596,7 @@ func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matching if matchingFrom != nil && matchingTo != nil && (from.Equal(*matchingFrom) || (from.After(*matchingFrom) && (from.Before(*matchingTo)))) { t.Logf("%v matching line", from.String()) - err := chk.Append(&logproto.Entry{ + _, err := chk.Append(&logproto.Entry{ Line: fmt.Sprintf("matching %v", from.String()), Timestamp: from, StructuredMetadata: structuredMetadata, @@ -1570,7 +1608,7 @@ func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matching if withStructuredMetadata { structuredMetadata = push.LabelsAdapter{{Name: "ding", Value: "dong"}} } - err := chk.Append(&logproto.Entry{ + _, err := chk.Append(&logproto.Entry{ Line: from.String(), Timestamp: from, StructuredMetadata: structuredMetadata, @@ -1700,7 +1738,9 @@ func TestMemChunk_SpaceFor(t *testing.T) { chk.blocks = make([]block, tc.nBlocks) chk.cutBlockSize = tc.cutBlockSize for i := 0; i < tc.headSize; i++ { - require.NoError(t, chk.head.Append(int64(i), "a", nil)) + dup, err := chk.head.Append(int64(i), "a", nil) + require.False(t, dup) + require.NoError(t, err) } expect := tc.expect @@ -1724,23 +1764,31 @@ func TestMemChunk_IteratorWithStructuredMetadata(t *testing.T) { {Name: "job", Value: "fake"}, } chk := newMemChunkWithFormat(ChunkFormatV4, enc, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) - require.NoError(t, 
chk.Append(logprotoEntryWithStructuredMetadata(1, "lineA", []logproto.LabelAdapter{ + dup, err := chk.Append(logprotoEntryWithStructuredMetadata(1, "lineA", []logproto.LabelAdapter{ {Name: "traceID", Value: "123"}, {Name: "user", Value: "a"}, - }))) - require.NoError(t, chk.Append(logprotoEntryWithStructuredMetadata(2, "lineB", []logproto.LabelAdapter{ + })) + require.False(t, dup) + require.NoError(t, err) + dup, err = chk.Append(logprotoEntryWithStructuredMetadata(2, "lineB", []logproto.LabelAdapter{ {Name: "traceID", Value: "456"}, {Name: "user", Value: "b"}, - }))) + })) + require.False(t, dup) + require.NoError(t, err) require.NoError(t, chk.cut()) - require.NoError(t, chk.Append(logprotoEntryWithStructuredMetadata(3, "lineC", []logproto.LabelAdapter{ + dup, err = chk.Append(logprotoEntryWithStructuredMetadata(3, "lineC", []logproto.LabelAdapter{ {Name: "traceID", Value: "789"}, {Name: "user", Value: "c"}, - }))) - require.NoError(t, chk.Append(logprotoEntryWithStructuredMetadata(4, "lineD", []logproto.LabelAdapter{ + })) + require.False(t, dup) + require.NoError(t, err) + dup, err = chk.Append(logprotoEntryWithStructuredMetadata(4, "lineD", []logproto.LabelAdapter{ {Name: "traceID", Value: "123"}, {Name: "user", Value: "d"}, - }))) + })) + require.False(t, dup) + require.NoError(t, err) // The expected bytes is the sum of bytes decompressed and bytes read from the head chunk. // First we add the bytes read from the store (aka decompressed). That's diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 8253eba898d1a..64fadfedd134c 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -10,14 +10,11 @@ import ( "math" "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/Workiva/go-datastructures/rangetree" "github.com/cespare/xxhash/v2" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" - "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" @@ -38,7 +35,7 @@ type HeadBlock interface { Entries() int UncompressedSize() int Convert(HeadBlockFmt, *symbolizer) (HeadBlock, error) - Append(int64, string, labels.Labels) error + Append(int64, string, labels.Labels) (bool, error) Iterator( ctx context.Context, direction logproto.Direction, @@ -66,13 +63,6 @@ type unorderedHeadBlock struct { lines int // number of entries size int // size of uncompressed bytes. 
mint, maxt int64 // upper and lower bounds - - // Runtime-config overrides for a tenant may allow these to be non-nil, in which case - // they will be used to output information related to duplicate log lines and bytes discarded - writeFailures *writefailures.Manager - duplicateLogBytes *prometheus.CounterVec - tenant string - labelsString string } func newUnorderedHeadBlock(headBlockFmt HeadBlockFmt, symbolizer *symbolizer) *unorderedHeadBlock { @@ -121,7 +111,7 @@ func (e *nsEntries) ValueAtDimension(_ uint64) int64 { return e.ts } -func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata labels.Labels) error { +func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata labels.Labels) (bool, error) { if hb.format < UnorderedWithStructuredMetadataHeadBlockFmt { // structuredMetadata must be ignored for the previous head block formats structuredMetadata = nil @@ -146,17 +136,7 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata l for _, et := range displaced[0].(*nsEntries).entries { if et.line == line { e.entries = displaced[0].(*nsEntries).entries - - if hb.writeFailures != nil { - errMsg := fmt.Sprintf("duplicate log entry at timestamp %d for stream %s", ts, hb.labelsString) - err := errors.New(errMsg) - hb.writeFailures.Log(hb.tenant, err) - } - if hb.duplicateLogBytes != nil { - hb.duplicateLogBytes.WithLabelValues(hb.tenant).Add(float64(len(line))) - } - - return nil + return true, nil } } e.entries = append(displaced[0].(*nsEntries).entries, nsEntry{line, hb.symbolizer.Add(structuredMetadata)}) @@ -177,7 +157,7 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata l hb.size += len(structuredMetadata) * 2 * 4 // 4 bytes per label and value pair as structuredMetadataSymbols hb.lines++ - return nil + return false, nil } func metaLabelsLen(metaLabels labels.Labels) int { @@ -464,7 +444,8 @@ func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt, symbolizer *symboliz 0, math.MaxInt64, func(_ *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error { - return out.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)) + _, err := out.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)) + return err }, ) return out, err @@ -604,7 +585,7 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error { } } - if err := hb.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)); err != nil { + if _, err := hb.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)); err != nil { return err } } diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index f4930952660fc..43c07d0f835f3 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -35,7 +35,9 @@ func iterEq(t *testing.T, exp []entry, got iter.EntryIterator) { func Test_forEntriesEarlyReturn(t *testing.T) { hb := newUnorderedHeadBlock(UnorderedHeadBlockFmt, newSymbolizer()) for i := 0; i < 10; i++ { - require.Nil(t, hb.Append(int64(i), fmt.Sprint(i), labels.Labels{{Name: "i", Value: fmt.Sprint(i)}})) + dup, err := hb.Append(int64(i), fmt.Sprint(i), labels.Labels{{Name: "i", Value: fmt.Sprint(i)}}) + require.False(t, dup) + require.Nil(t, err) } // forward @@ -86,6 +88,7 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { desc string input, exp []entry dir logproto.Direction + hasDup bool }{ { desc: "simple forward", @@ -152,7 +155,8 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { exp: []entry{ {0, "a", nil}, {0, "b", 
nil}, {1, "c", nil},
 			},
-			dir: logproto.FORWARD,
+			dir:    logproto.FORWARD,
+			hasDup: true,
 		},
 		{
 			desc: "ts remove exact dupe backward",
@@ -162,7 +166,8 @@ func Test_Unordered_InsertRetrieval(t *testing.T) {
 			exp: []entry{
 				{1, "c", nil}, {0, "b", nil}, {0, "a", nil},
 			},
-			dir: logproto.BACKWARD,
+			dir:    logproto.BACKWARD,
+			hasDup: true,
 		},
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
@@ -172,9 +177,17 @@
 			} {
 				t.Run(format.String(), func(t *testing.T) {
 					hb := newUnorderedHeadBlock(format, newSymbolizer())
+					dup := false
 					for _, e := range tc.input {
-						require.Nil(t, hb.Append(e.t, e.s, e.structuredMetadata))
+						tmpdup, err := hb.Append(e.t, e.s, e.structuredMetadata)
+						if !dup { // dup stays true once the first duplicate has been seen
+							if tmpdup { // this entry matched an existing (timestamp, line) pair
+								dup = true
+							}
+						}
+						require.Nil(t, err)
 					}
+					require.Equal(t, tc.hasDup, dup)

 					itr := hb.Iterator(
 						context.Background(),
@@ -250,7 +263,9 @@ func Test_UnorderedBoundedIter(t *testing.T) {
 			t.Run(format.String(), func(t *testing.T) {
 				hb := newUnorderedHeadBlock(format, newSymbolizer())
 				for _, e := range tc.input {
-					require.Nil(t, hb.Append(e.t, e.s, e.structuredMetadata))
+					dup, err := hb.Append(e.t, e.s, e.structuredMetadata)
+					require.False(t, dup)
+					require.Nil(t, err)
 				}

 				itr := hb.Iterator(
@@ -281,9 +296,15 @@ func TestHeadBlockInterop(t *testing.T) {
 	unorderedWithStructuredMetadata := newUnorderedHeadBlock(UnorderedWithStructuredMetadataHeadBlockFmt, newSymbolizer())
 	for i := 0; i < 100; i++ {
 		metaLabels := labels.Labels{{Name: "foo", Value: fmt.Sprint(99 - i)}}
-		require.Nil(t, unordered.Append(int64(99-i), fmt.Sprint(99-i), metaLabels))
-		require.Nil(t, unorderedWithStructuredMetadata.Append(int64(99-i), fmt.Sprint(99-i), metaLabels))
-		require.Nil(t, ordered.Append(int64(i), fmt.Sprint(i), labels.Labels{{Name: "foo", Value: fmt.Sprint(i)}}))
+		dup, err := unordered.Append(int64(99-i), fmt.Sprint(99-i), metaLabels)
+		require.False(t, dup)
+		require.Nil(t, err)
+		dup, err = unorderedWithStructuredMetadata.Append(int64(99-i), fmt.Sprint(99-i), metaLabels)
+		require.False(t, dup)
+		require.Nil(t, err)
+		dup, err = ordered.Append(int64(i), fmt.Sprint(i), labels.Labels{{Name: "foo", Value: fmt.Sprint(i)}})
+		require.False(t, dup)
+		require.Nil(t, err)
 	}

 	// turn to bytes
@@ -359,14 +380,14 @@ func BenchmarkHeadBlockWrites(b *testing.B) {
 	headBlockFn := func() func(int64, string, labels.Labels) {
 		hb := &headBlock{}
 		return func(ts int64, line string, metaLabels labels.Labels) {
-			_ = hb.Append(ts, line, metaLabels)
+			_, _ = hb.Append(ts, line, metaLabels)
 		}
 	}

 	unorderedHeadBlockFn := func() func(int64, string, labels.Labels) {
 		hb := newUnorderedHeadBlock(UnorderedHeadBlockFmt, nil)
 		return func(ts int64, line string, metaLabels labels.Labels) {
-			_ = hb.Append(ts, line, metaLabels)
+			_, _ = hb.Append(ts, line, metaLabels)
 		}
 	}

@@ -432,10 +453,12 @@ func TestUnorderedChunkIterators(t *testing.T) {
 	c := NewMemChunk(ChunkFormatV4, EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
 	for i := 0; i < 100; i++ {
 		// push in reverse order
-		require.Nil(t, c.Append(&logproto.Entry{
+		dup, err := c.Append(&logproto.Entry{
 			Timestamp: time.Unix(int64(99-i), 0),
 			Line:      fmt.Sprint(99 - i),
-		}))
+		})
+		require.False(t, dup)
+		require.Nil(t, err)

 		// ensure we have a mix of cut blocks + head block.
if i%30 == 0 { @@ -574,7 +597,7 @@ func TestUnorderedIteratorCountsAllEntries(t *testing.T) { func chunkFrom(xs []logproto.Entry) ([]byte, error) { c := NewMemChunk(ChunkFormatV4, EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) for _, x := range xs { - if err := c.Append(&x); err != nil { + if _, err := c.Append(&x); err != nil { return nil, err } } @@ -634,7 +657,9 @@ func TestReorder(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { c := NewMemChunk(ChunkFormatV4, EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) for _, x := range tc.input { - require.Nil(t, c.Append(&x)) + dup, err := c.Append(&x) + require.False(t, dup) + require.Nil(t, err) } require.Nil(t, c.Close()) b, err := c.Bytes() @@ -657,10 +682,12 @@ func TestReorderAcrossBlocks(t *testing.T) { {3, 7}, } { for _, x := range batch { - require.Nil(t, c.Append(&logproto.Entry{ + dup, err := c.Append(&logproto.Entry{ Timestamp: time.Unix(int64(x), 0), Line: fmt.Sprint(x), - })) + }) + require.False(t, dup) + require.Nil(t, err) } require.Nil(t, c.cut()) } @@ -705,7 +732,9 @@ func Test_HeadIteratorHash(t *testing.T) { "ordered": &headBlock{}, } { t.Run(name, func(t *testing.T) { - require.NoError(t, b.Append(1, "foo", labels.Labels{{Name: "foo", Value: "bar"}})) + dup, err := b.Append(1, "foo", labels.Labels{{Name: "foo", Value: "bar"}}) + require.False(t, dup) + require.NoError(t, err) eit := b.Iterator(context.Background(), logproto.BACKWARD, 0, 2, log.NewNoopPipeline().ForStream(lbs)) for eit.Next() { diff --git a/pkg/chunkenc/util_test.go b/pkg/chunkenc/util_test.go index a1860f9ae297a..de74f7946e2ad 100644 --- a/pkg/chunkenc/util_test.go +++ b/pkg/chunkenc/util_test.go @@ -33,7 +33,7 @@ func generateData(enc Encoding, chunksCount, blockSize, targetSize int) ([]Chunk c := NewMemChunk(ChunkFormatV4, enc, UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize) for c.SpaceFor(entry) { size += uint64(len(entry.Line)) - _ = c.Append(entry) + _, _ = c.Append(entry) i++ entry = logprotoEntry(i, testdata.LogString(i)) } @@ -55,7 +55,7 @@ func fillChunkClose(c Chunk, close bool) int64 { Line: testdata.LogString(i), } for c.SpaceFor(entry) { - err := c.Append(entry) + _, err := c.Append(entry) if err != nil { panic(err) } @@ -81,7 +81,7 @@ func fillChunkRandomOrder(c Chunk, close bool) { } for c.SpaceFor(entry) { - err := c.Append(entry) + _, err := c.Append(entry) if err != nil { panic(err) } diff --git a/pkg/compactor/retention/retention_test.go b/pkg/compactor/retention/retention_test.go index 6c261d34799e5..a3f157dc77743 100644 --- a/pkg/compactor/retention/retention_test.go +++ b/pkg/compactor/retention/retention_test.go @@ -223,11 +223,13 @@ func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time chunkEnc := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize) for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) { - require.NoError(t, chunkEnc.Append(&logproto.Entry{ + dup, err := chunkEnc.Append(&logproto.Entry{ Timestamp: ts.Time(), Line: ts.String(), StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", ts.String())), - })) + }) + require.False(t, dup) + require.NoError(t, err) } require.NoError(t, chunkEnc.Close()) diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go index 4523bc8cc1d8b..9ceb3c740926e 100644 --- a/pkg/ingester/chunk_test.go +++ b/pkg/ingester/chunk_test.go 
@@ -55,10 +55,11 @@ func TestIterator(t *testing.T) { t.Run(chk.name, func(t *testing.T) { chunk := chk.new() for i := int64(0); i < entries; i++ { - err := chunk.Append(&logproto.Entry{ + dup, err := chunk.Append(&logproto.Entry{ Timestamp: time.Unix(i, 0), Line: fmt.Sprintf("line %d", i), }) + require.False(t, dup) require.NoError(t, err) } diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go index 4bb1aab0b8da6..458da1132c963 100644 --- a/pkg/ingester/encoding_test.go +++ b/pkg/ingester/encoding_test.go @@ -22,7 +22,9 @@ func fillChunk(t testing.TB, c chunkenc.Chunk) { } for c.SpaceFor(entry) { - require.NoError(t, c.Append(entry)) + dup, err := c.Append(entry) + require.False(t, dup) + require.NoError(t, err) i++ entry.Timestamp = time.Unix(0, i) entry.Line = fmt.Sprintf("entry for line %d", i) @@ -120,10 +122,12 @@ func Test_EncodingChunks(t *testing.T) { func Test_EncodingCheckpoint(t *testing.T) { conf := dummyConf() c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) - require.Nil(t, c.Append(&logproto.Entry{ + dup, err := c.Append(&logproto.Entry{ Timestamp: time.Unix(1, 0), Line: "hi there", - })) + }) + require.False(t, dup) + require.Nil(t, err) data, err := c.Bytes() require.Nil(t, err) from, to := c.Bounds() diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index ed7dcc2cdadca..f82f1d6962aaa 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -310,7 +310,8 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) { require.NoError(t, err) chunk := newStream(chunkfmt, headfmt, cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil, nil).NewChunk() for _, entry := range testStream.Entries { - err = chunk.Append(&entry) + dup, err := chunk.Append(&entry) + require.False(t, dup) require.NoError(t, err) } stream.chunks = append(stream.chunks, chunkDesc{chunk: chunk}) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index ccfedba75fefb..24b032b43974e 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -8,8 +8,6 @@ import ( "sync" "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/loki/v3/pkg/runtime" "github.com/go-kit/log/level" @@ -172,21 +170,7 @@ func (s *stream) setChunks(chunks []Chunk) (bytesAdded, entriesAdded int, err er } func (s *stream) NewChunk() *chunkenc.MemChunk { - chunk := chunkenc.NewMemChunk(s.chunkFormat, s.cfg.parsedEncoding, s.chunkHeadBlockFormat, s.cfg.BlockSize, s.cfg.TargetChunkSize) - - if s.configs != nil { - var manager *writefailures.Manager - var metrics *prometheus.CounterVec - if s.configs.LogDuplicateMetrics(s.tenant) { - metrics = s.metrics.duplicateLogBytesTotal - } - if s.configs.LogDuplicateStreamInfo(s.tenant) { - manager = s.writeFailures - } - chunk.SetDuplicateLogLinesCapture(manager, metrics, s.tenant, s.labelsString) - } - - return chunk + return chunkenc.NewMemChunk(s.chunkFormat, s.cfg.parsedEncoding, s.chunkHeadBlockFormat, s.cfg.BlockSize, s.cfg.TargetChunkSize) } func (s *stream) Push( @@ -356,7 +340,8 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa } chunk.lastUpdated = time.Now() - if err := chunk.chunk.Append(&entries[i]); err != nil { + dup, err := chunk.chunk.Append(&entries[i]) + if err != nil { invalid = append(invalid, entryWithError{&entries[i], err}) if chunkenc.IsOutOfOrderErr(err) { s.writeFailures.Log(s.tenant, 
err) @@ -365,6 +350,18 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa } continue } + if dup { + if s.configs != nil { + if s.configs.LogDuplicateMetrics(s.tenant) { + s.metrics.duplicateLogBytesTotal.WithLabelValues(s.tenant).Add(float64(len(entries[i].Line))) + } + if s.configs.LogDuplicateStreamInfo(s.tenant) { + errMsg := fmt.Sprintf("duplicate log entry at timestamp %s for stream %s", entries[i].Timestamp.Format(time.RFC3339), s.labelsString) + dupErr := errors.New(errMsg) + s.writeFailures.Log(s.tenant, dupErr) + } + } + } s.entryCt++ s.lastLine.ts = entries[i].Timestamp diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 2c7498a7320de..68974ae016b39 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -285,10 +285,11 @@ func TestStreamIterator(t *testing.T) { chunk := chk.new() for j := int64(0); j < entries; j++ { k := i*entries + j - err := chunk.Append(&logproto.Entry{ + dup, err := chunk.Append(&logproto.Entry{ Timestamp: time.Unix(k, 0), Line: fmt.Sprintf("line %d", k), }) + require.False(t, dup) require.NoError(t, err) } s.chunks = append(s.chunks, chunkDesc{chunk: chunk}) diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go index dec23f91e80bb..1feff32ee6e4f 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go @@ -101,7 +101,7 @@ func TestTokenizerPopulate(t *testing.T) { lbsList = append(lbsList, labels.FromStrings("foo", "bar")) memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) - _ = memChunk.Append(&push.Entry{ + _, _ = memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, 1), Line: testLine, }) @@ -145,7 +145,7 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) { lbsList = append(lbsList, labels.FromStrings("foo", "bar")) memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) - _ = memChunk.Append(&push.Entry{ + _, _ = memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, 1), Line: testLine, }) diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go index 23550dd34965e..c6ab61666b88d 100644 --- a/pkg/storage/chunk/cache/cache_test.go +++ b/pkg/storage/chunk/cache/cache_test.go @@ -36,7 +36,7 @@ func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]str cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) - err := cs.Append(&logproto.Entry{ + _, err := cs.Append(&logproto.Entry{ Timestamp: ts.Time(), Line: fmt.Sprintf("line ts=%d", ts), }) diff --git a/pkg/storage/chunk/client/testutils/testutils.go b/pkg/storage/chunk/client/testutils/testutils.go index 2b35b612badca..b34e75a6a166f 100644 --- a/pkg/storage/chunk/client/testutils/testutils.go +++ b/pkg/storage/chunk/client/testutils/testutils.go @@ -89,7 +89,7 @@ func DummyChunkFor(from, through model.Time, metric labels.Labels) chunk.Chunk { cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) for ts := from; ts <= through; ts = ts.Add(15 * time.Second) { - err := cs.Append(&logproto.Entry{Timestamp: ts.Time(), Line: fmt.Sprintf("line ts=%d", ts)}) + _, err := cs.Append(&logproto.Entry{Timestamp: ts.Time(), Line: 
fmt.Sprintf("line ts=%d", ts)}) if err != nil { panic(err) } diff --git a/pkg/storage/chunk/fetcher/fetcher_test.go b/pkg/storage/chunk/fetcher/fetcher_test.go index 902b0dae1d743..03efc9afdc809 100644 --- a/pkg/storage/chunk/fetcher/fetcher_test.go +++ b/pkg/storage/chunk/fetcher/fetcher_test.go @@ -314,7 +314,7 @@ func makeChunks(now time.Time, tpls ...c) []chunk.Chunk { memChk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncNone, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) // To make sure the fetcher doesn't swap keys and buffers each chunk is built with different, but deterministic data for i := 0; i < from; i++ { - _ = memChk.Append(&logproto.Entry{ + _, _ = memChk.Append(&logproto.Entry{ Timestamp: time.Unix(int64(i), 0), Line: fmt.Sprintf("line ts=%d", i), }) diff --git a/pkg/storage/hack/main.go b/pkg/storage/hack/main.go index f85e44a41ac5f..74257a8ba6ad0 100644 --- a/pkg/storage/hack/main.go +++ b/pkg/storage/hack/main.go @@ -110,7 +110,7 @@ func fillStore(cm storage.ClientMetrics) error { Line: randString(250), } if chunkEnc.SpaceFor(entry) { - _ = chunkEnc.Append(entry) + _, _ = chunkEnc.Append(entry) } else { from, to := chunkEnc.Bounds() c := chunk.NewChunk("fake", fp, metric, chunkenc.NewFacade(chunkEnc, 0, 0), model.TimeFromUnixNano(from.UnixNano()), model.TimeFromUnixNano(to.UnixNano())) diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 51f04538cc189..13bcaa9688a91 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -2049,7 +2049,9 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) { }, } } - require.NoError(t, chunkEnc.Append(&entry)) + dup, err := chunkEnc.Append(&entry) + require.False(t, dup) + require.NoError(t, err) } require.NoError(t, chunkEnc.Close()) diff --git a/pkg/storage/stores/series/series_store_test.go b/pkg/storage/stores/series/series_store_test.go index 15ecb1623eebb..553ea945f94f7 100644 --- a/pkg/storage/stores/series/series_store_test.go +++ b/pkg/storage/stores/series/series_store_test.go @@ -755,7 +755,8 @@ func dummyChunkWithFormat(t testing.TB, now model.Time, metric labels.Labels, fo chk := chunkenc.NewMemChunk(format, chunkenc.EncGZIP, headfmt, 256*1024, 0) for i := 0; i < samples; i++ { ts := time.Duration(i) * 15 * time.Second - err := chk.Append(&logproto.Entry{Timestamp: chunkStart.Time().Add(ts), Line: fmt.Sprintf("line %d", i)}) + dup, err := chk.Append(&logproto.Entry{Timestamp: chunkStart.Time().Add(ts), Line: fmt.Sprintf("line %d", i)}) + require.False(t, dup) require.NoError(t, err) } diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go index 25ccb52e9b18e..6f1b0326a5cc6 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go @@ -34,10 +34,12 @@ func createChunk(t testing.TB, chunkFormat byte, headBlockFmt chunkenc.HeadBlock chunkEnc := chunkenc.NewMemChunk(chunkFormat, chunkenc.EncSnappy, headBlockFmt, blockSize, targetSize) for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) { - require.NoError(t, chunkEnc.Append(&logproto.Entry{ + dup, err := chunkEnc.Append(&logproto.Entry{ Timestamp: ts.Time(), Line: ts.String(), - })) + }) + require.False(t, dup) + require.NoError(t, err) } require.NoError(t, chunkEnc.Close()) diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 7c325cc4da6bb..5ef02e74b1caf 100644 --- a/pkg/storage/util_test.go 
+++ b/pkg/storage/util_test.go @@ -110,7 +110,7 @@ func newChunk(chunkFormat byte, headBlockFmt chunkenc.HeadBlockFmt, stream logpr from, through := loki_util.RoundToMilliseconds(stream.Entries[0].Timestamp, stream.Entries[len(stream.Entries)-1].Timestamp) chk := chunkenc.NewMemChunk(chunkFormat, chunkenc.EncGZIP, headBlockFmt, 256*1024, 0) for _, e := range stream.Entries { - _ = chk.Append(&e) + _, _ = chk.Append(&e) } chk.Close() c := chunk.NewChunk("fake", client.Fingerprint(lbs), lbs, chunkenc.NewFacade(chk, 0, 0), from, through) From 5c5e6317934bc0f28705109cbf5425b688d44ae1 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Fri, 7 Jun 2024 13:49:04 -0400 Subject: [PATCH 08/15] Add a comment about what the new return val is --- pkg/chunkenc/interface.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index cf9e0a498f559..bf68da1f56012 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -131,7 +131,8 @@ func SupportedEncoding() string { // Chunk is the interface for the compressed logs chunk format. type Chunk interface { Bounds() (time.Time, time.Time) - SpaceFor(*logproto.Entry) bool + SpaceFor(*logproto.Entry) + // Append returns true if the entry appended was a duplicate Append(*logproto.Entry) (bool, error) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator From c80a90fb139051ae748271f19b2d8fbb56d0b32a Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Fri, 7 Jun 2024 14:20:29 -0400 Subject: [PATCH 09/15] Fix removal of return val --- pkg/chunkenc/interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index bf68da1f56012..f0b17c7750f3d 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -131,7 +131,7 @@ func SupportedEncoding() string { // Chunk is the interface for the compressed logs chunk format. 
type Chunk interface { Bounds() (time.Time, time.Time) - SpaceFor(*logproto.Entry) + SpaceFor(*logproto.Entry) bool // Append returns true if the entry appended was a duplicate Append(*logproto.Entry) (bool, error) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) From 6f321d40b30fd154517276c069820088682d8d63 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Fri, 7 Jun 2024 15:08:20 -0400 Subject: [PATCH 10/15] Lint --- pkg/canary/comparator/comparator_test.go | 2 +- pkg/chunkenc/memchunk_test.go | 3 +++ pkg/storage/bloom/v1/bloom_tokenizer_test.go | 4 ++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/canary/comparator/comparator_test.go b/pkg/canary/comparator/comparator_test.go index db28545397e3c..2b441b709a2a6 100644 --- a/pkg/canary/comparator/comparator_test.go +++ b/pkg/canary/comparator/comparator_test.go @@ -386,7 +386,7 @@ func TestCacheTest(t *testing.T) { queryResultsDiff = &mockCounter{} // reset counter mr.countOverTime = 2.3 // value not important - mr.noCacheCountOvertime = 2.30000005 // different than `countOverTime` value but withing tolerance + mr.noCacheCountOvertime = 2.30000005 // different than `countOverTime` value but within tolerance c.cacheTest(now) assert.Equal(t, 0, queryResultsDiff.(*mockCounter).count) diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index d8782cc3e3b9b..f587d574f2b7e 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -625,6 +625,7 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) { if chk.headFmt == OrderedHeadBlockFmt { dup, err = chk.Append(logprotoEntry(1, "test")) assert.EqualError(t, err, ErrOutOfOrder.Error()) + assert.False(t, dup) } else { dup, err = chk.Append(logprotoEntry(1, "test")) assert.False(t, dup) @@ -642,6 +643,7 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) { if chk.headFmt == OrderedHeadBlockFmt { dup, err = chk.Append(logprotoEntry(1, "test")) + assert.False(t, dup) assert.EqualError(t, err, ErrOutOfOrder.Error()) } else { dup, err = chk.Append(logprotoEntry(1, "test")) @@ -662,6 +664,7 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) { if chk.headFmt == OrderedHeadBlockFmt { dup, err = chk.Append(logprotoEntry(1, "test")) + assert.False(t, dup) assert.EqualError(t, err, ErrOutOfOrder.Error()) } else { dup, err = chk.Append(logprotoEntry(1, "test")) diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go index b3af748d4b2dc..9bef1ab2ca202 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go @@ -140,7 +140,7 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) { bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics) memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) - _ = memChunk.Append(&push.Entry{ + _, _ = memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, 1), Line: testLine, }) @@ -174,7 +174,7 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) { func chunkRefItrFromLines(lines ...string) (iter.EntryIterator, error) { memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) for i, line := range lines { - if err := memChunk.Append(&push.Entry{ + if _, err := memChunk.Append(&push.Entry{ Timestamp: 
time.Unix(0, int64(i)),
			Line:      line,
		}); err != nil {

From ae10548b9409397955d19319e303458a5d190042 Mon Sep 17 00:00:00 2001
From: Paul Rogers
Date: Thu, 20 Jun 2024 14:57:32 -0400
Subject: [PATCH 11/15] Rework writefailures logic to be easier to read

---
 pkg/distributor/writefailures/manager.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/distributor/writefailures/manager.go b/pkg/distributor/writefailures/manager.go
index 9c6e2818dca9d..5a02a7f2c2226 100644
--- a/pkg/distributor/writefailures/manager.go
+++ b/pkg/distributor/writefailures/manager.go
@@ -39,8 +39,8 @@ func (m *Manager) Log(tenantID string, err error) {
 		return
 	}

-	if !m.tenantCfgs.LimitedLogPushErrors(tenantID) &&
-		!m.tenantCfgs.LogDuplicateStreamInfo(tenantID) {
+	if !(m.tenantCfgs.LimitedLogPushErrors(tenantID) ||
+		m.tenantCfgs.LogDuplicateStreamInfo(tenantID)) {
 		return
 	}

From 07a82f0c814e2d130a794a446bc50a7a14d8440b Mon Sep 17 00:00:00 2001
From: Paul Rogers
Date: Thu, 20 Jun 2024 15:02:18 -0400
Subject: [PATCH 12/15] Break out logging of duplicate metrics/log into a function

---
 pkg/ingester/stream.go | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go
index 24b032b43974e..1b2d266aa9a58 100644
--- a/pkg/ingester/stream.go
+++ b/pkg/ingester/stream.go
@@ -351,16 +351,7 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa
 			continue
 		}
 		if dup {
-			if s.configs != nil {
-				if s.configs.LogDuplicateMetrics(s.tenant) {
-					s.metrics.duplicateLogBytesTotal.WithLabelValues(s.tenant).Add(float64(len(entries[i].Line)))
-				}
-				if s.configs.LogDuplicateStreamInfo(s.tenant) {
-					errMsg := fmt.Sprintf("duplicate log entry at timestamp %s for stream %s", entries[i].Timestamp.Format(time.RFC3339), s.labelsString)
-					dupErr := errors.New(errMsg)
-					s.writeFailures.Log(s.tenant, dupErr)
-				}
-			}
+			s.handleLoggingOfDuplicateEntry(entries[i])
 		}

 		s.entryCt++
@@ -377,6 +368,19 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa
 	return bytesAdded, storedEntries, invalid
 }

+func (s *stream) handleLoggingOfDuplicateEntry(entry logproto.Entry) {
+	if s.configs != nil {
+		if s.configs.LogDuplicateMetrics(s.tenant) {
+			s.metrics.duplicateLogBytesTotal.WithLabelValues(s.tenant).Add(float64(len(entry.Line)))
+		}
+		if s.configs.LogDuplicateStreamInfo(s.tenant) {
+			errMsg := fmt.Sprintf("duplicate log entry at timestamp %s for stream %s", entry.Timestamp.Format(time.RFC3339), s.labelsString)
+			dupErr := errors.New(errMsg)
+			s.writeFailures.Log(s.tenant, dupErr)
+		}
+	}
+}
+
 func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, isReplay, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]logproto.Entry, []entryWithError) {

 	var (

From 6b6d3d69e7856d02ad13715057c3a85a5b0a205a Mon Sep 17 00:00:00 2001
From: Paul Rogers
Date: Thu, 20 Jun 2024 16:22:54 -0400
Subject: [PATCH 13/15] Don't make a nested if, break out early if no tenant configs

---
 pkg/ingester/stream.go | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go
index 1b2d266aa9a58..2eff41e76238c 100644
--- a/pkg/ingester/stream.go
+++ b/pkg/ingester/stream.go
@@ -369,16 +369,18 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa
 }

 func (s *stream) handleLoggingOfDuplicateEntry(entry logproto.Entry) {
-	if s.configs != nil {
-		if s.configs.LogDuplicateMetrics(s.tenant) {
-
s.metrics.duplicateLogBytesTotal.WithLabelValues(s.tenant).Add(float64(len(entry.Line))) - } - if s.configs.LogDuplicateStreamInfo(s.tenant) { - errMsg := fmt.Sprintf("duplicate log entry at timestamp %s for stream %s", entry.Timestamp.Format(time.RFC3339), s.labelsString) - dupErr := errors.New(errMsg) - s.writeFailures.Log(s.tenant, dupErr) - } + if s.configs == nil { + return } + if s.configs.LogDuplicateMetrics(s.tenant) { + s.metrics.duplicateLogBytesTotal.WithLabelValues(s.tenant).Add(float64(len(entry.Line))) + } + if s.configs.LogDuplicateStreamInfo(s.tenant) { + errMsg := fmt.Sprintf("duplicate log entry at timestamp %s for stream %s", entry.Timestamp.Format(time.RFC3339), s.labelsString) + dupErr := errors.New(errMsg) + s.writeFailures.Log(s.tenant, dupErr) + } + } func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, isReplay, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]logproto.Entry, []entryWithError) { From 1f33c606c3dbb8fa4f086b6b2b19f889fafcc62d Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 20 Jun 2024 16:28:37 -0400 Subject: [PATCH 14/15] Add size to duplicate log line info --- pkg/ingester/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 2eff41e76238c..7d37859b1541f 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -376,7 +376,7 @@ func (s *stream) handleLoggingOfDuplicateEntry(entry logproto.Entry) { s.metrics.duplicateLogBytesTotal.WithLabelValues(s.tenant).Add(float64(len(entry.Line))) } if s.configs.LogDuplicateStreamInfo(s.tenant) { - errMsg := fmt.Sprintf("duplicate log entry at timestamp %s for stream %s", entry.Timestamp.Format(time.RFC3339), s.labelsString) + errMsg := fmt.Sprintf("duplicate log entry with size=%d at timestamp %s for stream %s", len(entry.Line), entry.Timestamp.Format(time.RFC3339), s.labelsString) dupErr := errors.New(errMsg) s.writeFailures.Log(s.tenant, dupErr) } From 982ebf6a4e472577c6185568e8a2bafb66f7db20 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Wed, 26 Jun 2024 08:17:59 -0400 Subject: [PATCH 15/15] PR review comments, add comments about what each implementation may return --- pkg/chunkenc/dumb_chunk.go | 1 + pkg/chunkenc/memchunk.go | 2 ++ pkg/chunkenc/unordered.go | 1 + 3 files changed, 4 insertions(+) diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index 6f233a2877244..ef8548b1438da 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -36,6 +36,7 @@ func (c *dumbChunk) SpaceFor(_ *logproto.Entry) bool { return len(c.entries) < tmpNumEntries } +// The dumbChunk does not check for duplicates, and will always return false func (c *dumbChunk) Append(entry *logproto.Entry) (bool, error) { if len(c.entries) == tmpNumEntries { return false, ErrChunkFull diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index c9f19984e8387..f4e27255633dd 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -181,6 +181,7 @@ func (hb *headBlock) Reset() { func (hb *headBlock) Bounds() (int64, int64) { return hb.mint, hb.maxt } +// The headBlock does not check for duplicates, and will always return false func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) (bool, error) { if !hb.IsEmpty() && hb.maxt > ts { return false, ErrOutOfOrder @@ -834,6 +835,7 @@ func (c *MemChunk) Utilization() float64 { } // Append implements Chunk. +// The MemChunk may return true or false, depending on what the head block returns. 
func (c *MemChunk) Append(entry *logproto.Entry) (bool, error) { entryTimestamp := entry.Timestamp.UnixNano() diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 64fadfedd134c..807f80b2c0f87 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -111,6 +111,7 @@ func (e *nsEntries) ValueAtDimension(_ uint64) int64 { return e.ts } +// unorderedHeadBlock will return true if the entry is a duplicate, false otherwise func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata labels.Labels) (bool, error) { if hb.format < UnorderedWithStructuredMetadataHeadBlockFmt { // structuredMetadata must be ignored for the previous head block formats
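
Taken together with the runtime-config gating in stream.go, a tenant that enables LogDuplicateMetrics ends up with a per-tenant counter of discarded duplicate bytes. The following is a small standalone sketch, not part of any patch, of the counter newIngesterMetrics registers and how a test can read it back; the "loki" namespace and "fake" tenant mirror the tests earlier in the series.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()

	// Mirrors the registration in newIngesterMetrics: with the "loki"
	// namespace the series is exported as
	// loki_ingester_duplicate_log_bytes_total{tenant="..."}.
	dupBytes := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Namespace: "loki",
		Subsystem: "ingester",
		Name:      "duplicate_log_bytes_total",
		Help:      "The total number of bytes that were discarded for duplicate log lines.",
	}, []string{"tenant"})

	// storeEntries adds len(entry.Line) once per duplicate, e.g. a 4-byte line:
	dupBytes.WithLabelValues("fake").Add(4)

	fmt.Println(testutil.ToFloat64(dupBytes.WithLabelValues("fake"))) // 4
}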