Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: collect and serve pre-aggregated bytes and counts #13020

Merged
merged 47 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
dc620e7
feat: collect and serve pre-agg bytes and count
trevorwhitney May 8, 2024
f0d6a92
feat: reject filter queries to /patterns endpoint
trevorwhitney May 23, 2024
68aa188
feat: guard aggregation behavior behind a feature flag
trevorwhitney May 23, 2024
0bfd0ad
Merge branch 'main' into sample-count-and-bytes
trevorwhitney May 23, 2024
b897fc5
fix: ring proxy methods on pattern ring_client
trevorwhitney May 24, 2024
2587657
fix: grouping
trevorwhitney May 24, 2024
6dd77ae
feat: refactor metric samples to be it's own endpoint
trevorwhitney May 31, 2024
eb84303
chore: a bit of cleanup
trevorwhitney May 31, 2024
33ead60
feat: hook up samples endpoint
trevorwhitney May 31, 2024
abb31a8
Merge branch 'main' into sample-count-and-bytes
trevorwhitney May 31, 2024
87f7282
chore: clean up linting
trevorwhitney May 31, 2024
29febb7
chore: make format
trevorwhitney May 31, 2024
cbf9fc0
docs: update docs
trevorwhitney May 31, 2024
6ed195e
fix: nanosecond values in test with non-decimal seconds value
trevorwhitney Jun 3, 2024
7942e57
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 3, 2024
1822b88
fix: formatting
trevorwhitney Jun 3, 2024
35585db
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 3, 2024
81e27a4
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 4, 2024
3a01880
fix: move /explore/query_range case up
trevorwhitney Jun 4, 2024
30c5b90
chore: add metrics and debug logging
trevorwhitney Jun 5, 2024
d94349c
chore: add more debug logging to chunk iteration
trevorwhitney Jun 5, 2024
d3f760b
fix: use pointers for chunks and samples, add chunk locking
trevorwhitney Jun 5, 2024
a2c601e
feat: add sum merge sample iterator
trevorwhitney Jun 11, 2024
e3777bc
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 11, 2024
f1cfd81
test: read metric batch test
trevorwhitney Jun 11, 2024
f42f523
fix: formatting
trevorwhitney Jun 12, 2024
5c0abde
fix: more linting
trevorwhitney Jun 13, 2024
0f7e473
fix: check-mod
trevorwhitney Jun 13, 2024
6cdeeca
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 13, 2024
ac83fcb
fix: mod again
trevorwhitney Jun 13, 2024
e5e23c8
fix: chunk test
trevorwhitney Jun 13, 2024
1664017
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 13, 2024
721e7c1
fix: docs
trevorwhitney Jun 13, 2024
12b3636
fix: logger in sample evaluator tests
trevorwhitney Jun 14, 2024
1133763
chore: more debug logging
trevorwhitney Jun 17, 2024
d59dd4c
use sum merge sample iterator
trevorwhitney Jun 17, 2024
e4b5dd7
more debug logging
trevorwhitney Jun 17, 2024
426d143
more debug logging
trevorwhitney Jun 17, 2024
0c5a436
test: add test coverage around reading batches
trevorwhitney Jun 18, 2024
96cccef
various fixes for building series
cyriltovena Jun 18, 2024
2d13e8a
Removes comments
cyriltovena Jun 18, 2024
74c245d
fix: Prune chunks older than a specified duration and only delete str…
cyriltovena Jun 18, 2024
fefa6b5
test: more test coverage
trevorwhitney Jun 18, 2024
b255b80
Merge branch 'main' into sample-count-and-bytes
trevorwhitney Jun 18, 2024
796d8d9
fix: truncate timestamp to nearest step
trevorwhitney Jun 18, 2024
b2d72a6
fix: tests
trevorwhitney Jun 18, 2024
fa7016b
fix: lint
trevorwhitney Jun 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: grouping
  • Loading branch information
trevorwhitney committed May 24, 2024
commit 258765772e7815d07849c70f8c774a0902079145
2 changes: 2 additions & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ schema_config:

pattern_ingester:
enabled: true
metric_aggregation:
enabled: true

ruler:
alertmanager_url: http://localhost:9093
Expand Down
4 changes: 2 additions & 2 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@
func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.LifecyclerConfig.RegisterFlagsWithPrefix("pattern-ingester.", fs, util_log.Logger)
cfg.ClientConfig.RegisterFlags(fs)
cfg.MetricAggregation.RegisterFlagsWithPrefix(fs, "pattern-ingester.")

fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.")
fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")

cfg.MetricAggregation.RegisterFlagsWithPrefix(fs, "pattern-ingester.")
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -245,7 +245,7 @@
return err
}

expr, err := syntax.ParseExpr(req.Query)

Check failure on line 248 in pkg/pattern/ingester.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

ineffectual assignment to err (ineffassign)

switch e := expr.(type) {
case syntax.SampleExpr:
Expand Down
13 changes: 1 addition & 12 deletions pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,24 +125,13 @@ func (i *instance) QuerySample(
return nil, err
}

typ, err := metric.ExtractMetricType(expr)
if err != nil || typ == metric.Unsupported {
return nil, err
}

var iters []iter.Iterator
err = i.forMatchingStreams(
selector.Matchers(),
func(stream *stream) error {
var iter iter.Iterator
var err error
if typ == metric.Bytes {
iter, err = stream.BytesIterator(ctx, expr, from, through, step)
} else if typ == metric.Count {
iter, err = stream.CountIterator(ctx, expr, from, through, step)
} else {
return fmt.Errorf("unsupported query operation")
}
iter, err = stream.SampleIterator(ctx, expr, from, through, step)

if err != nil {
return err
Expand Down
19 changes: 16 additions & 3 deletions pkg/pattern/metric/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
"time"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/chunk"
"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)

type MetricType int

Check warning on line 17 in pkg/pattern/metric/chunk.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

exported: type name will be used as metric.MetricType by other packages, and that stutters; consider calling this Type (revive)

const (
Bytes MetricType = iota
Expand Down Expand Up @@ -49,14 +50,25 @@
}

func (c *Chunks) Iterator(
ctx context.Context,

Check warning on line 53 in pkg/pattern/metric/chunk.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
typ MetricType,
grouping *syntax.Grouping,
from, through, step model.Time,
) (iter.Iterator, error) {
if typ == Unsupported {
return nil, fmt.Errorf("unsupported metric type")
}

lbls := c.labels
if grouping != nil {
sort.Strings(grouping.Groups)
lbls = make(labels.Labels, 0, len(grouping.Groups))
for _, group := range grouping.Groups {
value := c.labels.Get(group)
lbls = append(lbls, labels.Label{Name: group, Value: value})
}
}

iters := make([]iter.Iterator, 0, len(c.chunks))
for _, chunk := range c.chunks {
samples, err := chunk.ForRangeAndType(typ, from, through, step)
Expand All @@ -68,13 +80,14 @@
continue
}

iters = append(iters, iter.NewLabelsSlice(c.labels, samples))
iters = append(iters, iter.NewLabelsSlice(lbls, samples))
}
return iter.NewNonOverlappingLabelsIterator(c.labels, iters), nil

return iter.NewNonOverlappingLabelsIterator(lbls, iters), nil
}

// TODO(twhitney): These values should be float64s (to match prometheus samples) or int64s (to match pattern samples)
type MetricSample struct {

Check warning on line 90 in pkg/pattern/metric/chunk.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

exported: type name will be used as metric.MetricSample by other packages, and that stutters; consider calling this Sample (revive)
Timestamp model.Time
Bytes uint64
Count uint64
Expand All @@ -88,7 +101,7 @@
}
}

type MetricSamples []MetricSample

Check warning on line 104 in pkg/pattern/metric/chunk.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

exported: type name will be used as metric.MetricSamples by other packages, and that stutters; consider calling this Samples (revive)

type Chunk struct {
Samples MetricSamples
Expand Down Expand Up @@ -127,7 +140,7 @@
return ts.Sub(c.Samples[0].Timestamp) < chunk.MaxChunkTime
}

//TODO(twhitney): any way to remove the duplication between this and the drain chunk ForRange method?
// TODO(twhitney): any way to remove the duplication between this and the drain chunk ForRange method?
// ForRangeAndType returns samples with only the values
// in the given range [start:end] and aggregates them by step duration.
// start and end are in milliseconds since epoch. step is a duration in milliseconds.
Expand Down
110 changes: 110 additions & 0 deletions pkg/pattern/metric/chunk_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package metric

import (
"context"
"reflect"
"testing"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -327,3 +331,109 @@ func TestForRangeAndType(t *testing.T) {
})
}
}

