diff --git a/integration/client/client.go b/integration/client/client.go
index 823fbdf73a310..7095e5bf398d1 100644
--- a/integration/client/client.go
+++ b/integration/client/client.go
@@ -90,8 +90,8 @@ func (c *Client) PushLogLine(line string, extraLabels ...map[string]string) erro
 	return c.pushLogLine(line, c.Now, nil, extraLabels...)
 }
-func (c *Client) PushLogLineWithMetadata(line string, logLabels map[string]string, extraLabels ...map[string]string) error {
-	return c.PushLogLineWithTimestampAndMetadata(line, c.Now, logLabels, extraLabels...)
+func (c *Client) PushLogLineWithNonIndexedLabels(line string, logLabels map[string]string, extraLabels ...map[string]string) error {
+	return c.PushLogLineWithTimestampAndNonIndexedLabels(line, c.Now, logLabels, extraLabels...)
 }
 // PushLogLineWithTimestamp creates a new logline at the given timestamp
@@ -100,7 +100,7 @@ func (c *Client) PushLogLineWithTimestamp(line string, timestamp time.Time, extr
 	return c.pushLogLine(line, timestamp, nil, extraLabels...)
 }
-func (c *Client) PushLogLineWithTimestampAndMetadata(line string, timestamp time.Time, logLabels map[string]string, extraLabelList ...map[string]string) error {
+func (c *Client) PushLogLineWithTimestampAndNonIndexedLabels(line string, timestamp time.Time, logLabels map[string]string, extraLabelList ...map[string]string) error {
 	// If the logLabels map is empty, labels.FromMap will allocate some empty slices.
 	// Since this code is executed for every log line we receive, as an optimization
 	// to avoid those allocations we'll call labels.FromMap only if the map is not empty.
diff --git a/integration/loki_micro_services_test.go b/integration/loki_micro_services_test.go
index 9239289a9e65b..0f0a3d58af8e7 100644
--- a/integration/loki_micro_services_test.go
+++ b/integration/loki_micro_services_test.go
@@ -396,12 +396,12 @@ func TestMicroServicesIngestQueryOverMultipleBucketSingleProvider(t *testing.T)
 	cliQueryFrontend.Now = now
 	t.Run("ingest-logs", func(t *testing.T) {
-		require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineA", time.Now().Add(-48*time.Hour), map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
-		require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineB", time.Now().Add(-36*time.Hour), map[string]string{"traceID": "456"}, map[string]string{"job": "fake"}))
+		require.NoError(t, cliDistributor.PushLogLineWithTimestampAndNonIndexedLabels("lineA", time.Now().Add(-48*time.Hour), map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
+		require.NoError(t, cliDistributor.PushLogLineWithTimestampAndNonIndexedLabels("lineB", time.Now().Add(-36*time.Hour), map[string]string{"traceID": "456"}, map[string]string{"job": "fake"}))
 		// ingest logs to the current period
-		require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineC", map[string]string{"traceID": "789"}, map[string]string{"job": "fake"}))
-		require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineD", map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
+		require.NoError(t, cliDistributor.PushLogLineWithNonIndexedLabels("lineC", map[string]string{"traceID": "789"}, map[string]string{"job": "fake"}))
+		require.NoError(t, cliDistributor.PushLogLineWithNonIndexedLabels("lineD", map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
 	})
diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go
index f3f6b8f075fc0..d2aa131b297bc 100644
--- a/pkg/chunkenc/memchunk.go
+++ b/pkg/chunkenc/memchunk.go
@@ -44,7 +44,7 @@ const (
 	defaultBlockSize = 256 * 1024
 )
-var HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt, UnorderedWithMetadataHeadBlockFmt}
+var HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt, UnorderedWithNonIndexedLabelsHeadBlockFmt}
 type HeadBlockFmt byte
@@ -56,8 +56,8 @@ func (f HeadBlockFmt) String() string {
 		return "ordered"
 	case f == UnorderedHeadBlockFmt:
 		return "unordered"
-	case f == UnorderedWithMetadataHeadBlockFmt:
-		return "unordered with metadata"
+	case f == UnorderedWithNonIndexedLabelsHeadBlockFmt:
+		return "unordered with non-indexed labels"
 	default:
 		return fmt.Sprintf("unknown: %v", byte(f))
 	}
@@ -80,7 +80,7 @@ const (
 	_
 	OrderedHeadBlockFmt
 	UnorderedHeadBlockFmt
-	UnorderedWithMetadataHeadBlockFmt
+	UnorderedWithNonIndexedLabelsHeadBlockFmt
 	DefaultHeadBlockFmt = UnorderedHeadBlockFmt
 )
@@ -1144,8 +1144,8 @@ type bufferedIterator struct {
 	currLine []byte // the current line, this is the same as the buffer but sliced the line size.
 	currTs   int64
-	metaLabelsBuf      [][]byte      // The buffer for a single entry's metadata labels.
-	currMetadataLabels labels.Labels // The current labels.
+	nonIndexedLabelsBuf  [][]byte      // The buffer for a single entry's non-indexed labels.
+	currNonIndexedLabels labels.Labels // The current labels.
 	closed bool
 }
@@ -1199,14 +1199,14 @@ func (si *bufferedIterator) Next() bool {
 	si.currTs = ts
 	si.currLine = line
-	si.currMetadataLabels = nonIndexedLabels
+	si.currNonIndexedLabels = nonIndexedLabels
 	return true
 }
 // moveNext moves the buffer to the next entry
 func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
 	var decompressedBytes int64
-	var decompressedMetadataBytes int64
+	var decompressedNonIndexedLabelsBytes int64
 	var ts int64
 	var tWidth, lWidth, lineSize, lastAttempt int
 	for lWidth == 0 { // Read until both varints have enough bytes.
@@ -1307,33 +1307,33 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
 	}
 	// Number of labels
-	decompressedMetadataBytes += binary.MaxVarintLen64
+	decompressedNonIndexedLabelsBytes += binary.MaxVarintLen64
 	// Shift down what is still left in the fixed-size read buffer, if any.
 	si.readBufValid = copy(si.readBuf[:], si.readBuf[labelsWidth:si.readBufValid])
 	// If not enough space for the labels, create a new buffer slice and put the old one back in the pool.
-	metaLabelsBufLen := nLabels * 2
-	if si.metaLabelsBuf == nil || metaLabelsBufLen > cap(si.metaLabelsBuf) {
-		if si.metaLabelsBuf != nil {
-			for i := range si.metaLabelsBuf {
-				if si.metaLabelsBuf[i] != nil {
-					BytesBufferPool.Put(si.metaLabelsBuf[i])
+	nonIndexedLabelsBufLen := nLabels * 2
+	if si.nonIndexedLabelsBuf == nil || nonIndexedLabelsBufLen > cap(si.nonIndexedLabelsBuf) {
+		if si.nonIndexedLabelsBuf != nil {
+			for i := range si.nonIndexedLabelsBuf {
+				if si.nonIndexedLabelsBuf[i] != nil {
+					BytesBufferPool.Put(si.nonIndexedLabelsBuf[i])
 				}
 			}
-			LabelsPool.Put(si.metaLabelsBuf)
+			LabelsPool.Put(si.nonIndexedLabelsBuf)
 		}
-		si.metaLabelsBuf = LabelsPool.Get(metaLabelsBufLen).([][]byte)
-		if metaLabelsBufLen > cap(si.metaLabelsBuf) {
-			si.err = fmt.Errorf("could not get a labels matrix of size %d, actual %d", metaLabelsBufLen, cap(si.metaLabelsBuf))
+		si.nonIndexedLabelsBuf = LabelsPool.Get(nonIndexedLabelsBufLen).([][]byte)
+		if nonIndexedLabelsBufLen > cap(si.nonIndexedLabelsBuf) {
+			si.err = fmt.Errorf("could not get a labels matrix of size %d, actual %d", nonIndexedLabelsBufLen, cap(si.nonIndexedLabelsBuf))
 			return 0, nil, nil, false
 		}
 	}
-	si.metaLabelsBuf = si.metaLabelsBuf[:nLabels*2]
+	si.nonIndexedLabelsBuf = si.nonIndexedLabelsBuf[:nLabels*2]
 	// Read all the label-value pairs, into the buffer slice.
-	for i := 0; i < metaLabelsBufLen; i++ {
+	for i := 0; i < nonIndexedLabelsBufLen; i++ {
 		// Read the length of the label.
 		lastAttempt = 0
 		var labelWidth, labelSize int
@@ -1360,30 +1360,30 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
 		}
 		// Label size
-		decompressedMetadataBytes += binary.MaxVarintLen64
+		decompressedNonIndexedLabelsBytes += binary.MaxVarintLen64
 		// If the buffer is not yet initialize or too small, we get a new one.
-		if si.metaLabelsBuf[i] == nil || labelSize > cap(si.metaLabelsBuf[i]) {
+		if si.nonIndexedLabelsBuf[i] == nil || labelSize > cap(si.nonIndexedLabelsBuf[i]) {
 			// in case of a replacement we replace back the buffer in the pool
-			if si.metaLabelsBuf[i] != nil {
-				BytesBufferPool.Put(si.metaLabelsBuf[i])
+			if si.nonIndexedLabelsBuf[i] != nil {
+				BytesBufferPool.Put(si.nonIndexedLabelsBuf[i])
 			}
-			si.metaLabelsBuf[i] = BytesBufferPool.Get(labelSize).([]byte)
-			if labelSize > cap(si.metaLabelsBuf[i]) {
-				si.err = fmt.Errorf("could not get a label buffer of size %d, actual %d", labelSize, cap(si.metaLabelsBuf[i]))
+			si.nonIndexedLabelsBuf[i] = BytesBufferPool.Get(labelSize).([]byte)
+			if labelSize > cap(si.nonIndexedLabelsBuf[i]) {
+				si.err = fmt.Errorf("could not get a label buffer of size %d, actual %d", labelSize, cap(si.nonIndexedLabelsBuf[i]))
 				return 0, nil, nil, false
 			}
 		}
-		si.metaLabelsBuf[i] = si.metaLabelsBuf[i][:labelSize]
+		si.nonIndexedLabelsBuf[i] = si.nonIndexedLabelsBuf[i][:labelSize]
 		// Take however many bytes are left in the read buffer.
-		n := copy(si.metaLabelsBuf[i], si.readBuf[labelWidth:si.readBufValid])
+		n := copy(si.nonIndexedLabelsBuf[i], si.readBuf[labelWidth:si.readBufValid])
 		// Shift down what is still left in the fixed-size read buffer, if any.
 		si.readBufValid = copy(si.readBuf[:], si.readBuf[labelWidth+n:si.readBufValid])
 		// Then process reading the label.
 		for n < labelSize {
-			r, err := si.reader.Read(si.metaLabelsBuf[i][n:labelSize])
+			r, err := si.reader.Read(si.nonIndexedLabelsBuf[i][n:labelSize])
 			n += r
 			if err != nil {
 				// We might get EOF after reading enough bytes to fill the buffer, which is OK.
@@ -1396,14 +1396,14 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
 			}
 		}
-		decompressedMetadataBytes += int64(labelSize)
+		decompressedNonIndexedLabelsBytes += int64(labelSize)
 	}
 	si.stats.AddDecompressedLines(1)
-	si.stats.AddDecompressedNonIndexedLabelsBytes(decompressedMetadataBytes)
-	si.stats.AddDecompressedBytes(decompressedBytes + decompressedMetadataBytes)
+	si.stats.AddDecompressedNonIndexedLabelsBytes(decompressedNonIndexedLabelsBytes)
+	si.stats.AddDecompressedBytes(decompressedBytes + decompressedNonIndexedLabelsBytes)
-	return ts, si.buf[:lineSize], si.metaLabelsBuf[:metaLabelsBufLen], true
+	return ts, si.buf[:lineSize], si.nonIndexedLabelsBuf[:nonIndexedLabelsBufLen], true
 }
 func (si *bufferedIterator) Error() error { return si.err }
@@ -1427,15 +1427,15 @@ func (si *bufferedIterator) close() {
 		si.buf = nil
 	}
-	if si.metaLabelsBuf != nil {
-		for i := range si.metaLabelsBuf {
-			if si.metaLabelsBuf[i] != nil {
-				BytesBufferPool.Put(si.metaLabelsBuf[i])
-				si.metaLabelsBuf[i] = nil
+	if si.nonIndexedLabelsBuf != nil {
+		for i := range si.nonIndexedLabelsBuf {
+			if si.nonIndexedLabelsBuf[i] != nil {
+				BytesBufferPool.Put(si.nonIndexedLabelsBuf[i])
+				si.nonIndexedLabelsBuf[i] = nil
 			}
 		}
-		LabelsPool.Put(si.metaLabelsBuf)
-		si.metaLabelsBuf = nil
+		LabelsPool.Put(si.nonIndexedLabelsBuf)
+		si.nonIndexedLabelsBuf = nil
 	}
 	si.origBytes = nil
@@ -1466,14 +1466,14 @@ func (e *entryBufferedIterator) StreamHash() uint64 { return e.pipeline.BaseLabe
 func (e *entryBufferedIterator) Next() bool {
 	for e.bufferedIterator.Next() {
-		newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine, e.currMetadataLabels...)
+		newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine, e.currNonIndexedLabels...)
 		if !matches {
 			continue
 		}
 		e.stats.AddPostFilterLines(1)
 		e.currLabels = lbs
-		e.cur.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(e.currMetadataLabels)
+		e.cur.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(e.currNonIndexedLabels)
 		e.cur.Timestamp = time.Unix(0, e.currTs)
 		e.cur.Line = string(newLine)
 		return true
@@ -1500,7 +1500,7 @@ type sampleBufferedIterator struct {
 func (e *sampleBufferedIterator) Next() bool {
 	for e.bufferedIterator.Next() {
-		val, labels, ok := e.extractor.Process(e.currTs, e.currLine, e.currMetadataLabels...)
+		val, labels, ok := e.extractor.Process(e.currTs, e.currLine, e.currNonIndexedLabels...)
 		if !ok {
 			continue
 		}
diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go
index 1f3d162c7c7d3..d48d2753d0118 100644
--- a/pkg/chunkenc/memchunk_test.go
+++ b/pkg/chunkenc/memchunk_test.go
@@ -68,7 +68,7 @@ var (
 		chunkFormat:  chunkFormatV3,
 	},
 	{
-		headBlockFmt: UnorderedWithMetadataHeadBlockFmt,
+		headBlockFmt: UnorderedWithNonIndexedLabelsHeadBlockFmt,
 		chunkFormat:  chunkFormatV4,
 	},
 }
@@ -165,7 +165,7 @@ func TestBlock(t *testing.T) {
 	}
 	for _, c := range cases {
-		require.NoError(t, chk.Append(logprotoEntryWithMetadata(c.ts, c.str, c.lbs)))
+		require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(c.ts, c.str, c.lbs)))
 		if c.cut {
 			require.NoError(t, chk.cut())
 		}
@@ -1478,7 +1478,7 @@ func TestMemChunk_SpaceFor(t *testing.T) {
 			expect:       true,
 		},
 		{
-			desc:         "entry fits with metadata",
+			desc:         "entry fits with non-indexed labels",
 			targetSize:   10,
 			headSize:     0,
 			cutBlockSize: 0,
@@ -1503,7 +1503,7 @@ func TestMemChunk_SpaceFor(t *testing.T) {
 			expect:       false,
 		},
 		{
-			desc:         "entry too big because metadata",
+			desc:         "entry too big because non-indexed labels",
 			targetSize:   10,
 			headSize:     0,
 			cutBlockSize: 0,
@@ -1517,7 +1517,7 @@ func TestMemChunk_SpaceFor(t *testing.T) {
 			expectFunc: func(chunkFormat byte, _ HeadBlockFmt) bool {
 				// Succeed unless we're using chunk format v4, which should
-				// take the metadata into account.
+				// take the non-indexed labels into account.
 				return chunkFormat < chunkFormatV4
 			},
 		},
@@ -1553,21 +1553,21 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
 	streamLabels := labels.Labels{
 		{Name: "job", Value: "fake"},
 	}
-	chk := newMemChunkWithFormat(chunkFormatV4, enc, UnorderedWithMetadataHeadBlockFmt, testBlockSize, testTargetSize)
-	require.NoError(t, chk.Append(logprotoEntryWithMetadata(1, "lineA", []logproto.LabelAdapter{
+	chk := newMemChunkWithFormat(chunkFormatV4, enc, UnorderedWithNonIndexedLabelsHeadBlockFmt, testBlockSize, testTargetSize)
+	require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(1, "lineA", []logproto.LabelAdapter{
 		{Name: "traceID", Value: "123"},
 		{Name: "user", Value: "a"},
 	})))
-	require.NoError(t, chk.Append(logprotoEntryWithMetadata(2, "lineB", []logproto.LabelAdapter{
+	require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(2, "lineB", []logproto.LabelAdapter{
 		{Name: "traceID", Value: "456"},
 		{Name: "user", Value: "b"},
 	})))
 	require.NoError(t, chk.cut())
-	require.NoError(t, chk.Append(logprotoEntryWithMetadata(3, "lineC", []logproto.LabelAdapter{
+	require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(3, "lineC", []logproto.LabelAdapter{
 		{Name: "traceID", Value: "789"},
 		{Name: "user", Value: "c"},
 	})))
-	require.NoError(t, chk.Append(logprotoEntryWithMetadata(4, "lineD", []logproto.LabelAdapter{
+	require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(4, "lineD", []logproto.LabelAdapter{
 		{Name: "traceID", Value: "123"},
 		{Name: "user", Value: "d"},
 	})))
@@ -1646,7 +1646,7 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
 			},
 		},
 		{
-			name:          "metadata-and-keep",
+			name:          "keep",
 			query:         `{job="fake"} | keep job, user`,
 			expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
 			expectedStreams: []string{
@@ -1657,7 +1657,7 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
 			},
 		},
 		{
-			name:          "metadata-and-keep-filter",
+			name:          "keep-filter",
 			query:         `{job="fake"} | keep job, user="b"`,
 			expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
 			expectedStreams: []string{
@@ -1668,7 +1668,7 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
 			},
 		},
 		{
-			name:          "metadata-and-drop",
+			name:          "drop",
 			query:         `{job="fake"} | drop traceID`,
 			expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
 			expectedStreams: []string{
@@ -1679,7 +1679,7 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
 			},
 		},
 		{
-			name:          "metadata-and-drop-filter",
+			name:          "drop-filter",
 			query:         `{job="fake"} | drop traceID="123"`,
 			expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
 			expectedStreams: []string{
diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go
index f65ec5843d9e4..da4887c891449 100644
--- a/pkg/chunkenc/unordered.go
+++ b/pkg/chunkenc/unordered.go
@@ -94,8 +94,8 @@ func (hb *unorderedHeadBlock) Reset() {
 }
 type nsEntry struct {
-	line           string
-	metadataLabels labels.Labels
+	line             string
+	nonIndexedLabels labels.Labels
 }
 // collection of entries belonging to the same nanosecond
@@ -109,7 +109,7 @@ func (e *nsEntries) ValueAtDimension(_ uint64) int64 {
 }
 func (hb *unorderedHeadBlock) Append(ts int64, line string, metaLabels labels.Labels) error {
-	if hb.format < UnorderedWithMetadataHeadBlockFmt {
+	if hb.format < UnorderedWithNonIndexedLabelsHeadBlockFmt {
 		// metaLabels must be ignored for the previous head block formats
 		metaLabels = nil
 	}
@@ -211,16 +211,16 @@ func (hb *unorderedHeadBlock) forEntries(
 		for ; i < len(es.entries) && i >= 0; next() {
 			line := es.entries[i].line
-			metadataLabels := es.entries[i].metadataLabels
+			nonIndexedLabels := es.entries[i].nonIndexedLabels
 			var nonIndexedLabelsBytes int64
-			for _, label := range metadataLabels {
+			for _, label := range nonIndexedLabels {
 				nonIndexedLabelsBytes += int64(len(label.Name) + len(label.Value))
 			}
 			chunkStats.AddHeadChunkNonIndexedLabelsBytes(nonIndexedLabelsBytes)
 			chunkStats.AddHeadChunkBytes(int64(len(line)) + nonIndexedLabelsBytes)
-			err = entryFn(chunkStats, es.ts, line, metadataLabels)
+			err = entryFn(chunkStats, es.ts, line, nonIndexedLabels)
 		}
 	}
@@ -336,7 +336,6 @@ func (hb *unorderedHeadBlock) SampleIterator(
 				Timestamp: ts,
 				Value:     value,
 				Hash:      xxhash.Sum64(unsafeGetBytes(line)),
-				// TODO: add metadata labels to sample
 			})
 			return nil
 		},
@@ -385,8 +384,8 @@ func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) {
 			inBuf.WriteString(line)
-			if hb.format >= UnorderedWithMetadataHeadBlockFmt {
-				// Serialize metadata labels
+			if hb.format >= UnorderedWithNonIndexedLabelsHeadBlockFmt {
+				// Serialize non-indexed labels
 				n = binary.PutUvarint(encBuf, uint64(len(metaLabels)))
 				inBuf.Write(encBuf[:n])
 				for _, l := range metaLabels {
@@ -437,7 +436,7 @@ func (hb *unorderedHeadBlock) CheckpointSize() int {
 	size += binary.MaxVarintLen32 * 2 // total entries + total size
 	size += binary.MaxVarintLen64 * 2 // mint,maxt
 	size += (binary.MaxVarintLen64 + binary.MaxVarintLen32) * hb.lines // ts + len of log line.
-	if hb.format >= UnorderedWithMetadataHeadBlockFmt {
+	if hb.format >= UnorderedWithNonIndexedLabelsHeadBlockFmt {
 		_ = hb.forEntries(
 			context.Background(),
 			logproto.FORWARD,
@@ -506,8 +505,8 @@ func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error {
 			return errors.Wrap(err, "write headblock entry line")
 		}
-		if hb.format >= UnorderedWithMetadataHeadBlockFmt {
-			// metadata
+		if hb.format >= UnorderedWithNonIndexedLabelsHeadBlockFmt {
+			// non-indexed labels
 			eb.putUvarint(len(metaLabels))
 			_, err = w.Write(eb.get())
 			if err != nil {
@@ -572,7 +571,7 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error {
 		line := string(db.bytes(lineLn))
 		var metaLabels labels.Labels
-		if version >= UnorderedWithMetadataHeadBlockFmt.Byte() {
+		if version >= UnorderedWithNonIndexedLabelsHeadBlockFmt.Byte() {
 			metaLn := db.uvarint()
 			if metaLn > 0 {
 				metaLabels = make(labels.Labels, metaLn)
@@ -614,7 +613,7 @@ func HeadFromCheckpoint(b []byte, desired HeadBlockFmt) (HeadBlock, error) {
 		return nil, errors.Wrap(db.err(), "verifying headblock header")
 	}
 	format := HeadBlockFmt(version)
-	if format > UnorderedWithMetadataHeadBlockFmt {
+	if format > UnorderedWithNonIndexedLabelsHeadBlockFmt {
 		return nil, fmt.Errorf("unexpected head block version: %v", format)
 	}
diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go
index 8a891761ca1c5..532039681bbf3 100644
--- a/pkg/chunkenc/unordered_test.go
+++ b/pkg/chunkenc/unordered_test.go
@@ -167,7 +167,7 @@ func Test_Unordered_InsertRetrieval(t *testing.T) {
 		t.Run(tc.desc, func(t *testing.T) {
 			for _, format := range []HeadBlockFmt{
 				UnorderedHeadBlockFmt,
-				UnorderedWithMetadataHeadBlockFmt,
+				UnorderedWithNonIndexedLabelsHeadBlockFmt,
 			} {
 				t.Run(format.String(), func(t *testing.T) {
 					hb := newUnorderedHeadBlock(format)
@@ -185,7 +185,7 @@ func Test_Unordered_InsertRetrieval(t *testing.T) {
 					expected := make([]entry, len(tc.exp))
 					copy(expected, tc.exp)
-					if format < UnorderedWithMetadataHeadBlockFmt {
+					if format < UnorderedWithNonIndexedLabelsHeadBlockFmt {
 						for i := range expected {
 							expected[i].nonIndexedLabels = nil
 						}
@@ -244,7 +244,7 @@ func Test_UnorderedBoundedIter(t *testing.T) {
 		t.Run(tc.desc, func(t *testing.T) {
 			for _, format := range []HeadBlockFmt{
 				UnorderedHeadBlockFmt,
-				UnorderedWithMetadataHeadBlockFmt,
+				UnorderedWithNonIndexedLabelsHeadBlockFmt,
 			} {
 				t.Run(format.String(), func(t *testing.T) {
 					hb := newUnorderedHeadBlock(format)
@@ -262,7 +262,7 @@ func Test_UnorderedBoundedIter(t *testing.T) {
 					expected := make([]entry, len(tc.exp))
 					copy(expected, tc.exp)
-					if format < UnorderedWithMetadataHeadBlockFmt {
+					if format < UnorderedWithNonIndexedLabelsHeadBlockFmt {
 						for i := range expected {
 							expected[i].nonIndexedLabels = nil
 						}
@@ -277,11 +277,11 @@ func Test_UnorderedBoundedIter(t *testing.T) {
 func TestHeadBlockInterop(t *testing.T) {
 	unordered, ordered := newUnorderedHeadBlock(UnorderedHeadBlockFmt), &headBlock{}
-	unorderedWithMetadata := newUnorderedHeadBlock(UnorderedWithMetadataHeadBlockFmt)
+	unorderedWithNonIndexedLabels := newUnorderedHeadBlock(UnorderedWithNonIndexedLabelsHeadBlockFmt)
 	for i := 0; i < 100; i++ {
 		metaLabels := labels.Labels{{Name: "foo", Value: fmt.Sprint(99 - i)}}
 		require.Nil(t, unordered.Append(int64(99-i), fmt.Sprint(99-i), metaLabels))
-		require.Nil(t, unorderedWithMetadata.Append(int64(99-i), fmt.Sprint(99-i), metaLabels))
+		require.Nil(t, unorderedWithNonIndexedLabels.Append(int64(99-i), fmt.Sprint(99-i), metaLabels))
 		require.Nil(t, ordered.Append(int64(i), fmt.Sprint(i), labels.Labels{{Name: "foo", Value: fmt.Sprint(i)}}))
 	}
@@ -290,7 +290,7 @@ func TestHeadBlockInterop(t *testing.T) {
 	require.Nil(t, err)
 	unorderedCheckpointBytes, err := unordered.CheckpointBytes(nil)
 	require.Nil(t, err)
-	unorderedWithMetadataCheckpointBytes, err := unorderedWithMetadata.CheckpointBytes(nil)
+	unorderedWithNonIndexedLabelsCheckpointBytes, err := unorderedWithNonIndexedLabels.CheckpointBytes(nil)
 	require.Nil(t, err)
 	// Ensure we can recover ordered checkpoint into ordered headblock
@@ -303,11 +303,11 @@ func TestHeadBlockInterop(t *testing.T) {
 	require.Nil(t, err)
 	require.Equal(t, unordered, recovered)
-	// Ensure we can recover ordered checkpoint into unordered headblock with metadata
-	recovered, err = HeadFromCheckpoint(orderedCheckpointBytes, UnorderedWithMetadataHeadBlockFmt)
+	// Ensure we can recover ordered checkpoint into unordered headblock with non-indexed labels
+	recovered, err = HeadFromCheckpoint(orderedCheckpointBytes, UnorderedWithNonIndexedLabelsHeadBlockFmt)
 	require.NoError(t, err)
 	require.Equal(t, &unorderedHeadBlock{
-		format: UnorderedWithMetadataHeadBlockFmt,
+		format: UnorderedWithNonIndexedLabelsHeadBlockFmt,
 		rt:     unordered.rt,
 		lines:  unordered.lines,
 		size:   unordered.size,
@@ -325,11 +325,11 @@ func TestHeadBlockInterop(t *testing.T) {
 	require.Nil(t, err)
 	require.Equal(t, unordered, recovered)
-	// Ensure we can recover unordered checkpoint into unordered with metadata headblock
-	recovered, err = HeadFromCheckpoint(unorderedCheckpointBytes, UnorderedWithMetadataHeadBlockFmt)
+	// Ensure we can recover unordered checkpoint into unordered with non-indexed labels
+	recovered, err = HeadFromCheckpoint(unorderedCheckpointBytes, UnorderedWithNonIndexedLabelsHeadBlockFmt)
 	require.NoError(t, err)
 	require.Equal(t, &unorderedHeadBlock{
-		format: UnorderedWithMetadataHeadBlockFmt,
+		format: UnorderedWithNonIndexedLabelsHeadBlockFmt,
 		rt:     unordered.rt,
 		lines:  unordered.lines,
 		size:   unordered.size,
@@ -337,20 +337,20 @@ func TestHeadBlockInterop(t *testing.T) {
 		maxt:   unordered.maxt,
 	}, recovered)
-	// Ensure we can recover unordered with metadata checkpoint into ordered headblock
-	recovered, err = HeadFromCheckpoint(unorderedWithMetadataCheckpointBytes, OrderedHeadBlockFmt)
+	// Ensure we can recover unordered with non-indexed labels checkpoint into ordered headblock
+	recovered, err = HeadFromCheckpoint(unorderedWithNonIndexedLabelsCheckpointBytes, OrderedHeadBlockFmt)
 	require.Nil(t, err)
 	require.Equal(t, ordered, recovered) // we compare the data with unordered because unordered head block does not contain metaLabels.
-	// Ensure we can recover unordered with metadata checkpoint into unordered headblock
-	recovered, err = HeadFromCheckpoint(unorderedWithMetadataCheckpointBytes, UnorderedHeadBlockFmt)
+	// Ensure we can recover unordered with non-indexed labels checkpoint into unordered headblock
+	recovered, err = HeadFromCheckpoint(unorderedWithNonIndexedLabelsCheckpointBytes, UnorderedHeadBlockFmt)
 	require.Nil(t, err)
 	require.Equal(t, unordered, recovered) // we compare the data with unordered because unordered head block does not contain metaLabels.
-	// Ensure we can recover unordered with metadata checkpoint into unordered with metadata headblock
-	recovered, err = HeadFromCheckpoint(unorderedWithMetadataCheckpointBytes, UnorderedWithMetadataHeadBlockFmt)
+	// Ensure we can recover unordered with non-indexed labels checkpoint into unordered with non-indexed labels headblock
+	recovered, err = HeadFromCheckpoint(unorderedWithNonIndexedLabelsCheckpointBytes, UnorderedWithNonIndexedLabelsHeadBlockFmt)
 	require.Nil(t, err)
-	require.Equal(t, unorderedWithMetadata, recovered)
+	require.Equal(t, unorderedWithNonIndexedLabels, recovered)
 }
 // ensure backwards compatibility from when chunk format
@@ -723,9 +723,9 @@ func Test_HeadIteratorHash(t *testing.T) {
 	}
 	for name, b := range map[string]HeadBlock{
-		"unordered":               newUnorderedHeadBlock(UnorderedHeadBlockFmt),
-		"unordered with metadata": newUnorderedHeadBlock(UnorderedWithMetadataHeadBlockFmt),
-		"ordered":                 &headBlock{},
+		"unordered":                         newUnorderedHeadBlock(UnorderedHeadBlockFmt),
+		"unordered with non-indexed labels": newUnorderedHeadBlock(UnorderedWithNonIndexedLabelsHeadBlockFmt),
+		"ordered":                           &headBlock{},
 	} {
 		t.Run(name, func(t *testing.T) {
 			require.NoError(t, b.Append(1, "foo", labels.Labels{{Name: "foo", Value: "bar"}}))
diff --git a/pkg/chunkenc/util_test.go b/pkg/chunkenc/util_test.go
index bd667c89caa8a..43c96ae99a2c2 100644
--- a/pkg/chunkenc/util_test.go
+++ b/pkg/chunkenc/util_test.go
@@ -15,7 +15,7 @@ func logprotoEntry(ts int64, line string) *logproto.Entry {
 	}
 }
-func logprotoEntryWithMetadata(ts int64, line string, nonIndexedLabels []logproto.LabelAdapter) *logproto.Entry {
+func logprotoEntryWithNonIndexedLabels(ts int64, line string, nonIndexedLabels []logproto.LabelAdapter) *logproto.Entry {
 	return &logproto.Entry{
 		Timestamp: time.Unix(0, ts),
 		Line:      line,
diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go
index 8caab7fe683d6..9a26c045ae812 100644
--- a/pkg/loghttp/push/push.go
+++ b/pkg/loghttp/push/push.go
@@ -38,7 +38,7 @@ var (
 	nonIndexedLabelsBytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "loki",
 		Name:      "distributor_non_indexed_labels_bytes_received_total",
-		Help:      "The total number of uncompressed bytes received per tenant for entries metadata (non-indexed labels)",
+		Help:      "The total number of uncompressed bytes received per tenant for entries' non-indexed labels",
 	}, []string{"tenant", "retention_hours"})
 	linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "loki",
diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go
index 4ba20f90676f3..ba278d3b26555 100644
--- a/pkg/logql/log/metrics_extraction.go
+++ b/pkg/logql/log/metrics_extraction.go
@@ -34,8 +34,8 @@ type SampleExtractor interface {
 // A StreamSampleExtractor never mutate the received line.
 type StreamSampleExtractor interface {
 	BaseLabels() LabelsResult
-	Process(ts int64, line []byte, metadataLabels ...labels.Label) (float64, LabelsResult, bool)
-	ProcessString(ts int64, line string, metadataLabels ...labels.Label) (float64, LabelsResult, bool)
+	Process(ts int64, line []byte, nonIndexedLabels ...labels.Label) (float64, LabelsResult, bool)
+	ProcessString(ts int64, line string, nonIndexedLabels ...labels.Label) (float64, LabelsResult, bool)
 }
 type lineSampleExtractor struct {
@@ -80,9 +80,9 @@ type streamLineSampleExtractor struct {
 	builder *LabelsBuilder
 }
-func (l *streamLineSampleExtractor) Process(ts int64, line []byte, metadataLabels ...labels.Label) (float64, LabelsResult, bool) {
+func (l *streamLineSampleExtractor) Process(ts int64, line []byte, nonIndexedLabels ...labels.Label) (float64, LabelsResult, bool) {
 	l.builder.Reset()
-	l.builder.Add(metadataLabels...)
+	l.builder.Add(nonIndexedLabels...)
 	// short circuit.
 	if l.Stage == NoopStage {
@@ -96,9 +96,9 @@ func (l *streamLineSampleExtractor) Process(ts int64, line []byte, metadataLabel
 	return l.LineExtractor(line), l.builder.GroupedLabels(), true
 }
-func (l *streamLineSampleExtractor) ProcessString(ts int64, line string, metadataLabels ...labels.Label) (float64, LabelsResult, bool) {
+func (l *streamLineSampleExtractor) ProcessString(ts int64, line string, nonIndexedLabels ...labels.Label) (float64, LabelsResult, bool) {
 	// unsafe get bytes since we have the guarantee that the line won't be mutated.
-	return l.Process(ts, unsafeGetBytes(line), metadataLabels...)
+	return l.Process(ts, unsafeGetBytes(line), nonIndexedLabels...)
 }
 func (l *streamLineSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult }
@@ -171,10 +171,10 @@ func (l *labelSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtra
 	return res
 }
-func (l *streamLabelSampleExtractor) Process(ts int64, line []byte, metadataLabels ...labels.Label) (float64, LabelsResult, bool) {
+func (l *streamLabelSampleExtractor) Process(ts int64, line []byte, nonIndexedLabels ...labels.Label) (float64, LabelsResult, bool) {
 	// Apply the pipeline first.
 	l.builder.Reset()
-	l.builder.Add(metadataLabels...)
+	l.builder.Add(nonIndexedLabels...)
 	line, ok := l.preStage.Process(ts, line, l.builder)
 	if !ok {
 		return 0, nil, false
 	}
@@ -202,9 +202,9 @@ func (l *streamLabelSampleExtractor) Process(ts int64, line []byte, metadataLabe
 	return v, l.builder.GroupedLabels(), true
 }
-func (l *streamLabelSampleExtractor) ProcessString(ts int64, line string, metadataLabels ...labels.Label) (float64, LabelsResult, bool) {
+func (l *streamLabelSampleExtractor) ProcessString(ts int64, line string, nonIndexedLabels ...labels.Label) (float64, LabelsResult, bool) {
 	// unsafe get bytes since we have the guarantee that the line won't be mutated.
-	return l.Process(ts, unsafeGetBytes(line), metadataLabels...)
+	return l.Process(ts, unsafeGetBytes(line), nonIndexedLabels...)
 }
 func (l *streamLabelSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult }
diff --git a/pkg/logql/log/metrics_extraction_test.go b/pkg/logql/log/metrics_extraction_test.go
index 778809a0e54c8..d3bcbd8e1ff6e 100644
--- a/pkg/logql/log/metrics_extraction_test.go
+++ b/pkg/logql/log/metrics_extraction_test.go
@@ -361,7 +361,7 @@ func TestNewLineSampleExtractor(t *testing.T) {
 	require.False(t, ok)
 }
-func TestNewLineSampleExtractorWithNonIndexedMetadata(t *testing.T) {
+func TestNewLineSampleExtractorWithNonIndexedLabels(t *testing.T) {
 	lbs := labels.FromStrings("foo", "bar")
 	nonIndexedLabels := labels.FromStrings("user", "bob")
 	expectedLabelsResults := append(lbs, nonIndexedLabels...)