Skip to content

Commit

Permalink
Make naming consistent for non-Indexed labels (#10024)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:

We had started using the term _metadata_ to refer to _non-indexed labels_. This PR
renames all remaining _metadata_ usages in the codebase to _non-indexed
labels_ so the naming is consistent.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
  • Loading branch information
salvacorts authored Jul 24, 2023
1 parent b5308bb commit a0e9f1a
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 116 deletions.
6 changes: 3 additions & 3 deletions integration/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (c *Client) PushLogLine(line string, extraLabels ...map[string]string) erro
return c.pushLogLine(line, c.Now, nil, extraLabels...)
}

func (c *Client) PushLogLineWithMetadata(line string, logLabels map[string]string, extraLabels ...map[string]string) error {
return c.PushLogLineWithTimestampAndMetadata(line, c.Now, logLabels, extraLabels...)
// PushLogLineWithNonIndexedLabels pushes a single log line timestamped with the
// client's current time (c.Now), attaching logLabels as the entry's non-indexed
// labels. extraLabels is forwarded unchanged to the delegate — presumably
// additional stream labels; confirm against PushLogLineWithTimestampAndNonIndexedLabels.
func (c *Client) PushLogLineWithNonIndexedLabels(line string, logLabels map[string]string, extraLabels ...map[string]string) error {
	return c.PushLogLineWithTimestampAndNonIndexedLabels(line, c.Now, logLabels, extraLabels...)
}

// PushLogLineWithTimestamp creates a new logline at the given timestamp
Expand All @@ -100,7 +100,7 @@ func (c *Client) PushLogLineWithTimestamp(line string, timestamp time.Time, extr
return c.pushLogLine(line, timestamp, nil, extraLabels...)
}

func (c *Client) PushLogLineWithTimestampAndMetadata(line string, timestamp time.Time, logLabels map[string]string, extraLabelList ...map[string]string) error {
func (c *Client) PushLogLineWithTimestampAndNonIndexedLabels(line string, timestamp time.Time, logLabels map[string]string, extraLabelList ...map[string]string) error {
// If the logLabels map is empty, labels.FromMap will allocate some empty slices.
// Since this code is executed for every log line we receive, as an optimization
// to avoid those allocations we'll call labels.FromMap only if the map is not empty.
Expand Down
8 changes: 4 additions & 4 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,12 +396,12 @@ func TestMicroServicesIngestQueryOverMultipleBucketSingleProvider(t *testing.T)
cliQueryFrontend.Now = now

t.Run("ingest-logs", func(t *testing.T) {
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineA", time.Now().Add(-48*time.Hour), map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineB", time.Now().Add(-36*time.Hour), map[string]string{"traceID": "456"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndNonIndexedLabels("lineA", time.Now().Add(-48*time.Hour), map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndNonIndexedLabels("lineB", time.Now().Add(-36*time.Hour), map[string]string{"traceID": "456"}, map[string]string{"job": "fake"}))

// ingest logs to the current period
require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineC", map[string]string{"traceID": "789"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineD", map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithNonIndexedLabels("lineC", map[string]string{"traceID": "789"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithNonIndexedLabels("lineD", map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))

})

Expand Down
90 changes: 45 additions & 45 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
defaultBlockSize = 256 * 1024
)

var HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt, UnorderedWithMetadataHeadBlockFmt}
var HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt, UnorderedWithNonIndexedLabelsHeadBlockFmt}

type HeadBlockFmt byte

Expand All @@ -56,8 +56,8 @@ func (f HeadBlockFmt) String() string {
return "ordered"
case f == UnorderedHeadBlockFmt:
return "unordered"
case f == UnorderedWithMetadataHeadBlockFmt:
return "unordered with metadata"
case f == UnorderedWithNonIndexedLabelsHeadBlockFmt:
return "unordered with non-indexed labels"
default:
return fmt.Sprintf("unknown: %v", byte(f))
}
Expand All @@ -80,7 +80,7 @@ const (
_
OrderedHeadBlockFmt
UnorderedHeadBlockFmt
UnorderedWithMetadataHeadBlockFmt
UnorderedWithNonIndexedLabelsHeadBlockFmt

DefaultHeadBlockFmt = UnorderedHeadBlockFmt
)
Expand Down Expand Up @@ -1144,8 +1144,8 @@ type bufferedIterator struct {
currLine []byte // the current line, this is the same as the buffer but sliced the line size.
currTs int64

metaLabelsBuf [][]byte // The buffer for a single entry's metadata labels.
currMetadataLabels labels.Labels // The current labels.
nonIndexedLabelsBuf [][]byte // The buffer for a single entry's non-indexed labels.
currNonIndexedLabels labels.Labels // The current labels.

closed bool
}
Expand Down Expand Up @@ -1199,14 +1199,14 @@ func (si *bufferedIterator) Next() bool {

si.currTs = ts
si.currLine = line
si.currMetadataLabels = nonIndexedLabels
si.currNonIndexedLabels = nonIndexedLabels
return true
}

// moveNext moves the buffer to the next entry
func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
var decompressedBytes int64
var decompressedMetadataBytes int64
var decompressedNonIndexedLabelsBytes int64
var ts int64
var tWidth, lWidth, lineSize, lastAttempt int
for lWidth == 0 { // Read until both varints have enough bytes.
Expand Down Expand Up @@ -1307,33 +1307,33 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
}

// Number of labels
decompressedMetadataBytes += binary.MaxVarintLen64
decompressedNonIndexedLabelsBytes += binary.MaxVarintLen64

// Shift down what is still left in the fixed-size read buffer, if any.
si.readBufValid = copy(si.readBuf[:], si.readBuf[labelsWidth:si.readBufValid])

// If not enough space for the labels, create a new buffer slice and put the old one back in the pool.
metaLabelsBufLen := nLabels * 2
if si.metaLabelsBuf == nil || metaLabelsBufLen > cap(si.metaLabelsBuf) {
if si.metaLabelsBuf != nil {
for i := range si.metaLabelsBuf {
if si.metaLabelsBuf[i] != nil {
BytesBufferPool.Put(si.metaLabelsBuf[i])
nonIndexedLabelsBufLen := nLabels * 2
if si.nonIndexedLabelsBuf == nil || nonIndexedLabelsBufLen > cap(si.nonIndexedLabelsBuf) {
if si.nonIndexedLabelsBuf != nil {
for i := range si.nonIndexedLabelsBuf {
if si.nonIndexedLabelsBuf[i] != nil {
BytesBufferPool.Put(si.nonIndexedLabelsBuf[i])
}
}
LabelsPool.Put(si.metaLabelsBuf)
LabelsPool.Put(si.nonIndexedLabelsBuf)
}
si.metaLabelsBuf = LabelsPool.Get(metaLabelsBufLen).([][]byte)
if metaLabelsBufLen > cap(si.metaLabelsBuf) {
si.err = fmt.Errorf("could not get a labels matrix of size %d, actual %d", metaLabelsBufLen, cap(si.metaLabelsBuf))
si.nonIndexedLabelsBuf = LabelsPool.Get(nonIndexedLabelsBufLen).([][]byte)
if nonIndexedLabelsBufLen > cap(si.nonIndexedLabelsBuf) {
si.err = fmt.Errorf("could not get a labels matrix of size %d, actual %d", nonIndexedLabelsBufLen, cap(si.nonIndexedLabelsBuf))
return 0, nil, nil, false
}
}

si.metaLabelsBuf = si.metaLabelsBuf[:nLabels*2]
si.nonIndexedLabelsBuf = si.nonIndexedLabelsBuf[:nLabels*2]

// Read all the label-value pairs, into the buffer slice.
for i := 0; i < metaLabelsBufLen; i++ {
for i := 0; i < nonIndexedLabelsBufLen; i++ {
// Read the length of the label.
lastAttempt = 0
var labelWidth, labelSize int
Expand All @@ -1360,30 +1360,30 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
}

// Label size
decompressedMetadataBytes += binary.MaxVarintLen64
decompressedNonIndexedLabelsBytes += binary.MaxVarintLen64

// If the buffer is not yet initialize or too small, we get a new one.
if si.metaLabelsBuf[i] == nil || labelSize > cap(si.metaLabelsBuf[i]) {
if si.nonIndexedLabelsBuf[i] == nil || labelSize > cap(si.nonIndexedLabelsBuf[i]) {
// in case of a replacement we replace back the buffer in the pool
if si.metaLabelsBuf[i] != nil {
BytesBufferPool.Put(si.metaLabelsBuf[i])
if si.nonIndexedLabelsBuf[i] != nil {
BytesBufferPool.Put(si.nonIndexedLabelsBuf[i])
}
si.metaLabelsBuf[i] = BytesBufferPool.Get(labelSize).([]byte)
if labelSize > cap(si.metaLabelsBuf[i]) {
si.err = fmt.Errorf("could not get a label buffer of size %d, actual %d", labelSize, cap(si.metaLabelsBuf[i]))
si.nonIndexedLabelsBuf[i] = BytesBufferPool.Get(labelSize).([]byte)
if labelSize > cap(si.nonIndexedLabelsBuf[i]) {
si.err = fmt.Errorf("could not get a label buffer of size %d, actual %d", labelSize, cap(si.nonIndexedLabelsBuf[i]))
return 0, nil, nil, false
}
}

si.metaLabelsBuf[i] = si.metaLabelsBuf[i][:labelSize]
si.nonIndexedLabelsBuf[i] = si.nonIndexedLabelsBuf[i][:labelSize]
// Take however many bytes are left in the read buffer.
n := copy(si.metaLabelsBuf[i], si.readBuf[labelWidth:si.readBufValid])
n := copy(si.nonIndexedLabelsBuf[i], si.readBuf[labelWidth:si.readBufValid])
// Shift down what is still left in the fixed-size read buffer, if any.
si.readBufValid = copy(si.readBuf[:], si.readBuf[labelWidth+n:si.readBufValid])

// Then process reading the label.
for n < labelSize {
r, err := si.reader.Read(si.metaLabelsBuf[i][n:labelSize])
r, err := si.reader.Read(si.nonIndexedLabelsBuf[i][n:labelSize])
n += r
if err != nil {
// We might get EOF after reading enough bytes to fill the buffer, which is OK.
Expand All @@ -1396,14 +1396,14 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
}
}

decompressedMetadataBytes += int64(labelSize)
decompressedNonIndexedLabelsBytes += int64(labelSize)
}

si.stats.AddDecompressedLines(1)
si.stats.AddDecompressedNonIndexedLabelsBytes(decompressedMetadataBytes)
si.stats.AddDecompressedBytes(decompressedBytes + decompressedMetadataBytes)
si.stats.AddDecompressedNonIndexedLabelsBytes(decompressedNonIndexedLabelsBytes)
si.stats.AddDecompressedBytes(decompressedBytes + decompressedNonIndexedLabelsBytes)

return ts, si.buf[:lineSize], si.metaLabelsBuf[:metaLabelsBufLen], true
return ts, si.buf[:lineSize], si.nonIndexedLabelsBuf[:nonIndexedLabelsBufLen], true
}

func (si *bufferedIterator) Error() error { return si.err }
Expand All @@ -1427,15 +1427,15 @@ func (si *bufferedIterator) close() {
si.buf = nil
}

if si.metaLabelsBuf != nil {
for i := range si.metaLabelsBuf {
if si.metaLabelsBuf[i] != nil {
BytesBufferPool.Put(si.metaLabelsBuf[i])
si.metaLabelsBuf[i] = nil
if si.nonIndexedLabelsBuf != nil {
for i := range si.nonIndexedLabelsBuf {
if si.nonIndexedLabelsBuf[i] != nil {
BytesBufferPool.Put(si.nonIndexedLabelsBuf[i])
si.nonIndexedLabelsBuf[i] = nil
}
}
LabelsPool.Put(si.metaLabelsBuf)
si.metaLabelsBuf = nil
LabelsPool.Put(si.nonIndexedLabelsBuf)
si.nonIndexedLabelsBuf = nil
}

si.origBytes = nil
Expand Down Expand Up @@ -1466,14 +1466,14 @@ func (e *entryBufferedIterator) StreamHash() uint64 { return e.pipeline.BaseLabe

func (e *entryBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine, e.currMetadataLabels...)
newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine, e.currNonIndexedLabels...)
if !matches {
continue
}

e.stats.AddPostFilterLines(1)
e.currLabels = lbs
e.cur.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(e.currMetadataLabels)
e.cur.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(e.currNonIndexedLabels)
e.cur.Timestamp = time.Unix(0, e.currTs)
e.cur.Line = string(newLine)
return true
Expand All @@ -1500,7 +1500,7 @@ type sampleBufferedIterator struct {

func (e *sampleBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
val, labels, ok := e.extractor.Process(e.currTs, e.currLine, e.currMetadataLabels...)
val, labels, ok := e.extractor.Process(e.currTs, e.currLine, e.currNonIndexedLabels...)
if !ok {
continue
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var (
chunkFormat: chunkFormatV3,
},
{
headBlockFmt: UnorderedWithMetadataHeadBlockFmt,
headBlockFmt: UnorderedWithNonIndexedLabelsHeadBlockFmt,
chunkFormat: chunkFormatV4,
},
}
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestBlock(t *testing.T) {
}

for _, c := range cases {
require.NoError(t, chk.Append(logprotoEntryWithMetadata(c.ts, c.str, c.lbs)))
require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(c.ts, c.str, c.lbs)))
if c.cut {
require.NoError(t, chk.cut())
}
Expand Down Expand Up @@ -1478,7 +1478,7 @@ func TestMemChunk_SpaceFor(t *testing.T) {
expect: true,
},
{
desc: "entry fits with metadata",
desc: "entry fits with non-indexed labels",
targetSize: 10,
headSize: 0,
cutBlockSize: 0,
Expand All @@ -1503,7 +1503,7 @@ func TestMemChunk_SpaceFor(t *testing.T) {
expect: false,
},
{
desc: "entry too big because metadata",
desc: "entry too big because non-indexed labels",
targetSize: 10,
headSize: 0,
cutBlockSize: 0,
Expand All @@ -1517,7 +1517,7 @@ func TestMemChunk_SpaceFor(t *testing.T) {

expectFunc: func(chunkFormat byte, _ HeadBlockFmt) bool {
// Succeed unless we're using chunk format v4, which should
// take the metadata into account.
// take the non-indexed labels into account.
return chunkFormat < chunkFormatV4
},
},
Expand Down Expand Up @@ -1553,21 +1553,21 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
streamLabels := labels.Labels{
{Name: "job", Value: "fake"},
}
chk := newMemChunkWithFormat(chunkFormatV4, enc, UnorderedWithMetadataHeadBlockFmt, testBlockSize, testTargetSize)
require.NoError(t, chk.Append(logprotoEntryWithMetadata(1, "lineA", []logproto.LabelAdapter{
chk := newMemChunkWithFormat(chunkFormatV4, enc, UnorderedWithNonIndexedLabelsHeadBlockFmt, testBlockSize, testTargetSize)
require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(1, "lineA", []logproto.LabelAdapter{
{Name: "traceID", Value: "123"},
{Name: "user", Value: "a"},
})))
require.NoError(t, chk.Append(logprotoEntryWithMetadata(2, "lineB", []logproto.LabelAdapter{
require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(2, "lineB", []logproto.LabelAdapter{
{Name: "traceID", Value: "456"},
{Name: "user", Value: "b"},
})))
require.NoError(t, chk.cut())
require.NoError(t, chk.Append(logprotoEntryWithMetadata(3, "lineC", []logproto.LabelAdapter{
require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(3, "lineC", []logproto.LabelAdapter{
{Name: "traceID", Value: "789"},
{Name: "user", Value: "c"},
})))
require.NoError(t, chk.Append(logprotoEntryWithMetadata(4, "lineD", []logproto.LabelAdapter{
require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(4, "lineD", []logproto.LabelAdapter{
{Name: "traceID", Value: "123"},
{Name: "user", Value: "d"},
})))
Expand Down Expand Up @@ -1646,7 +1646,7 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
},
},
{
name: "metadata-and-keep",
name: "keep",
query: `{job="fake"} | keep job, user`,
expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
expectedStreams: []string{
Expand All @@ -1657,7 +1657,7 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
},
},
{
name: "metadata-and-keep-filter",
name: "keep-filter",
query: `{job="fake"} | keep job, user="b"`,
expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
expectedStreams: []string{
Expand All @@ -1668,7 +1668,7 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
},
},
{
name: "metadata-and-drop",
name: "drop",
query: `{job="fake"} | drop traceID`,
expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
expectedStreams: []string{
Expand All @@ -1679,7 +1679,7 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
},
},
{
name: "metadata-and-drop-filter",
name: "drop-filter",
query: `{job="fake"} | drop traceID="123"`,
expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
expectedStreams: []string{
Expand Down
Loading

0 comments on commit a0e9f1a

Please sign in to comment.