func Test_Chunks_Iterator(t *testing.T) {
ctx := context.Background()
lbls := labels.Labels{
labels.Label{Name: "foo", Value: "bar"},
labels.Label{Name: "container", Value: "jar"},
}
chunks := Chunks{
chunks: []Chunk{
{
Samples: []MetricSample{
{Timestamp: 2, Bytes: 2, Count: 1},
{Timestamp: 4, Bytes: 4, Count: 3},
{Timestamp: 6, Bytes: 6, Count: 5},
},
mint: 2,
maxt: 6,
},
},
labels: lbls,
}

t.Run("without grouping", func(t *testing.T) {
it, err := chunks.Iterator(ctx, Bytes, nil, 0, 10, 2)
require.NoError(t, err)

res, err := iter.ReadAllWithLabels(it)
require.NoError(t, err)

require.Equal(t, 1, len(res.Series))
require.Equal(t, lbls.String(), res.Series[0].GetLabels())

it, err = chunks.Iterator(ctx, Count, nil, 0, 10, 2)
require.NoError(t, err)

res, err = iter.ReadAllWithLabels(it)
require.NoError(t, err)

require.Equal(t, 1, len(res.Series))
require.Equal(t, lbls.String(), res.Series[0].GetLabels())
})

t.Run("grouping", func(t *testing.T) {
grouping := &syntax.Grouping{
Groups: []string{"container"},
Without: false,
}

expectedLabels := labels.Labels{
labels.Label{
Name: "container",
Value: "jar",
},
}

it, err := chunks.Iterator(ctx, Bytes, grouping, 0, 10, 2)
require.NoError(t, err)

res, err := iter.ReadAllWithLabels(it)
require.NoError(t, err)

require.Equal(t, 1, len(res.Series))
require.Equal(t, expectedLabels.String(), res.Series[0].GetLabels())

it, err = chunks.Iterator(ctx, Count, grouping, 0, 10, 2)
require.NoError(t, err)

res, err = iter.ReadAllWithLabels(it)
require.NoError(t, err)

require.Equal(t, 1, len(res.Series))
require.Equal(t, expectedLabels.String(), res.Series[0].GetLabels())
})

t.Run("grouping by a missing label", func(t *testing.T) {
grouping := &syntax.Grouping{
Groups: []string{"missing"},
Without: false,
}

expectedLabels := labels.Labels{
labels.Label{
Name: "missing",
Value: "",
},
}

it, err := chunks.Iterator(ctx, Bytes, grouping, 0, 10, 2)
require.NoError(t, err)

res, err := iter.ReadAllWithLabels(it)
require.NoError(t, err)

require.Equal(t, 1, len(res.Series))
require.Equal(t, expectedLabels.String(), res.Series[0].GetLabels())

it, err = chunks.Iterator(ctx, Count, grouping, 0, 10, 2)
require.NoError(t, err)

res, err = iter.ReadAllWithLabels(it)
require.NoError(t, err)

require.Equal(t, 1, len(res.Series))
require.Equal(t, expectedLabels.String(), res.Series[0].GetLabels())
})
}
22 changes: 12 additions & 10 deletions pkg/pattern/metric/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)

// TODO(twhitney): duplication with code in NewStepEvaluator
func ExtractMetricType(expr syntax.SampleExpr) (MetricType, error) {
func extractMetricType(expr syntax.SampleExpr) (MetricType, error) {
var typ MetricType
switch e := expr.(type) {
case *syntax.VectorAggregationExpr:
Expand Down Expand Up @@ -57,7 +57,6 @@
ctx context.Context,
nextEvaluatorFactory SampleEvaluatorFactory,
expr syntax.SampleExpr,
typ MetricType,
from, through, step model.Time,
) (logql.StepEvaluator, error)
}
Expand All @@ -66,18 +65,16 @@
ctx context.Context,
nextEvaluatorFactory SampleEvaluatorFactory,
expr syntax.SampleExpr,
typ MetricType,
from, through, step model.Time,
) (logql.StepEvaluator, error)

func (s SampleEvaluatorFunc) NewStepEvaluator(
ctx context.Context,
nextEvaluatorFactory SampleEvaluatorFactory,
expr syntax.SampleExpr,
typ MetricType,
from, through, step model.Time,
) (logql.StepEvaluator, error) {
return s(ctx, nextEvaluatorFactory, expr, typ, from, through, step)
return s(ctx, nextEvaluatorFactory, expr, from, through, step)
}

