Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make naming consistent for non-Indexed labels #10024

Merged
merged 5 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions integration/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (c *Client) PushLogLine(line string, extraLabels ...map[string]string) erro
return c.pushLogLine(line, c.Now, nil, extraLabels...)
}

func (c *Client) PushLogLineWithMetadata(line string, logLabels map[string]string, extraLabels ...map[string]string) error {
return c.PushLogLineWithTimestampAndMetadata(line, c.Now, logLabels, extraLabels...)
func (c *Client) PushLogLineWithNonIndexedLabels(line string, logLabels map[string]string, extraLabels ...map[string]string) error {
return c.PushLogLineWithTimestampAndNonIndexedLabels(line, c.Now, logLabels, extraLabels...)
}

// PushLogLineWithTimestamp creates a new logline at the given timestamp
Expand All @@ -100,7 +100,7 @@ func (c *Client) PushLogLineWithTimestamp(line string, timestamp time.Time, extr
return c.pushLogLine(line, timestamp, nil, extraLabels...)
}

func (c *Client) PushLogLineWithTimestampAndMetadata(line string, timestamp time.Time, logLabels map[string]string, extraLabelList ...map[string]string) error {
func (c *Client) PushLogLineWithTimestampAndNonIndexedLabels(line string, timestamp time.Time, logLabels map[string]string, extraLabelList ...map[string]string) error {
// If the logLabels map is empty, labels.FromMap will allocate some empty slices.
// Since this code is executed for every log line we receive, as an optimization
// to avoid those allocations we'll call labels.FromMap only if the map is not empty.
Expand Down
8 changes: 4 additions & 4 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,12 +396,12 @@ func TestMicroServicesIngestQueryOverMultipleBucketSingleProvider(t *testing.T)
cliQueryFrontend.Now = now

t.Run("ingest-logs", func(t *testing.T) {
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineA", time.Now().Add(-48*time.Hour), map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineB", time.Now().Add(-36*time.Hour), map[string]string{"traceID": "456"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndNonIndexedLabels("lineA", time.Now().Add(-48*time.Hour), map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndNonIndexedLabels("lineB", time.Now().Add(-36*time.Hour), map[string]string{"traceID": "456"}, map[string]string{"job": "fake"}))

// ingest logs to the current period
require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineC", map[string]string{"traceID": "789"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineD", map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithNonIndexedLabels("lineC", map[string]string{"traceID": "789"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithNonIndexedLabels("lineD", map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))

})

Expand Down
90 changes: 45 additions & 45 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
defaultBlockSize = 256 * 1024
)

var HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt, UnorderedWithMetadataHeadBlockFmt}
var HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt, UnorderedWithNonIndexedLabelsHeadBlockFmt}

type HeadBlockFmt byte

Expand All @@ -56,8 +56,8 @@ func (f HeadBlockFmt) String() string {
return "ordered"
case f == UnorderedHeadBlockFmt:
return "unordered"
case f == UnorderedWithMetadataHeadBlockFmt:
return "unordered with metadata"
case f == UnorderedWithNonIndexedLabelsHeadBlockFmt:
return "unordered with non-indexed labels"
default:
return fmt.Sprintf("unknown: %v", byte(f))
}
Expand All @@ -80,7 +80,7 @@ const (
_
OrderedHeadBlockFmt
UnorderedHeadBlockFmt
UnorderedWithMetadataHeadBlockFmt
UnorderedWithNonIndexedLabelsHeadBlockFmt

DefaultHeadBlockFmt = UnorderedHeadBlockFmt
)
Expand Down Expand Up @@ -1144,8 +1144,8 @@ type bufferedIterator struct {
currLine []byte // the current line, this is the same as the buffer but sliced the line size.
currTs int64

metaLabelsBuf [][]byte // The buffer for a single entry's metadata labels.
currMetadataLabels labels.Labels // The current labels.
nonIndexedLabelsBuf [][]byte // The buffer for a single entry's non-indexed labels.
currNonIndexedLabels labels.Labels // The current labels.

closed bool
}
Expand Down Expand Up @@ -1199,14 +1199,14 @@ func (si *bufferedIterator) Next() bool {

si.currTs = ts
si.currLine = line
si.currMetadataLabels = nonIndexedLabels
si.currNonIndexedLabels = nonIndexedLabels
return true
}

// moveNext moves the buffer to the next entry
func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
var decompressedBytes int64
var decompressedMetadataBytes int64
var decompressedNonIndexedLabelsBytes int64
var ts int64
var tWidth, lWidth, lineSize, lastAttempt int
for lWidth == 0 { // Read until both varints have enough bytes.
Expand Down Expand Up @@ -1307,33 +1307,33 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
}

// Number of labels
decompressedMetadataBytes += binary.MaxVarintLen64
decompressedNonIndexedLabelsBytes += binary.MaxVarintLen64

// Shift down what is still left in the fixed-size read buffer, if any.
si.readBufValid = copy(si.readBuf[:], si.readBuf[labelsWidth:si.readBufValid])

// If not enough space for the labels, create a new buffer slice and put the old one back in the pool.
metaLabelsBufLen := nLabels * 2
if si.metaLabelsBuf == nil || metaLabelsBufLen > cap(si.metaLabelsBuf) {
if si.metaLabelsBuf != nil {
for i := range si.metaLabelsBuf {
if si.metaLabelsBuf[i] != nil {
BytesBufferPool.Put(si.metaLabelsBuf[i])
nonIndexedLabelsBufLen := nLabels * 2
if si.nonIndexedLabelsBuf == nil || nonIndexedLabelsBufLen > cap(si.nonIndexedLabelsBuf) {
if si.nonIndexedLabelsBuf != nil {
for i := range si.nonIndexedLabelsBuf {
if si.nonIndexedLabelsBuf[i] != nil {
BytesBufferPool.Put(si.nonIndexedLabelsBuf[i])
}
}
LabelsPool.Put(si.metaLabelsBuf)
LabelsPool.Put(si.nonIndexedLabelsBuf)
}
si.metaLabelsBuf = LabelsPool.Get(metaLabelsBufLen).([][]byte)
if metaLabelsBufLen > cap(si.metaLabelsBuf) {
si.err = fmt.Errorf("could not get a labels matrix of size %d, actual %d", metaLabelsBufLen, cap(si.metaLabelsBuf))
si.nonIndexedLabelsBuf = LabelsPool.Get(nonIndexedLabelsBufLen).([][]byte)
if nonIndexedLabelsBufLen > cap(si.nonIndexedLabelsBuf) {
si.err = fmt.Errorf("could not get a labels matrix of size %d, actual %d", nonIndexedLabelsBufLen, cap(si.nonIndexedLabelsBuf))
return 0, nil, nil, false
}
}

si.metaLabelsBuf = si.metaLabelsBuf[:nLabels*2]
si.nonIndexedLabelsBuf = si.nonIndexedLabelsBuf[:nLabels*2]

// Read all the label-value pairs, into the buffer slice.
for i := 0; i < metaLabelsBufLen; i++ {
for i := 0; i < nonIndexedLabelsBufLen; i++ {
// Read the length of the label.
lastAttempt = 0
var labelWidth, labelSize int
Expand All @@ -1360,30 +1360,30 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
}

// Label size
decompressedMetadataBytes += binary.MaxVarintLen64
decompressedNonIndexedLabelsBytes += binary.MaxVarintLen64

// If the buffer is not yet initialize or too small, we get a new one.
if si.metaLabelsBuf[i] == nil || labelSize > cap(si.metaLabelsBuf[i]) {
if si.nonIndexedLabelsBuf[i] == nil || labelSize > cap(si.nonIndexedLabelsBuf[i]) {
// in case of a replacement we replace back the buffer in the pool
if si.metaLabelsBuf[i] != nil {
BytesBufferPool.Put(si.metaLabelsBuf[i])
if si.nonIndexedLabelsBuf[i] != nil {
BytesBufferPool.Put(si.nonIndexedLabelsBuf[i])
}
si.metaLabelsBuf[i] = BytesBufferPool.Get(labelSize).([]byte)
if labelSize > cap(si.metaLabelsBuf[i]) {
si.err = fmt.Errorf("could not get a label buffer of size %d, actual %d", labelSize, cap(si.metaLabelsBuf[i]))
si.nonIndexedLabelsBuf[i] = BytesBufferPool.Get(labelSize).([]byte)
if labelSize > cap(si.nonIndexedLabelsBuf[i]) {
si.err = fmt.Errorf("could not get a label buffer of size %d, actual %d", labelSize, cap(si.nonIndexedLabelsBuf[i]))
return 0, nil, nil, false
}
}

si.metaLabelsBuf[i] = si.metaLabelsBuf[i][:labelSize]
si.nonIndexedLabelsBuf[i] = si.nonIndexedLabelsBuf[i][:labelSize]
// Take however many bytes are left in the read buffer.
n := copy(si.metaLabelsBuf[i], si.readBuf[labelWidth:si.readBufValid])
n := copy(si.nonIndexedLabelsBuf[i], si.readBuf[labelWidth:si.readBufValid])
// Shift down what is still left in the fixed-size read buffer, if any.
si.readBufValid = copy(si.readBuf[:], si.readBuf[labelWidth+n:si.readBufValid])

// Then process reading the label.
for n < labelSize {
r, err := si.reader.Read(si.metaLabelsBuf[i][n:labelSize])
r, err := si.reader.Read(si.nonIndexedLabelsBuf[i][n:labelSize])
n += r
if err != nil {
// We might get EOF after reading enough bytes to fill the buffer, which is OK.
Expand All @@ -1396,14 +1396,14 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
}
}

decompressedMetadataBytes += int64(labelSize)
decompressedNonIndexedLabelsBytes += int64(labelSize)
}

si.stats.AddDecompressedLines(1)
si.stats.AddDecompressedNonIndexedLabelsBytes(decompressedMetadataBytes)
si.stats.AddDecompressedBytes(decompressedBytes + decompressedMetadataBytes)
si.stats.AddDecompressedNonIndexedLabelsBytes(decompressedNonIndexedLabelsBytes)
si.stats.AddDecompressedBytes(decompressedBytes + decompressedNonIndexedLabelsBytes)

return ts, si.buf[:lineSize], si.metaLabelsBuf[:metaLabelsBufLen], true
return ts, si.buf[:lineSize], si.nonIndexedLabelsBuf[:nonIndexedLabelsBufLen], true
}

func (si *bufferedIterator) Error() error { return si.err }
Expand All @@ -1427,15 +1427,15 @@ func (si *bufferedIterator) close() {
si.buf = nil
}

if si.metaLabelsBuf != nil {
for i := range si.metaLabelsBuf {
if si.metaLabelsBuf[i] != nil {
BytesBufferPool.Put(si.metaLabelsBuf[i])
si.metaLabelsBuf[i] = nil
if si.nonIndexedLabelsBuf != nil {
for i := range si.nonIndexedLabelsBuf {
if si.nonIndexedLabelsBuf[i] != nil {
BytesBufferPool.Put(si.nonIndexedLabelsBuf[i])
si.nonIndexedLabelsBuf[i] = nil
}
}
LabelsPool.Put(si.metaLabelsBuf)
si.metaLabelsBuf = nil
LabelsPool.Put(si.nonIndexedLabelsBuf)
si.nonIndexedLabelsBuf = nil
}

si.origBytes = nil
Expand Down Expand Up @@ -1466,14 +1466,14 @@ func (e *entryBufferedIterator) StreamHash() uint64 { return e.pipeline.BaseLabe

func (e *entryBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine, e.currMetadataLabels...)
newLine, lbs, matches := e.pipeline.Process(e.currTs, e.currLine, e.currNonIndexedLabels...)
if !matches {
continue
}

e.stats.AddPostFilterLines(1)
e.currLabels = lbs
e.cur.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(e.currMetadataLabels)
e.cur.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(e.currNonIndexedLabels)
e.cur.Timestamp = time.Unix(0, e.currTs)
e.cur.Line = string(newLine)
return true
Expand All @@ -1500,7 +1500,7 @@ type sampleBufferedIterator struct {

func (e *sampleBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
val, labels, ok := e.extractor.Process(e.currTs, e.currLine, e.currMetadataLabels...)
val, labels, ok := e.extractor.Process(e.currTs, e.currLine, e.currNonIndexedLabels...)
if !ok {
continue
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var (
chunkFormat: chunkFormatV3,
},
{
headBlockFmt: UnorderedWithMetadataHeadBlockFmt,
headBlockFmt: UnorderedWithNonIndexedLabelsHeadBlockFmt,
chunkFormat: chunkFormatV4,
},
}
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestBlock(t *testing.T) {
}

for _, c := range cases {
require.NoError(t, chk.Append(logprotoEntryWithMetadata(c.ts, c.str, c.lbs)))
require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(c.ts, c.str, c.lbs)))
if c.cut {
require.NoError(t, chk.cut())
}
Expand Down Expand Up @@ -1478,7 +1478,7 @@ func TestMemChunk_SpaceFor(t *testing.T) {
expect: true,
},
{
desc: "entry fits with metadata",
desc: "entry fits with non-indexed labels",
targetSize: 10,
headSize: 0,
cutBlockSize: 0,
Expand All @@ -1503,7 +1503,7 @@ func TestMemChunk_SpaceFor(t *testing.T) {
expect: false,
},
{
desc: "entry too big because metadata",
desc: "entry too big because non-indexed labels",
targetSize: 10,
headSize: 0,
cutBlockSize: 0,
Expand All @@ -1517,7 +1517,7 @@ func TestMemChunk_SpaceFor(t *testing.T) {

expectFunc: func(chunkFormat byte, _ HeadBlockFmt) bool {
// Succeed unless we're using chunk format v4, which should
// take the metadata into account.
// take the non-indexed labels into account.
return chunkFormat < chunkFormatV4
},
},
Expand Down Expand Up @@ -1553,21 +1553,21 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
streamLabels := labels.Labels{
{Name: "job", Value: "fake"},
}
chk := newMemChunkWithFormat(chunkFormatV4, enc, UnorderedWithMetadataHeadBlockFmt, testBlockSize, testTargetSize)
require.NoError(t, chk.Append(logprotoEntryWithMetadata(1, "lineA", []logproto.LabelAdapter{
chk := newMemChunkWithFormat(chunkFormatV4, enc, UnorderedWithNonIndexedLabelsHeadBlockFmt, testBlockSize, testTargetSize)
require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(1, "lineA", []logproto.LabelAdapter{
{Name: "traceID", Value: "123"},
{Name: "user", Value: "a"},
})))
require.NoError(t, chk.Append(logprotoEntryWithMetadata(2, "lineB", []logproto.LabelAdapter{
require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(2, "lineB", []logproto.LabelAdapter{
{Name: "traceID", Value: "456"},
{Name: "user", Value: "b"},
})))
require.NoError(t, chk.cut())
require.NoError(t, chk.Append(logprotoEntryWithMetadata(3, "lineC", []logproto.LabelAdapter{
require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(3, "lineC", []logproto.LabelAdapter{
{Name: "traceID", Value: "789"},
{Name: "user", Value: "c"},
})))
require.NoError(t, chk.Append(logprotoEntryWithMetadata(4, "lineD", []logproto.LabelAdapter{
require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(4, "lineD", []logproto.LabelAdapter{
{Name: "traceID", Value: "123"},
{Name: "user", Value: "d"},
})))
Expand Down Expand Up @@ -1646,7 +1646,7 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
},
},
{
name: "metadata-and-keep",
name: "keep",
query: `{job="fake"} | keep job, user`,
expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
expectedStreams: []string{
Expand All @@ -1657,7 +1657,7 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
},
},
{
name: "metadata-and-keep-filter",
name: "keep-filter",
query: `{job="fake"} | keep job, user="b"`,
expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
expectedStreams: []string{
Expand All @@ -1668,7 +1668,7 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
},
},
{
name: "metadata-and-drop",
name: "drop",
query: `{job="fake"} | drop traceID`,
expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
expectedStreams: []string{
Expand All @@ -1679,7 +1679,7 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
},
},
{
name: "metadata-and-drop-filter",
name: "drop-filter",
query: `{job="fake"} | drop traceID="123"`,
expectedLines: []string{"lineA", "lineB", "lineC", "lineD"},
expectedStreams: []string{
Expand Down
Loading