type DefaultEvaluatorFactory struct {
Expand All @@ -94,9 +91,13 @@
ctx context.Context,
evFactory SampleEvaluatorFactory,
expr syntax.SampleExpr,
typ MetricType,
from, through, step model.Time,
) (logql.StepEvaluator, error) {
metricType, err := extractMetricType(expr)
if err != nil || metricType == Unsupported {
return nil, err
}

switch e := expr.(type) {
case *syntax.VectorAggregationExpr:
if rangExpr, ok := e.Left.(*syntax.RangeAggregationExpr); ok && e.Operation == syntax.OpTypeSum {
Expand All @@ -106,12 +107,11 @@
func(ctx context.Context,
_ SampleEvaluatorFactory,
_ syntax.SampleExpr,
typ MetricType,
from, through, step model.Time,
) (logql.StepEvaluator, error) {
fromWithRangeAndOffset := from.Add(-rangExpr.Left.Interval).Add(-rangExpr.Left.Offset)
throughWithOffset := through.Add(-rangExpr.Left.Offset)
it, err := ev.chunks.Iterator(ctx, typ, fromWithRangeAndOffset, throughWithOffset, step)
it, err := ev.chunks.Iterator(ctx, metricType, e.Grouping, fromWithRangeAndOffset, throughWithOffset, step)
if err != nil {
return nil, err
}
Expand All @@ -129,7 +129,7 @@
if e.Grouping == nil {
return nil, errors.Errorf("aggregation operator '%q' without grouping", e.Operation)
}
nextEvaluator, err := evFactory.NewStepEvaluator(ctx, evFactory, e.Left, typ, from, through, step)
nextEvaluator, err := evFactory.NewStepEvaluator(ctx, evFactory, e.Left, from, through, step)
if err != nil {
return nil, err
}
Expand All @@ -145,7 +145,7 @@
case *syntax.RangeAggregationExpr:
fromWithRangeAndOffset := from.Add(-e.Left.Interval).Add(-e.Left.Offset)
throughWithOffset := through.Add(-e.Left.Offset)
it, err := ev.chunks.Iterator(ctx, typ, fromWithRangeAndOffset, throughWithOffset, step)
it, err := ev.chunks.Iterator(ctx, metricType, e.Grouping, fromWithRangeAndOffset, throughWithOffset, step)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -186,7 +186,7 @@

func newRangeVectorIterator(
it loki_iter.PeekingSampleIterator,
expr *syntax.RangeAggregationExpr,

Check warning on line 189 in pkg/pattern/metric/evaluator.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'expr' seems to be unused, consider removing or renaming it as _ (revive)
selRange, step, start, end, offset int64,
) (logql.RangeVectorIterator, error) {
// forces at least one step.
Expand Down Expand Up @@ -250,6 +250,8 @@
lbls labels.Labels
}

// TODO: could this me a matrix iterator that returned multiple samples with
// different labels for the same timestamp?
func NewSeriesToSampleIterator(series *promql.Series) *SeriesToSampleIterator {
return &SeriesToSampleIterator{
floats: series.Floats,
Expand Down Expand Up @@ -304,7 +306,7 @@
func NewParams(
expr syntax.SampleExpr,
from, through, step model.Time,
) *paramCompat {

Check warning on line 309 in pkg/pattern/metric/evaluator.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unexported-return: exported func NewParams returns unexported type *metric.paramCompat, which can be annoying to use (revive)
return &paramCompat{
expr: expr,
from: from,
Expand Down
4 changes: 0 additions & 4 deletions pkg/pattern/metric/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,10 @@
expr, err := syntax.ParseSampleExpr(query)
require.NoError(t, err)

typ, err := ExtractMetricType(expr)
require.NoError(t, err)

evaluator, err := factory.NewStepEvaluator(
context.Background(),
factory,
expr.(syntax.SampleExpr),

Check failure on line 36 in pkg/pattern/metric/evaluator_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

S1040: type assertion to the same type: expr already has type syntax.SampleExpr (gosimple)
typ,
model.Time(now-fiveMin), model.Time(now), model.Time(fiveMin),
)

Expand Down
Loading
Loading