From d09d4e3adabb0df3a87a952f6480d3a78fae4344 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= <pstibrany@gmail.com> Date: Sat, 27 Feb 2021 21:40:18 +0100 Subject: [PATCH 01/12] Initial support for streaming chunks from v2 ingester. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> --- pkg/chunk/encoding/factory.go | 8 ++ pkg/chunk/encoding/prometheus_chunk.go | 124 +++++++++++++++++++++++++ pkg/ingester/ingester_v2.go | 50 +++++++--- pkg/ingester/ingester_v2_test.go | 4 +- pkg/querier/distributor_queryable.go | 3 +- 5 files changed, 174 insertions(+), 15 deletions(-) create mode 100644 pkg/chunk/encoding/prometheus_chunk.go diff --git a/pkg/chunk/encoding/factory.go b/pkg/chunk/encoding/factory.go index adf3a0b182..90a6a53302 100644 --- a/pkg/chunk/encoding/factory.go +++ b/pkg/chunk/encoding/factory.go @@ -52,6 +52,8 @@ const ( Varbit // Bigchunk encoding Bigchunk + // Read-only wrapper around Prometheus XOR-encoded chunk. + PrometheusXorChunk ) type encoding struct { @@ -78,6 +80,12 @@ var encodings = map[Encoding]encoding{ return newBigchunk() }, }, + PrometheusXorChunk: { + Name: "PrometheusXorChunk", + New: func() Chunk { + return newPrometheusXorChunk() + }, + }, } // Set implements flag.Value. diff --git a/pkg/chunk/encoding/prometheus_chunk.go b/pkg/chunk/encoding/prometheus_chunk.go new file mode 100644 index 0000000000..903ab680b0 --- /dev/null +++ b/pkg/chunk/encoding/prometheus_chunk.go @@ -0,0 +1,124 @@ +package encoding + +import ( + "io" + + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +// Read-only wrapper around Prometheus chunk. +type prometheusXorChunk struct { + chunk chunkenc.Chunk +} + +func newPrometheusXorChunk() *prometheusXorChunk { + return &prometheusXorChunk{} +} + +func (p *prometheusXorChunk) Add(_ model.SamplePair) (Chunk, error) { + return nil, errors.New("cannot add new samples to Prometheus chunk") +} + +func (p *prometheusXorChunk) NewIterator(iterator Iterator) Iterator { + if p.chunk == nil { + // TODO: return error iterator + return nil + } + + if pit, ok := iterator.(*prometheusChunkIterator); ok { + pit.it = p.chunk.Iterator(pit.it) + return pit + } + + return &prometheusChunkIterator{p.chunk.Iterator(nil)} +} + +func (p *prometheusXorChunk) Marshal(i io.Writer) error { + if p.chunk == nil { + return errors.New("chunk data not set") + } + _, err := i.Write(p.chunk.Bytes()) + return err +} + +func (p *prometheusXorChunk) UnmarshalFromBuf(bytes []byte) error { + c, err := chunkenc.FromData(chunkenc.EncXOR, bytes) + if err != nil { + return errors.Wrap(err, "failed to create Prometheus chunk from bytes") + } + + p.chunk = c + return nil +} + +func (p *prometheusXorChunk) Encoding() Encoding { + return PrometheusXorChunk +} + +func (p *prometheusXorChunk) Utilization() float64 { + // Used for reporting when chunk is used to store new data. + return 0 +} + +func (p *prometheusXorChunk) Slice(_, _ model.Time) Chunk { + // Not implemented. 
+ return p +} + +func (p *prometheusXorChunk) Rebound(_, _ model.Time) (Chunk, error) { + return nil, errors.New("rebound not supported by Prometheus chunk") +} + +func (p *prometheusXorChunk) Len() int { + return p.Size() +} + +func (p *prometheusXorChunk) Size() int { + if p.chunk == nil { + return 0 + } + return len(p.chunk.Bytes()) +} + +type prometheusChunkIterator struct { + it chunkenc.Iterator +} + +func (p *prometheusChunkIterator) Scan() bool { + return p.it.Next() +} + +func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) bool { + return p.it.Seek(int64(time)) +} + +func (p *prometheusChunkIterator) Value() model.SamplePair { + ts, val := p.it.At() + return model.SamplePair{ + Timestamp: model.Time(ts), + Value: model.SampleValue(val), + } +} + +func (p *prometheusChunkIterator) Batch(size int) Batch { + var batch Batch + j := 0 + for j < size { + t, v := p.it.At() + batch.Timestamps[j] = t + batch.Values[j] = v + j++ + if j < size && !p.it.Next() { + break + } + } + batch.Index = 0 + batch.Length = j + return batch +} + +func (p *prometheusChunkIterator) Err() error { + return p.it.Err() +} diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index dcc64e5747..e836f81527 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/shipper" @@ -28,6 +29,8 @@ import ( "go.uber.org/atomic" "golang.org/x/sync/errgroup" + "github.com/cortexproject/cortex/pkg/chunk/encoding" + "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/storage/bucket" @@ -130,6 +133,10 @@ func (u *userTSDB) Querier(ctx context.Context, mint, maxt int64) (storage.Queri return u.db.Querier(ctx, mint, maxt) } +func (u *userTSDB) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { + return u.db.ChunkQuerier(ctx, mint, maxt) +} + func (u *userTSDB) Head() *tsdb.Head { return u.db.Head() } @@ -1128,7 +1135,7 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste return nil } - q, err := db.Querier(ctx, int64(from), int64(through)) + q, err := db.ChunkQuerier(ctx, int64(from), int64(through)) if err != nil { return err } @@ -1140,7 +1147,7 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste return ss.Err() } - timeseries := make([]client.TimeSeries, 0, queryStreamBatchSize) + chunkSeries := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize) batchSizeBytes := 0 numSamples := 0 numSeries := 0 @@ -1148,34 +1155,53 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste series := ss.At() // convert labels to LabelAdapter - ts := client.TimeSeries{ - Labels: client.FromLabelsToLabelAdapters(series.Labels()), + ts := client.TimeSeriesChunk{ + Labels: cortexpb.FromLabelsToLabelAdapters(series.Labels()), } it := series.Iterator() for it.Next() { - t, v := it.At() - ts.Samples = append(ts.Samples, client.Sample{Value: v, TimestampMs: t}) + // Chunks are ordered by min time. + meta := it.At() + + // It is not guaranteed that chunk returned by iterator is populated. + // For now just return error. 
We could also try to figure out how to read the chunk. + if meta.Chunk == nil { + return errors.Errorf("unfilled chunk returned from TSDB chunk querier") + } + + ch := client.Chunk{ + StartTimestampMs: meta.MinTime, + EndTimestampMs: meta.MaxTime, + Data: meta.Chunk.Bytes(), + } + switch meta.Chunk.Encoding() { + case chunkenc.EncXOR: + ch.Encoding = int32(encoding.PrometheusXorChunk) + default: + return errors.Errorf("unknown chunk encoding from TSDB chunk querier: %v", meta.Chunk.Encoding()) + } + ts.Chunks = append(ts.Chunks, ch) + numSamples += meta.Chunk.NumSamples() } - numSamples += len(ts.Samples) numSeries++ tsSize := ts.Size() - if (batchSizeBytes > 0 && batchSizeBytes+tsSize > queryStreamBatchMessageSize) || len(timeseries) >= queryStreamBatchSize { + if (batchSizeBytes > 0 && batchSizeBytes+tsSize > queryStreamBatchMessageSize) || len(chunkSeries) >= queryStreamBatchSize { // Adding this series to the batch would make it too big, // flush the data and add it to new batch instead. err = client.SendQueryStream(stream, &client.QueryStreamResponse{ - Timeseries: timeseries, + Chunkseries: chunkSeries, }) if err != nil { return err } batchSizeBytes = 0 - timeseries = timeseries[:0] + chunkSeries = chunkSeries[:0] } - timeseries = append(timeseries, ts) + chunkSeries = append(chunkSeries, ts) batchSizeBytes += tsSize } @@ -1187,7 +1213,7 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste // Final flush any existing metrics if batchSizeBytes != 0 { err = client.SendQueryStream(stream, &client.QueryStreamResponse{ - Timeseries: timeseries, + Chunkseries: chunkSeries, }) if err != nil { return err diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index a6aa215642..1f7936f74e 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -1336,7 +1336,7 @@ func TestIngester_v2QueryStream(t *testing.T) { break } require.NoError(t, err) - count += len(resp.Timeseries) + count += len(resp.Chunkseries) lastResp = resp } require.Equal(t, 1, count) @@ -1419,7 +1419,7 @@ func TestIngester_v2QueryStreamManySamples(t *testing.T) { break } require.NoError(t, err) - require.True(t, len(resp.Timeseries) > 0) // No empty messages. + require.True(t, len(resp.Chunkseries) > 0) // No empty messages. recvMsgs++ series += len(resp.Timeseries) diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 8462ac674e..5114b208e5 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" + "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/prom1/storage/metric" "github.com/cortexproject/cortex/pkg/querier/series" @@ -152,7 +153,7 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, minT, maxT int continue } - ls := client.FromLabelAdaptersToLabels(result.Labels) + ls := cortexpb.FromLabelAdaptersToLabels(result.Labels) sort.Sort(ls) chunks, err := chunkcompat.FromChunks(userID, ls, result.Chunks) From 3fc9cf7495bf1e843ebb10a281db3043bd45b71d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= <peter.stibrany@grafana.com> Date: Mon, 1 Mar 2021 16:29:17 +0100 Subject: [PATCH 02/12] Fix bug in PrometheusXorChunk.FindAtOrAfter method, and include it in the tests. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> --- .../querier_streaming_mixed_ingester_test.go | 38 +++++----- pkg/chunk/encoding/chunk_test.go | 3 +- pkg/chunk/encoding/prometheus_chunk.go | 70 ++++++++++++++++--- pkg/querier/batch/batch_test.go | 2 + pkg/querier/batch/chunk_test.go | 2 +- pkg/querier/querier_test.go | 1 + 6 files changed, 85 insertions(+), 31 deletions(-) diff --git a/integration/querier_streaming_mixed_ingester_test.go b/integration/querier_streaming_mixed_ingester_test.go index ee347171c5..820f777775 100644 --- a/integration/querier_streaming_mixed_ingester_test.go +++ b/integration/querier_streaming_mixed_ingester_test.go @@ -107,7 +107,7 @@ func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) { require.NoError(t, err) // Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both). - result, err := c.Query("s[1m]", time.Unix(10, 0)) + result, err := c.Query("s{l=\"3\"}[1m]", time.Unix(10, 0)) require.NoError(t, err) s1Values := []model.SamplePair{ @@ -118,22 +118,22 @@ func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) { {Value: 5, Timestamp: 5000}, } - s1AndS2ValuesMerged := []model.SamplePair{ - {Value: 1, Timestamp: 1000}, - {Value: 2, Timestamp: 2000}, - {Value: 2.5, Timestamp: 2500}, - {Value: 3, Timestamp: 3000}, - {Value: 4, Timestamp: 4000}, - {Value: 5, Timestamp: 5000}, - {Value: 5.5, Timestamp: 5500}, - } + //s1AndS2ValuesMerged := []model.SamplePair{ + // {Value: 1, Timestamp: 1000}, + // {Value: 2, Timestamp: 2000}, + // {Value: 2.5, Timestamp: 2500}, + // {Value: 3, Timestamp: 3000}, + // {Value: 4, Timestamp: 4000}, + // {Value: 5, Timestamp: 5000}, + // {Value: 5.5, Timestamp: 5500}, + //} expectedMatrix := model.Matrix{ - // From chunks ingester only. - &model.SampleStream{ - Metric: model.Metric{labels.MetricName: "s", "l": "1"}, - Values: s1Values, - }, + //// From chunks ingester only. + //&model.SampleStream{ + // Metric: model.Metric{labels.MetricName: "s", "l": "1"}, + // Values: s1Values, + //}, // From blocks ingester only. &model.SampleStream{ @@ -142,10 +142,10 @@ func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) { }, // Merged from both ingesters. 
- &model.SampleStream{ - Metric: model.Metric{labels.MetricName: "s", "l": "2"}, - Values: s1AndS2ValuesMerged, - }, + //&model.SampleStream{ + // Metric: model.Metric{labels.MetricName: "s", "l": "2"}, + // Values: s1AndS2ValuesMerged, + //}, } require.Equal(t, model.ValMatrix, result.Type()) diff --git a/pkg/chunk/encoding/chunk_test.go b/pkg/chunk/encoding/chunk_test.go index 5b06cd3e0e..ecd00c327d 100644 --- a/pkg/chunk/encoding/chunk_test.go +++ b/pkg/chunk/encoding/chunk_test.go @@ -29,7 +29,7 @@ import ( func TestLen(t *testing.T) { chunks := []Chunk{} - for _, encoding := range []Encoding{DoubleDelta, Varbit, Bigchunk} { + for _, encoding := range []Encoding{DoubleDelta, Varbit, Bigchunk, PrometheusXorChunk} { c, err := NewForEncoding(encoding) if err != nil { t.Fatal(err) @@ -63,6 +63,7 @@ func TestChunk(t *testing.T) { {DoubleDelta, 989}, {Varbit, 2048}, {Bigchunk, 4096}, + {PrometheusXorChunk, 2048}, } { for samples := tc.maxSamples / 10; samples < tc.maxSamples; samples += tc.maxSamples / 10 { diff --git a/pkg/chunk/encoding/prometheus_chunk.go b/pkg/chunk/encoding/prometheus_chunk.go index 903ab680b0..dff0c044c0 100644 --- a/pkg/chunk/encoding/prometheus_chunk.go +++ b/pkg/chunk/encoding/prometheus_chunk.go @@ -8,7 +8,8 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" ) -// Read-only wrapper around Prometheus chunk. +// Wrapper around Prometheus chunk. While it supports adding more samples, that is only implemented +// to make tests work, and should not be used in production. type prometheusXorChunk struct { chunk chunkenc.Chunk } @@ -17,22 +18,32 @@ func newPrometheusXorChunk() *prometheusXorChunk { return &prometheusXorChunk{} } -func (p *prometheusXorChunk) Add(_ model.SamplePair) (Chunk, error) { - return nil, errors.New("cannot add new samples to Prometheus chunk") +func (p *prometheusXorChunk) Add(m model.SamplePair) (Chunk, error) { + if p.chunk == nil { + p.chunk = chunkenc.NewXORChunk() + } + + app, err := p.chunk.Appender() + if err != nil { + return nil, err + } + + app.Append(int64(m.Timestamp), float64(m.Value)) + return nil, nil } func (p *prometheusXorChunk) NewIterator(iterator Iterator) Iterator { if p.chunk == nil { - // TODO: return error iterator - return nil + return errorIterator("Prometheus chunk is not set") } if pit, ok := iterator.(*prometheusChunkIterator); ok { + pit.c = p.chunk pit.it = p.chunk.Iterator(pit.it) return pit } - return &prometheusChunkIterator{p.chunk.Iterator(nil)} + return &prometheusChunkIterator{c: p.chunk, it: p.chunk.Iterator(nil)} } func (p *prometheusXorChunk) Marshal(i io.Writer) error { @@ -63,16 +74,43 @@ func (p *prometheusXorChunk) Utilization() float64 { } func (p *prometheusXorChunk) Slice(_, _ model.Time) Chunk { - // Not implemented. 
return p } -func (p *prometheusXorChunk) Rebound(_, _ model.Time) (Chunk, error) { - return nil, errors.New("rebound not supported by Prometheus chunk") +func (p *prometheusXorChunk) Rebound(from, to model.Time) (Chunk, error) { + if p.chunk == nil { + return p, nil + } + + nc := chunkenc.NewXORChunk() + app, err := nc.Appender() + if err != nil { + return nil, err + } + + it := p.chunk.Iterator(nil) + for ok := it.Seek(int64(from)); ok; ok = it.Next() { + t, v := it.At() + if t <= int64(to) { + app.Append(t, v) + } else { + break + } + } + + nc.Compact() + if nc.NumSamples() == 0 { + return nil, ErrSliceNoDataInRange + } + + return &prometheusXorChunk{chunk: nc}, nil } func (p *prometheusXorChunk) Len() int { - return p.Size() + if p.chunk == nil { + return 0 + } + return p.chunk.NumSamples() } func (p *prometheusXorChunk) Size() int { @@ -83,6 +121,7 @@ func (p *prometheusXorChunk) Size() int { } type prometheusChunkIterator struct { + c chunkenc.Chunk // we need chunk, because FindAtOrAfter needs to start with fresh iterator. it chunkenc.Iterator } @@ -91,6 +130,9 @@ func (p *prometheusChunkIterator) Scan() bool { } func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) bool { + // FindAtOrAfter must return OLDEST value at given time. That means we need to start with a fresh iterator, + // otherwise we cannot guarantee OLDEST. + p.it = p.c.Iterator(p.it) return p.it.Seek(int64(time)) } @@ -122,3 +164,11 @@ func (p *prometheusChunkIterator) Batch(size int) Batch { func (p *prometheusChunkIterator) Err() error { return p.it.Err() } + +type errorIterator string + +func (e errorIterator) Scan() bool { return false } +func (e errorIterator) FindAtOrAfter(time model.Time) bool { return false } +func (e errorIterator) Value() model.SamplePair { panic("no values") } +func (e errorIterator) Batch(size int) Batch { panic("no values") } +func (e errorIterator) Err() error { return errors.New(string(e)) } diff --git a/pkg/querier/batch/batch_test.go b/pkg/querier/batch/batch_test.go index b66c4eac48..77649eaa8d 100644 --- a/pkg/querier/batch/batch_test.go +++ b/pkg/querier/batch/batch_test.go @@ -24,6 +24,8 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) { {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.Varbit}, {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.DoubleDelta}, {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.DoubleDelta}, + {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 1, enc: promchunk.PrometheusXorChunk}, + {numChunks: 1000, numSamplesPerChunk: 100, duplicationFactor: 3, enc: promchunk.PrometheusXorChunk}, } for _, scenario := range scenarios { diff --git a/pkg/querier/batch/chunk_test.go b/pkg/querier/batch/chunk_test.go index 9cbd2223d9..c498712e6a 100644 --- a/pkg/querier/batch/chunk_test.go +++ b/pkg/querier/batch/chunk_test.go @@ -32,7 +32,7 @@ func TestChunkIter(t *testing.T) { func forEncodings(t *testing.T, f func(t *testing.T, enc promchunk.Encoding)) { for _, enc := range []promchunk.Encoding{ - promchunk.DoubleDelta, promchunk.Varbit, promchunk.Bigchunk, + promchunk.DoubleDelta, promchunk.Varbit, promchunk.Bigchunk, promchunk.PrometheusXorChunk, } { t.Run(enc.String(), func(t *testing.T) { f(t, enc) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index c61301180a..f5c11338be 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -72,6 +72,7 @@ var ( {"DoubleDelta", 
promchunk.DoubleDelta},
 		{"Varbit", promchunk.Varbit},
 		{"Bigchunk", promchunk.Bigchunk},
+		{"PrometheusXorChunk", promchunk.PrometheusXorChunk},
 	}
 
 	queries = []query{

From 0aa1eac11300588690c45df78b8ae9510eef8de7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= <peter.stibrany@grafana.com>
Date: Mon, 1 Mar 2021 17:43:14 +0100
Subject: [PATCH 03/12] Support streaming with both timeseries and chunks for
 now. Added tests and benchmarks.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
---
 pkg/ingester/ingester.go         |   6 +-
 pkg/ingester/ingester_v2.go      | 111 +++++++++++--
 pkg/ingester/ingester_v2_test.go | 276 +++++++++++++++++++++++++++----
 pkg/ingester/lifecycle_test.go   |   4 +-
 4 files changed, 345 insertions(+), 52 deletions(-)

diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 3f0aabc558..9d3e5db173 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -79,8 +79,9 @@ type Config struct {
 	ActiveSeriesMetricsIdleTimeout time.Duration `yaml:"active_series_metrics_idle_timeout"`
 
 	// Use blocks storage.
-	BlocksStorageEnabled bool                     `yaml:"-"`
-	BlocksStorageConfig  tsdb.BlocksStorageConfig `yaml:"-"`
+	BlocksStorageEnabled        bool                     `yaml:"-"`
+	BlocksStorageConfig         tsdb.BlocksStorageConfig `yaml:"-"`
+	StreamChunksWhenUsingBlocks bool                     `yaml:"-"`
 
 	// Injected at runtime and read from the distributor config, required
 	// to accurately apply global limits.
@@ -114,6 +115,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.ActiveSeriesMetricsEnabled, "ingester.active-series-metrics-enabled", false, "Enable tracking of active series and export them as metrics.")
 	f.DurationVar(&cfg.ActiveSeriesMetricsUpdatePeriod, "ingester.active-series-metrics-update-period", 1*time.Minute, "How often to update active series metrics.")
 	f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.")
+	f.BoolVar(&cfg.StreamChunksWhenUsingBlocks, "ingester.stream-chunks-when-using-blocks", false, "Stream chunks when using blocks. This is an experimental feature and is not yet tested. Once ready, it will be made the default.")
 }
 
 // Ingester deals with "in flight" chunks.
Based on Prometheus 1.x

diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go
index e836f81527..2c17fd380c 100644
--- a/pkg/ingester/ingester_v2.go
+++ b/pkg/ingester/ingester_v2.go
@@ -1135,22 +1135,108 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste
 		return nil
 	}
 
-	q, err := db.ChunkQuerier(ctx, int64(from), int64(through))
+	numSamples := 0
+	numSeries := 0
+
+	if i.cfg.StreamChunksWhenUsingBlocks {
+		numSeries, numSamples, err = i.v2QueryStreamChunks(ctx, db, int64(from), int64(through), matchers, stream)
+	} else {
+		numSeries, numSamples, err = i.v2QueryStreamSamples(ctx, db, int64(from), int64(through), matchers, stream)
+	}
 	if err != nil {
 		return err
 	}
+
+	i.metrics.queriedSeries.Observe(float64(numSeries))
+	i.metrics.queriedSamples.Observe(float64(numSamples))
+	level.Debug(spanlog).Log("series", numSeries, "samples", numSamples)
+	return nil
+}
+
+func (i *Ingester) v2QueryStreamSamples(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
+	q, err := db.Querier(ctx, from, through)
+	if err != nil {
+		return 0, 0, err
+	}
 	defer q.Close()
 
 	// It's not required to return sorted series because series are sorted by the Cortex querier.
 	ss := q.Select(false, nil, matchers...)
 	if ss.Err() != nil {
-		return ss.Err()
+		return 0, 0, ss.Err()
+	}
+
+	timeseries := make([]cortexpb.TimeSeries, 0, queryStreamBatchSize)
+	batchSizeBytes := 0
+	for ss.Next() {
+		series := ss.At()
+
+		// convert labels to LabelAdapter
+		ts := cortexpb.TimeSeries{
+			Labels: cortexpb.FromLabelsToLabelAdapters(series.Labels()),
+		}
+
+		it := series.Iterator()
+		for it.Next() {
+			t, v := it.At()
+			ts.Samples = append(ts.Samples, cortexpb.Sample{Value: v, TimestampMs: t})
+		}
+		numSamples += len(ts.Samples)
+		numSeries++
+		tsSize := ts.Size()
+
+		if (batchSizeBytes > 0 && batchSizeBytes+tsSize > queryStreamBatchMessageSize) || len(timeseries) >= queryStreamBatchSize {
+			// Adding this series to the batch would make it too big,
+			// flush the data and add it to new batch instead.
+			err = client.SendQueryStream(stream, &client.QueryStreamResponse{
+				Timeseries: timeseries,
+			})
+			if err != nil {
+				return 0, 0, err
+			}
+
+			batchSizeBytes = 0
+			timeseries = timeseries[:0]
+		}
+
+		timeseries = append(timeseries, ts)
+		batchSizeBytes += tsSize
+	}
+
+	// Ensure no error occurred while iterating the series set.
+	if err := ss.Err(); err != nil {
+		return 0, 0, err
+	}
+
+	// Final flush any existing metrics
+	if batchSizeBytes != 0 {
+		err = client.SendQueryStream(stream, &client.QueryStreamResponse{
+			Timeseries: timeseries,
+		})
+		if err != nil {
+			return 0, 0, err
+		}
+	}
+
+	return numSeries, numSamples, nil
+}
+
+// v2QueryStreamChunks streams metrics from a TSDB as entire chunks.
+func (i *Ingester) v2QueryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
+	q, err := db.ChunkQuerier(ctx, from, through)
+	if err != nil {
+		return 0, 0, err
+	}
+	defer q.Close()
+
+	// It's not required to return sorted series because series are sorted by the Cortex querier.
+	ss := q.Select(false, nil, matchers...)
+ if ss.Err() != nil { + return 0, 0, ss.Err() } chunkSeries := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize) batchSizeBytes := 0 - numSamples := 0 - numSeries := 0 for ss.Next() { series := ss.At() @@ -1167,7 +1253,7 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste // It is not guaranteed that chunk returned by iterator is populated. // For now just return error. We could also try to figure out how to read the chunk. if meta.Chunk == nil { - return errors.Errorf("unfilled chunk returned from TSDB chunk querier") + return 0, 0, errors.Errorf("unfilled chunk returned from TSDB chunk querier") } ch := client.Chunk{ @@ -1175,12 +1261,14 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste EndTimestampMs: meta.MaxTime, Data: meta.Chunk.Bytes(), } + switch meta.Chunk.Encoding() { case chunkenc.EncXOR: ch.Encoding = int32(encoding.PrometheusXorChunk) default: - return errors.Errorf("unknown chunk encoding from TSDB chunk querier: %v", meta.Chunk.Encoding()) + return 0, 0, errors.Errorf("unknown chunk encoding from TSDB chunk querier: %v", meta.Chunk.Encoding()) } + ts.Chunks = append(ts.Chunks, ch) numSamples += meta.Chunk.NumSamples() } @@ -1194,7 +1282,7 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste Chunkseries: chunkSeries, }) if err != nil { - return err + return 0, 0, err } batchSizeBytes = 0 @@ -1207,7 +1295,7 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste // Ensure no error occurred while iterating the series set. if err := ss.Err(); err != nil { - return err + return 0, 0, err } // Final flush any existing metrics @@ -1216,14 +1304,11 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste Chunkseries: chunkSeries, }) if err != nil { - return err + return 0, 0, err } } - i.metrics.queriedSeries.Observe(float64(numSeries)) - i.metrics.queriedSamples.Observe(float64(numSamples)) - level.Debug(spanlog).Log("series", numSeries, "samples", numSamples) - return nil + return numSeries, numSamples, nil } func (i *Ingester) getTSDB(userID string) *userTSDB { diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 1f7936f74e..66e6cd4917 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -37,6 +38,8 @@ import ( "github.com/weaveworks/common/user" "google.golang.org/grpc" + "github.com/cortexproject/cortex/pkg/chunk/encoding" + "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" @@ -697,7 +700,7 @@ func Test_Ingester_v2LabelNames(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "test") for _, series := range series { - req, _, _ := mockWriteRequest(series.lbls, series.value, series.timestamp) + req, _, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp) _, err := i.v2Push(ctx, req) require.NoError(t, err) } @@ -741,7 +744,7 @@ func Test_Ingester_v2LabelValues(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "test") for _, series := range series { - 
req, _, _ := mockWriteRequest(series.lbls, series.value, series.timestamp) + req, _, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp) _, err := i.v2Push(ctx, req) require.NoError(t, err) } @@ -860,7 +863,7 @@ func Test_Ingester_v2Query(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "test") for _, series := range series { - req, _, _ := mockWriteRequest(series.lbls, series.value, series.timestamp) + req, _, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp) _, err := i.v2Push(ctx, req) require.NoError(t, err) } @@ -1154,7 +1157,7 @@ func Test_Ingester_v2MetricsForLabelMatchers(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "test") for _, series := range fixtures { - req, _, _ := mockWriteRequest(series.lbls, series.value, series.timestamp) + req, _, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp) _, err := i.v2Push(ctx, req) require.NoError(t, err) } @@ -1281,7 +1284,7 @@ func createIngesterWithSeries(t testing.TB, userID string, numSeries int, timest return i } -func TestIngester_v2QueryStream(t *testing.T) { +func TestIngester_v2QueryStreamSamples(t *testing.T) { // Create ingester. i, err := prepareIngesterWithBlocksStorage(t, defaultIngesterTestConfig(), nil) require.NoError(t, err) @@ -1296,7 +1299,7 @@ func TestIngester_v2QueryStream(t *testing.T) { // Push series. ctx := user.InjectOrgID(context.Background(), userID) lbls := labels.Labels{{Name: labels.MetricName, Value: "foo"}} - req, _, expectedResponse := mockWriteRequest(lbls, 123000, 456) + req, _, expectedResponse, _ := mockWriteRequest(t, lbls, 123000, 456) _, err = i.v2Push(ctx, req) require.NoError(t, err) @@ -1336,6 +1339,73 @@ func TestIngester_v2QueryStream(t *testing.T) { break } require.NoError(t, err) + require.Zero(t, len(resp.Chunkseries)) + count += len(resp.Timeseries) + lastResp = resp + } + require.Equal(t, 1, count) + require.Equal(t, expectedResponse, lastResp) +} + +func TestIngester_v2QueryStreamChunks(t *testing.T) { + // Create ingester. + cfg := defaultIngesterTestConfig() + cfg.StreamChunksWhenUsingBlocks = true + + i, err := prepareIngesterWithBlocksStorage(t, cfg, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until it's ACTIVE. + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + // Push series. + ctx := user.InjectOrgID(context.Background(), userID) + lbls := labels.Labels{{Name: labels.MetricName, Value: "foo"}} + req, _, _, expectedResponse := mockWriteRequest(t, lbls, 123000, 456) + _, err = i.v2Push(ctx, req) + require.NoError(t, err) + + // Create a GRPC server used to query back the data. + serv := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor)) + defer serv.GracefulStop() + client.RegisterIngesterServer(serv, i) + + listener, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + + go func() { + require.NoError(t, serv.Serve(listener)) + }() + + // Query back the series using GRPC streaming. 
+ c, err := client.MakeIngesterClient(listener.Addr().String(), defaultClientTestConfig()) + require.NoError(t, err) + defer c.Close() + + s, err := c.QueryStream(ctx, &client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: 200000, + Matchers: []*client.LabelMatcher{{ + Type: client.EQUAL, + Name: model.MetricNameLabel, + Value: "foo", + }}, + }) + require.NoError(t, err) + + count := 0 + var lastResp *client.QueryStreamResponse + for { + resp, err := s.Recv() + if err == io.EOF { + break + } + require.NoError(t, err) + require.Zero(t, len(resp.Timeseries)) count += len(resp.Chunkseries) lastResp = resp } @@ -1359,7 +1429,7 @@ func TestIngester_v2QueryStreamManySamples(t *testing.T) { ctx := user.InjectOrgID(context.Background(), userID) const samplesCount = 100000 - samples := make([]client.Sample, 0, samplesCount) + samples := make([]cortexpb.Sample, 0, samplesCount) for i := 0; i < samplesCount; i++ { samples = append(samples, client.Sample{ @@ -1419,7 +1489,7 @@ func TestIngester_v2QueryStreamManySamples(t *testing.T) { break } require.NoError(t, err) - require.True(t, len(resp.Chunkseries) > 0) // No empty messages. + require.True(t, len(resp.Timeseries) > 0) // No empty messages. recvMsgs++ series += len(resp.Timeseries) @@ -1437,15 +1507,118 @@ func TestIngester_v2QueryStreamManySamples(t *testing.T) { require.Equal(t, 10000+50000+samplesCount, totalSamples) } -func writeRequestSingleSeries(lbls labels.Labels, samples []client.Sample) *client.WriteRequest { - req := &client.WriteRequest{ - Source: client.API, +func TestIngester_v2QueryStreamManySamplesChunks(t *testing.T) { + // Create ingester. + cfg := defaultIngesterTestConfig() + cfg.StreamChunksWhenUsingBlocks = true + + i, err := prepareIngesterWithBlocksStorage(t, cfg, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until it's ACTIVE. + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + // Push series. + ctx := user.InjectOrgID(context.Background(), userID) + + const samplesCount = 1000000 + samples := make([]cortexpb.Sample, 0, samplesCount) + + for i := 0; i < samplesCount; i++ { + samples = append(samples, cortexpb.Sample{ + Value: float64(i), + TimestampMs: int64(i), + }) + } + + // 100k samples in chunks use about 154 KiB, + _, err = i.v2Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: "1"}}, samples[0:100000])) + require.NoError(t, err) + + // 1M samples in chunks use about 1.51 MiB, + _, err = i.v2Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: "2"}}, samples)) + require.NoError(t, err) + + // 500k samples in chunks need 775 KiB, + _, err = i.v2Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: "3"}}, samples[0:500000])) + require.NoError(t, err) + + // Create a GRPC server used to query back the data. + serv := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor)) + defer serv.GracefulStop() + client.RegisterIngesterServer(serv, i) + + listener, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + + go func() { + require.NoError(t, serv.Serve(listener)) + }() + + // Query back the series using GRPC streaming. 
+ c, err := client.MakeIngesterClient(listener.Addr().String(), defaultClientTestConfig()) + require.NoError(t, err) + defer c.Close() + + s, err := c.QueryStream(ctx, &client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: samplesCount + 1, + + Matchers: []*client.LabelMatcher{{ + Type: client.EQUAL, + Name: model.MetricNameLabel, + Value: "foo", + }}, + }) + require.NoError(t, err) + + recvMsgs := 0 + series := 0 + totalSamples := 0 + + for { + resp, err := s.Recv() + if err == io.EOF { + break + } + require.NoError(t, err) + require.True(t, len(resp.Chunkseries) > 0) // No empty messages. + + recvMsgs++ + series += len(resp.Chunkseries) + + for _, ts := range resp.Chunkseries { + for _, c := range ts.Chunks { + ch, err := encoding.NewForEncoding(encoding.Encoding(c.Encoding)) + require.NoError(t, err) + require.NoError(t, ch.UnmarshalFromBuf(c.Data)) + + totalSamples += ch.Len() + } + } + } + + // As ingester doesn't guarantee sorting of series, we can get 2 (100k + 500k in first, 1M in second) + // or 3 messages (100k or 500k first, 1M second, and 500k or 100k last). + + require.True(t, 2 <= recvMsgs && recvMsgs <= 3) + require.Equal(t, 3, series) + require.Equal(t, 100000+500000+samplesCount, totalSamples) +} + +func writeRequestSingleSeries(lbls labels.Labels, samples []cortexpb.Sample) *cortexpb.WriteRequest { + req := &cortexpb.WriteRequest{ + Source: cortexpb.API, } - ts := client.TimeSeries{} - ts.Labels = client.FromLabelsToLabelAdapters(lbls) + ts := cortexpb.TimeSeries{} + ts.Labels = cortexpb.FromLabelsToLabelAdapters(lbls) ts.Samples = samples - req.Timeseries = append(req.Timeseries, client.PreallocTimeseries{TimeSeries: &ts}) + req.Timeseries = append(req.Timeseries, cortexpb.PreallocTimeseries{TimeSeries: &ts}) return req } @@ -1463,9 +1636,20 @@ func (m *mockQueryStreamServer) Context() context.Context { return m.ctx } -func BenchmarkIngester_v2QueryStream(b *testing.B) { +func BenchmarkIngester_v2QueryStream_Samples(b *testing.B) { + benchmarkV2QueryStream(b, false) +} + +func BenchmarkIngester_v2QueryStream_Chunks(b *testing.B) { + benchmarkV2QueryStream(b, true) +} + +func benchmarkV2QueryStream(b *testing.B, streamChunks bool) { + cfg := defaultIngesterTestConfig() + cfg.StreamChunksWhenUsingBlocks = streamChunks + // Create ingester. 
- i, err := prepareIngesterWithBlocksStorage(b, defaultIngesterTestConfig(), nil) + i, err := prepareIngesterWithBlocksStorage(b, cfg, nil) require.NoError(b, err) require.NoError(b, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -1515,36 +1699,58 @@ func BenchmarkIngester_v2QueryStream(b *testing.B) { } } -func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) (*client.WriteRequest, *client.QueryResponse, *client.QueryStreamResponse) { - samples := []client.Sample{ +func mockWriteRequest(t *testing.T, lbls labels.Labels, value float64, timestampMs int64) (*client.WriteRequest, *client.QueryResponse, *client.QueryStreamResponse, *client.QueryStreamResponse) { + samples := []cortexpb.Sample{ { TimestampMs: timestampMs, Value: value, }, } - req := client.ToWriteRequest([]labels.Labels{lbls}, samples, nil, client.API) + req := cortexpb.ToWriteRequest([]labels.Labels{lbls}, samples, nil, client.API) // Generate the expected response expectedQueryRes := &client.QueryResponse{ - Timeseries: []client.TimeSeries{ + Timeseries: []cortexpb.TimeSeries{ { - Labels: client.FromLabelsToLabelAdapters(lbls), + Labels: cortexpb.FromLabelsToLabelAdapters(lbls), Samples: samples, }, }, } - expectedQueryStreamRes := &client.QueryStreamResponse{ - Timeseries: []client.TimeSeries{ + expectedQueryStreamResSamples := &client.QueryStreamResponse{ + Timeseries: []cortexpb.TimeSeries{ { - Labels: client.FromLabelsToLabelAdapters(lbls), + Labels: cortexpb.FromLabelsToLabelAdapters(lbls), Samples: samples, }, }, } - return req, expectedQueryRes, expectedQueryStreamRes + chunk := chunkenc.NewXORChunk() + app, err := chunk.Appender() + require.NoError(t, err) + app.Append(timestampMs, value) + chunk.Compact() + + expectedQueryStreamResChunks := &client.QueryStreamResponse{ + Chunkseries: []client.TimeSeriesChunk{ + { + Labels: cortexpb.FromLabelsToLabelAdapters(lbls), + Chunks: []client.Chunk{ + { + StartTimestampMs: timestampMs, + EndTimestampMs: timestampMs, + Encoding: int32(encoding.PrometheusXorChunk), + Data: chunk.Bytes(), + }, + }, + }, + }, + } + + return req, expectedQueryRes, expectedQueryStreamResSamples, expectedQueryStreamResChunks } func prepareIngesterWithBlocksStorage(t testing.TB, ingesterCfg Config, registerer prometheus.Registerer) (*Ingester, error) { @@ -1940,7 +2146,7 @@ func TestIngester_invalidSamplesDontChangeLastUpdateTime(t *testing.T) { sampleTimestamp := int64(model.Now()) { - req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, sampleTimestamp) + req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, sampleTimestamp) _, err = i.v2Push(ctx, req) require.NoError(t, err) } @@ -1955,7 +2161,7 @@ func TestIngester_invalidSamplesDontChangeLastUpdateTime(t *testing.T) { // Push another sample to the same metric and timestamp, with different value. We expect to get error. 
{ - req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 1, sampleTimestamp) + req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 1, sampleTimestamp) _, err = i.v2Push(ctx, req) require.Error(t, err) } @@ -2249,7 +2455,7 @@ func Test_Ingester_v2UserStats(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "test") for _, series := range series { - req, _, _ := mockWriteRequest(series.lbls, series.value, series.timestamp) + req, _, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp) _, err := i.v2Push(ctx, req) require.NoError(t, err) } @@ -2294,7 +2500,7 @@ func Test_Ingester_v2AllUserStats(t *testing.T) { }) for _, series := range series { ctx := user.InjectOrgID(context.Background(), series.user) - req, _, _ := mockWriteRequest(series.lbls, series.value, series.timestamp) + req, _, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp) _, err := i.v2Push(ctx, req) require.NoError(t, err) } @@ -2541,7 +2747,7 @@ func verifyCompactedHead(t *testing.T, i *Ingester, expected bool) { func pushSingleSampleWithMetadata(t *testing.T, i *Ingester) { ctx := user.InjectOrgID(context.Background(), userID) - req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, util.TimeToMillis(time.Now())) + req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, util.TimeToMillis(time.Now())) req.Metadata = append(req.Metadata, &client.MetricMetadata{MetricFamilyName: "test", Help: "a help for metric", Unit: "", Type: client.COUNTER}) _, err := i.v2Push(ctx, req) require.NoError(t, err) @@ -2549,7 +2755,7 @@ func pushSingleSampleWithMetadata(t *testing.T, i *Ingester) { func pushSingleSampleAtTime(t *testing.T, i *Ingester, ts int64) { ctx := user.InjectOrgID(context.Background(), userID) - req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, ts) + req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, ts) _, err := i.v2Push(ctx, req) require.NoError(t, err) } @@ -2692,7 +2898,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { // Push some data to create 3 blocks. ctx := user.InjectOrgID(context.Background(), userID) for j := int64(0); j < 5; j++ { - req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec) + req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec) _, err := i.v2Push(ctx, req) require.NoError(t, err) } @@ -2719,7 +2925,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { // Add more samples that could trigger another compaction and hence reload of blocks. for j := int64(5); j < 6; j++ { - req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec) + req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec) _, err := i.v2Push(ctx, req) require.NoError(t, err) } @@ -2747,7 +2953,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { // Add more samples that could trigger another compaction and hence reload of blocks. 
for j := int64(6); j < 7; j++ { - req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec) + req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec) _, err := i.v2Push(ctx, req) require.NoError(t, err) } @@ -2793,7 +2999,7 @@ func TestIngesterPushErrorDuringForcedCompaction(t *testing.T) { require.True(t, db.casState(active, forceCompacting)) // Ingestion should fail with a 503. - req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, util.TimeToMillis(time.Now())) + req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, util.TimeToMillis(time.Now())) ctx := user.InjectOrgID(context.Background(), userID) _, err = i.v2Push(ctx, req) require.Equal(t, httpgrpc.Errorf(http.StatusServiceUnavailable, wrapWithUser(errors.New("forced compaction in progress"), userID).Error()), err) diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index e0425e6385..fa56e31aeb 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -140,7 +140,7 @@ func TestIngesterChunksTransfer(t *testing.T) { }) // Now write a sample to this ingester - req, expectedResponse, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000) + req, expectedResponse, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000) ctx := user.InjectOrgID(context.Background(), userID) _, err = ing1.Push(ctx, req) require.NoError(t, err) @@ -181,7 +181,7 @@ func TestIngesterChunksTransfer(t *testing.T) { assert.Equal(t, expectedResponse, response) // Check we can send the same sample again to the new ingester and get the same result - req, _, _ = mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000) + req, _, _, _ = mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000) _, err = ing2.Push(ctx, req) require.NoError(t, err) response, err = ing2.Query(ctx, request) From b6b56827c319803372240c7b4225cd2033299078 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= <peter.stibrany@grafana.com> Date: Mon, 1 Mar 2021 17:45:55 +0100 Subject: [PATCH 04/12] Revert changes to test. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> --- .../querier_streaming_mixed_ingester_test.go | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/integration/querier_streaming_mixed_ingester_test.go b/integration/querier_streaming_mixed_ingester_test.go index 820f777775..ad82bf6a31 100644 --- a/integration/querier_streaming_mixed_ingester_test.go +++ b/integration/querier_streaming_mixed_ingester_test.go @@ -118,22 +118,22 @@ func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) { {Value: 5, Timestamp: 5000}, } - //s1AndS2ValuesMerged := []model.SamplePair{ - // {Value: 1, Timestamp: 1000}, - // {Value: 2, Timestamp: 2000}, - // {Value: 2.5, Timestamp: 2500}, - // {Value: 3, Timestamp: 3000}, - // {Value: 4, Timestamp: 4000}, - // {Value: 5, Timestamp: 5000}, - // {Value: 5.5, Timestamp: 5500}, - //} + s1AndS2ValuesMerged := []model.SamplePair{ + {Value: 1, Timestamp: 1000}, + {Value: 2, Timestamp: 2000}, + {Value: 2.5, Timestamp: 2500}, + {Value: 3, Timestamp: 3000}, + {Value: 4, Timestamp: 4000}, + {Value: 5, Timestamp: 5000}, + {Value: 5.5, Timestamp: 5500}, + } expectedMatrix := model.Matrix{ - //// From chunks ingester only. - //&model.SampleStream{ - // Metric: model.Metric{labels.MetricName: "s", "l": "1"}, - // Values: s1Values, - //}, + // From chunks ingester only. + &model.SampleStream{ + Metric: model.Metric{labels.MetricName: "s", "l": "1"}, + Values: s1Values, + }, // From blocks ingester only. &model.SampleStream{ @@ -142,10 +142,10 @@ func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) { }, // Merged from both ingesters. - //&model.SampleStream{ - // Metric: model.Metric{labels.MetricName: "s", "l": "2"}, - // Values: s1AndS2ValuesMerged, - //}, + &model.SampleStream{ + Metric: model.Metric{labels.MetricName: "s", "l": "2"}, + Values: s1AndS2ValuesMerged, + }, } require.Equal(t, model.ValMatrix, result.Type()) From b77bbb97192f8b8daab0111cd2af0213b3c6dc82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= <peter.stibrany@grafana.com> Date: Mon, 1 Mar 2021 17:46:16 +0100 Subject: [PATCH 05/12] Revert changes to test. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> --- integration/querier_streaming_mixed_ingester_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/querier_streaming_mixed_ingester_test.go b/integration/querier_streaming_mixed_ingester_test.go index ad82bf6a31..ee347171c5 100644 --- a/integration/querier_streaming_mixed_ingester_test.go +++ b/integration/querier_streaming_mixed_ingester_test.go @@ -107,7 +107,7 @@ func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) { require.NoError(t, err) // Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both). - result, err := c.Query("s{l=\"3\"}[1m]", time.Unix(10, 0)) + result, err := c.Query("s[1m]", time.Unix(10, 0)) require.NoError(t, err) s1Values := []model.SamplePair{ From 1b8e3670f1503173ba8bf1f705b77830e2588390 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= <peter.stibrany@grafana.com> Date: Mon, 1 Mar 2021 17:46:54 +0100 Subject: [PATCH 06/12] Fix comment. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> --- pkg/chunk/encoding/factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/chunk/encoding/factory.go b/pkg/chunk/encoding/factory.go index 90a6a53302..efd88b9c2b 100644 --- a/pkg/chunk/encoding/factory.go +++ b/pkg/chunk/encoding/factory.go @@ -52,7 +52,7 @@ const ( Varbit // Bigchunk encoding Bigchunk - // Read-only wrapper around Prometheus XOR-encoded chunk. + // PrometheusXorChunk is a wrapper around Prometheus XOR-encoded chunk. PrometheusXorChunk ) From 53024703a6c1425786cbb453c6707e45f83be6ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= <peter.stibrany@grafana.com> Date: Mon, 1 Mar 2021 20:23:31 +0100 Subject: [PATCH 07/12] Extend integration test with chunk streaming. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> --- integration/querier_streaming_mixed_ingester_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/integration/querier_streaming_mixed_ingester_test.go b/integration/querier_streaming_mixed_ingester_test.go index ee347171c5..ccf1145139 100644 --- a/integration/querier_streaming_mixed_ingester_test.go +++ b/integration/querier_streaming_mixed_ingester_test.go @@ -5,6 +5,7 @@ package integration import ( "context" "flag" + "fmt" "strings" "testing" "time" @@ -21,6 +22,14 @@ import ( ) func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) { + for _, streamChunks := range []bool{false, true} { + t.Run(fmt.Sprintf("%v", streamChunks), func(t *testing.T) { + testQuerierWithStreamingBlocksAndChunksIngesters(t, streamChunks) + }) + } +} + +func testQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T, streamChunks bool) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) defer s.Close() @@ -33,6 +42,7 @@ func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) { "-store-gateway.sharding-enabled": "false", "-querier.ingester-streaming": "true", }) + blockFlags["-ingester.stream-chunks-when-using-blocks"] = fmt.Sprintf("%v", streamChunks) // Start dependencies. consul := e2edb.NewConsul() From 2a72c96355181e537d2d0bcd5f64e09f9d9d4a77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= <peter.stibrany@grafana.com> Date: Mon, 1 Mar 2021 21:04:31 +0100 Subject: [PATCH 08/12] Add runtime-config for switching between types of query-stream. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> --- pkg/cortex/modules.go | 1 + pkg/cortex/runtime_config.go | 26 ++++++ pkg/ingester/ingester.go | 2 + pkg/ingester/ingester_v2.go | 28 +++++++ pkg/ingester/ingester_v2_test.go | 133 +++++++++++++------------------ 5 files changed, 111 insertions(+), 79 deletions(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 3fca86ef1c..f9bcddddaa 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -415,6 +415,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) { t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels + t.Cfg.Ingester.StreamTypeFn = ingesterChunkStreaming(t.RuntimeConfig) t.tsdbIngesterConfig() t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Overrides, t.Store, prometheus.DefaultRegisterer, util_log.Logger) diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index 8a01034c58..e115198ba0 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -7,6 +7,7 @@ import ( "gopkg.in/yaml.v2" + "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/runtimeconfig" @@ -24,6 +25,8 @@ type runtimeConfigValues struct { TenantLimits map[string]*validation.Limits `yaml:"overrides"` Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"` + + IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"` } // runtimeConfigTenantLimits provides per-tenant limit overrides based on a runtimeconfig.Manager @@ -98,6 +101,29 @@ func multiClientRuntimeConfigChannel(manager *runtimeconfig.Manager) func() <-ch return outCh } } + +func ingesterChunkStreaming(manager *runtimeconfig.Manager) func() ingester.QueryStreamType { + if manager == nil { + return nil + } + + return func() ingester.QueryStreamType { + val := manager.GetConfig() + if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil { + if cfg.IngesterChunkStreaming == nil { + return ingester.QueryStreamDefault + } + + if *cfg.IngesterChunkStreaming { + return ingester.QueryStreamChunks + } + return ingester.QueryStreamSamples + } + + return ingester.QueryStreamDefault + } +} + func runtimeConfigHandler(runtimeCfgManager *runtimeconfig.Manager, defaultLimits validation.Limits) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { cfg, ok := runtimeCfgManager.GetConfig().(*runtimeConfigValues) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 9d3e5db173..d2ddf0adc7 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -82,6 +82,8 @@ type Config struct { BlocksStorageEnabled bool `yaml:"-"` BlocksStorageConfig tsdb.BlocksStorageConfig `yaml:"-"` StreamChunksWhenUsingBlocks bool `yaml:"-"` + // Runtime-override for type of streaming query to use (chunks or samples). + StreamTypeFn func() QueryStreamType `yaml:"-"` // Injected at runtime and read from the distributor config, required // to accurately apply global limits. 
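With the wiring above, chunk streaming can be toggled on live ingesters by editing the runtime config file, with no restart needed: the runtime config manager periodically reloads the file, and v2QueryStream consults StreamTypeFn on every request. A minimal sketch of such a file, assuming the runtime config manager is already enabled (e.g. via something like `-runtime-config.file=runtime.yaml`; the flag name here is illustrative):

    # true  -> stream chunks, false -> stream samples.
    # Leaving the field unset yields QueryStreamDefault, i.e. whatever
    # the -ingester.stream-chunks-when-using-blocks flag says.
    ingester_stream_chunks_when_using_blocks: true
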
diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 2c17fd380c..8512dcefe1 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -89,6 +89,15 @@ func (r tsdbCloseCheckResult) shouldClose() bool { return r == tsdbIdle || r == tsdbTenantMarkedForDeletion } +// QueryStreamType defines type of function to use when doing query-stream operation. +type QueryStreamType int + +const ( + QueryStreamDefault QueryStreamType = iota // Use default configured value. + QueryStreamSamples // Stream individual samples. + QueryStreamChunks // Stream entire chunks. +) + type userTSDB struct { db *tsdb.DB userID string @@ -1138,9 +1147,28 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste numSamples := 0 numSeries := 0 + streamType := QueryStreamSamples if i.cfg.StreamChunksWhenUsingBlocks { + streamType = QueryStreamChunks + } + + if i.cfg.StreamTypeFn != nil { + runtimeType := i.cfg.StreamTypeFn() + switch runtimeType { + case QueryStreamChunks: + streamType = QueryStreamChunks + case QueryStreamSamples: + streamType = QueryStreamSamples + default: + // no change from config value. + } + } + + if streamType == QueryStreamChunks { + level.Debug(spanlog).Log("msg", "using v2QueryStreamChunks") numSeries, numSamples, err = i.v2QueryStreamChunks(ctx, db, int64(from), int64(through), matchers, stream) } else { + level.Debug(spanlog).Log("msg", "using v2QueryStreamSamples") numSeries, numSamples, err = i.v2QueryStreamSamples(ctx, db, int64(from), int64(through), matchers, stream) } if err != nil { diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 66e6cd4917..32c30e86af 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -1284,73 +1284,15 @@ func createIngesterWithSeries(t testing.TB, userID string, numSeries int, timest return i } -func TestIngester_v2QueryStreamSamples(t *testing.T) { +func TestIngester_v2QueryStream(t *testing.T) { // Create ingester. - i, err := prepareIngesterWithBlocksStorage(t, defaultIngesterTestConfig(), nil) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) - defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck - - // Wait until it's ACTIVE. - test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { - return i.lifecycler.GetState() - }) - - // Push series. - ctx := user.InjectOrgID(context.Background(), userID) - lbls := labels.Labels{{Name: labels.MetricName, Value: "foo"}} - req, _, expectedResponse, _ := mockWriteRequest(t, lbls, 123000, 456) - _, err = i.v2Push(ctx, req) - require.NoError(t, err) - - // Create a GRPC server used to query back the data. - serv := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor)) - defer serv.GracefulStop() - client.RegisterIngesterServer(serv, i) - - listener, err := net.Listen("tcp", "localhost:0") - require.NoError(t, err) - - go func() { - require.NoError(t, serv.Serve(listener)) - }() - - // Query back the series using GRPC streaming. 
diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go
index 66e6cd4917..32c30e86af 100644
--- a/pkg/ingester/ingester_v2_test.go
+++ b/pkg/ingester/ingester_v2_test.go
@@ -1284,73 +1284,15 @@ func createIngesterWithSeries(t testing.TB, userID string, numSeries int, timest
 	return i
 }

-func TestIngester_v2QueryStreamSamples(t *testing.T) {
+func TestIngester_v2QueryStream(t *testing.T) {
 	// Create ingester.
-	i, err := prepareIngesterWithBlocksStorage(t, defaultIngesterTestConfig(), nil)
-	require.NoError(t, err)
-	require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
-	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
-
-	// Wait until it's ACTIVE.
-	test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
-		return i.lifecycler.GetState()
-	})
-
-	// Push series.
-	ctx := user.InjectOrgID(context.Background(), userID)
-	lbls := labels.Labels{{Name: labels.MetricName, Value: "foo"}}
-	req, _, expectedResponse, _ := mockWriteRequest(t, lbls, 123000, 456)
-	_, err = i.v2Push(ctx, req)
-	require.NoError(t, err)
-
-	// Create a GRPC server used to query back the data.
-	serv := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor))
-	defer serv.GracefulStop()
-	client.RegisterIngesterServer(serv, i)
-
-	listener, err := net.Listen("tcp", "localhost:0")
-	require.NoError(t, err)
-
-	go func() {
-		require.NoError(t, serv.Serve(listener))
-	}()
-
-	// Query back the series using GRPC streaming.
-	c, err := client.MakeIngesterClient(listener.Addr().String(), defaultClientTestConfig())
-	require.NoError(t, err)
-	defer c.Close()
-
-	s, err := c.QueryStream(ctx, &client.QueryRequest{
-		StartTimestampMs: 0,
-		EndTimestampMs:   200000,
-		Matchers: []*client.LabelMatcher{{
-			Type:  client.EQUAL,
-			Name:  model.MetricNameLabel,
-			Value: "foo",
-		}},
-	})
-	require.NoError(t, err)
+	cfg := defaultIngesterTestConfig()

-	count := 0
-	var lastResp *client.QueryStreamResponse
-	for {
-		resp, err := s.Recv()
-		if err == io.EOF {
-			break
-		}
-		require.NoError(t, err)
-		require.Zero(t, len(resp.Chunkseries))
-		count += len(resp.Timeseries)
-		lastResp = resp
+	// Change the stream type at runtime.
+	var streamType QueryStreamType
+	cfg.StreamTypeFn = func() QueryStreamType {
+		return streamType
 	}
-	require.Equal(t, 1, count)
-	require.Equal(t, expectedResponse, lastResp)
-}
-
-func TestIngester_v2QueryStreamChunks(t *testing.T) {
-	// Create ingester.
-	cfg := defaultIngesterTestConfig()
-	cfg.StreamChunksWhenUsingBlocks = true

 	i, err := prepareIngesterWithBlocksStorage(t, cfg, nil)
 	require.NoError(t, err)
@@ -1365,7 +1307,7 @@ func TestIngester_v2QueryStreamChunks(t *testing.T) {
 	// Push series.
 	ctx := user.InjectOrgID(context.Background(), userID)
 	lbls := labels.Labels{{Name: labels.MetricName, Value: "foo"}}
-	req, _, _, expectedResponse := mockWriteRequest(t, lbls, 123000, 456)
+	req, _, expectedResponseSamples, expectedResponseChunks := mockWriteRequest(t, lbls, 123000, 456)
 	_, err = i.v2Push(ctx, req)
 	require.NoError(t, err)

@@ -1386,7 +1328,7 @@ func TestIngester_v2QueryStreamChunks(t *testing.T) {
 	require.NoError(t, err)
 	defer c.Close()

-	s, err := c.QueryStream(ctx, &client.QueryRequest{
+	queryRequest := &client.QueryRequest{
 		StartTimestampMs: 0,
 		EndTimestampMs:   200000,
 		Matchers: []*client.LabelMatcher{{
@@ -1394,23 +1336,56 @@ func TestIngester_v2QueryStreamChunks(t *testing.T) {
 			Name:  model.MetricNameLabel,
 			Value: "foo",
 		}},
-	})
-	require.NoError(t, err)
+	}

-	count := 0
-	var lastResp *client.QueryStreamResponse
-	for {
-		resp, err := s.Recv()
-		if err == io.EOF {
-			break
+	samplesTest := func(t *testing.T) {
+		s, err := c.QueryStream(ctx, queryRequest)
+		require.NoError(t, err)
+
+		count := 0
+		var lastResp *client.QueryStreamResponse
+		for {
+			resp, err := s.Recv()
+			if err == io.EOF {
+				break
+			}
+			require.NoError(t, err)
+			require.Zero(t, len(resp.Chunkseries)) // No chunks expected
+			count += len(resp.Timeseries)
+			lastResp = resp
 		}
+		require.Equal(t, 1, count)
+		require.Equal(t, expectedResponseSamples, lastResp)
+	}
+
+	chunksTest := func(t *testing.T) {
+		s, err := c.QueryStream(ctx, queryRequest)
 		require.NoError(t, err)
-		require.Zero(t, len(resp.Timeseries))
-		count += len(resp.Chunkseries)
-		lastResp = resp
+
+		count := 0
+		var lastResp *client.QueryStreamResponse
+		for {
+			resp, err := s.Recv()
+			if err == io.EOF {
+				break
+			}
+			require.NoError(t, err)
+			require.Zero(t, len(resp.Timeseries)) // No samples expected
+			count += len(resp.Chunkseries)
+			lastResp = resp
+		}
+		require.Equal(t, 1, count)
+		require.Equal(t, expectedResponseChunks, lastResp)
 	}
-	require.Equal(t, 1, count)
-	require.Equal(t, expectedResponse, lastResp)
+
+	streamType = QueryStreamDefault
+	t.Run("default", samplesTest)
+
+	streamType = QueryStreamSamples
+	t.Run("samples", samplesTest)
+
+	streamType = QueryStreamChunks
+	t.Run("chunks", chunksTest)
 }

 func TestIngester_v2QueryStreamManySamples(t *testing.T) {
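The refactored test works because `StreamTypeFn` is consulted on every request, so a single running ingester can be driven through all three modes by mutating one captured variable between subtests. The pattern in isolation (a sketch, not the actual test code):

```go
package main

import "fmt"

type QueryStreamType int

const (
	QueryStreamDefault QueryStreamType = iota
	QueryStreamSamples
	QueryStreamChunks
)

type config struct {
	// Called per query-stream request, so changing the captured variable
	// below re-configures behaviour without restarting anything.
	StreamTypeFn func() QueryStreamType
}

func main() {
	var streamType QueryStreamType

	cfg := config{StreamTypeFn: func() QueryStreamType { return streamType }}

	for _, mode := range []QueryStreamType{QueryStreamDefault, QueryStreamSamples, QueryStreamChunks} {
		streamType = mode // what each t.Run("default"/"samples"/"chunks", ...) does in the test
		fmt.Println(cfg.StreamTypeFn())
	}
}
```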
From 4ca0d0e0af280f3e94fc65da84a5c98e8f698f81 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= <peter.stibrany@grafana.com>
Date: Mon, 1 Mar 2021 21:20:04 +0100
Subject: [PATCH 09/12] CHANGELOG.md, v1-guarantees.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
---
 CHANGELOG.md                        | 1 +
 docs/configuration/v1-guarantees.md | 4 ++++
 2 files changed, 5 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 29a2b46167..cedb4c9191 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -88,6 +88,7 @@
 * [ENHANCEMENT] Store-gateway: added metrics to chunk buffer pool behaviour. #3880
   * `cortex_bucket_store_chunk_pool_requested_bytes_total`
   * `cortex_bucket_store_chunk_pool_returned_bytes_total`
+* [ENHANCEMENT] Blocks storage: the ingester can now stream entire chunks instead of individual samples to the querier. At the moment this feature must be explicitly enabled, either with the `-ingester.stream-chunks-when-using-blocks` CLI flag or with the `ingester_stream_chunks_when_using_blocks` (boolean) field in the runtime config file; these configuration options are temporary and will be removed once the feature is stable. #3889
 * [BUGFIX] Cortex: Fixed issue where fatal errors and various log messages where not logged. #3778
 * [BUGFIX] HA Tracker: don't track as error in the `cortex_kv_request_duration_seconds` metric a CAS operation intentionally aborted. #3745
 * [BUGFIX] Querier / ruler: do not log "error removing stale clients" if the ring is empty. #3761
diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md
index 93bd0f0358..992c67c98a 100644
--- a/docs/configuration/v1-guarantees.md
+++ b/docs/configuration/v1-guarantees.md
@@ -72,3 +72,7 @@ Currently experimental features are:
 - HA Tracker: cleanup of old replicas from KV Store.
 - Ruler storage: backend client configuration options using a config fields similar to the blocks storage backend clients.
 - Alertmanager storage: backend client configuration options using a config fields similar to the blocks storage backend clients.
+- Ruler storage: backend client configuration options using config fields similar to the TSDB object storage clients.
+- Flags for configuring whether the blocks ingester streams samples or chunks are temporary, and will be removed once the feature is tested:
+  - `-ingester.stream-chunks-when-using-blocks` CLI flag
+  - `ingester_stream_chunks_when_using_blocks` (boolean) field in runtime config file
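For context on the CHANGELOG entry above: XOR-encoded chunks are typically much smaller on the wire than raw samples, which is the point of streaming chunks. A rough, standalone illustration using the Prometheus `chunkenc` package (not code from this PR; actual sizes depend on the data):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	c := chunkenc.NewXORChunk()
	app, err := c.Appender()
	if err != nil {
		panic(err)
	}

	const n = 480 // two hours of samples at a 15s scrape interval
	for i := int64(0); i < n; i++ {
		app.Append(i*15000, float64(100+i%5)) // timestamp in ms, slowly varying value
	}

	// A raw sample costs at least an int64 timestamp plus a float64 value.
	fmt.Printf("raw samples: %d bytes, XOR chunk: %d bytes\n", n*16, len(c.Bytes()))
}
```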
From abf32bda16c44cecee2cc42aefc6de266518bfd8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= <peter.stibrany@grafana.com>
Date: Mon, 1 Mar 2021 21:30:50 +0100
Subject: [PATCH 10/12] Docs.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
---
 pkg/ingester/ingester.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index d2ddf0adc7..30f51d1401 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -117,7 +117,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.ActiveSeriesMetricsEnabled, "ingester.active-series-metrics-enabled", false, "Enable tracking of active series and export them as metrics.")
 	f.DurationVar(&cfg.ActiveSeriesMetricsUpdatePeriod, "ingester.active-series-metrics-update-period", 1*time.Minute, "How often to update active series metrics.")
 	f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.")
-	f.BoolVar(&cfg.StreamChunksWhenUsingBlocks, "ingester.stream-chunks-when-using-blocks", false, "Stream chunks when using blocks. This is experimental feature and not yet tested. Once ready, it will be made default.")
+	f.BoolVar(&cfg.StreamChunksWhenUsingBlocks, "ingester.stream-chunks-when-using-blocks", false, "Stream chunks when using blocks. This is an experimental feature that is not yet tested. Once ready, it will be made the default and this config option will be removed.")
 }

 // Ingester deals with "in flight" chunks. Based on Prometheus 1.x
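The flag touched above is registered through Go's standard `flag` package, so the temporary switch parses like any other boolean option. A minimal sketch of registering and reading it (standalone, not the real `RegisterFlags`):

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	fs := flag.NewFlagSet("ingester", flag.ExitOnError)
	streamChunks := fs.Bool("ingester.stream-chunks-when-using-blocks", false,
		"Stream chunks when using blocks. This is an experimental feature that is not yet tested.")

	// Boolean flags accept the -name=true form.
	if err := fs.Parse([]string{"-ingester.stream-chunks-when-using-blocks=true"}); err != nil {
		panic(err)
	}
	fmt.Println(*streamChunks) // true
}
```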
From 4e3e8c66bad952d01e4ca73e4b7bdda98bea778c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= <peter.stibrany@grafana.com>
Date: Tue, 2 Mar 2021 07:59:10 +0100
Subject: [PATCH 11/12] Clean white space.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
---
 docs/configuration/v1-guarantees.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md
index 992c67c98a..ca73229d10 100644
--- a/docs/configuration/v1-guarantees.md
+++ b/docs/configuration/v1-guarantees.md
@@ -74,5 +74,5 @@ Currently experimental features are:
 - Alertmanager storage: backend client configuration options using a config fields similar to the blocks storage backend clients.
 - Ruler storage: backend client configuration options using config fields similar to the TSDB object storage clients.
 - Flags for configuring whether the blocks ingester streams samples or chunks are temporary, and will be removed once the feature is tested:
-  - `-ingester.stream-chunks-when-using-blocks` CLI flag 
+  - `-ingester.stream-chunks-when-using-blocks` CLI flag
   - `ingester_stream_chunks_when_using_blocks` (boolean) field in runtime config file
From 1243d2f2148e99e2103f6dca2d9d283e9df03bed Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= <peter.stibrany@grafana.com>
Date: Thu, 4 Mar 2021 08:23:41 +0100
Subject: [PATCH 12/12] Review feedback.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
---
 pkg/chunk/encoding/chunk_test.go       |  8 ++++---
 pkg/chunk/encoding/prometheus_chunk.go | 33 ++++----------------------
 2 files changed, 10 insertions(+), 31 deletions(-)

diff --git a/pkg/chunk/encoding/chunk_test.go b/pkg/chunk/encoding/chunk_test.go
index ecd00c327d..d2d00f5f54 100644
--- a/pkg/chunk/encoding/chunk_test.go
+++ b/pkg/chunk/encoding/chunk_test.go
@@ -88,9 +88,11 @@ func TestChunk(t *testing.T) {
 				testChunkBatch(t, tc.encoding, samples)
 			})

-			t.Run(fmt.Sprintf("testChunkRebound/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
-				testChunkRebound(t, tc.encoding, samples)
-			})
+			if tc.encoding != PrometheusXorChunk {
+				t.Run(fmt.Sprintf("testChunkRebound/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
+					testChunkRebound(t, tc.encoding, samples)
+				})
+			}
 		}
 	}
 }
diff --git a/pkg/chunk/encoding/prometheus_chunk.go b/pkg/chunk/encoding/prometheus_chunk.go
index dff0c044c0..00ccb44dfb 100644
--- a/pkg/chunk/encoding/prometheus_chunk.go
+++ b/pkg/chunk/encoding/prometheus_chunk.go
@@ -8,8 +8,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 )

-// Wrapper around Prometheus chunk. While it supports adding more samples, that is only implemented
-// to make tests work, and should not be used in production.
+// Wrapper around Prometheus chunk.
 type prometheusXorChunk struct {
 	chunk chunkenc.Chunk
 }
@@ -18,6 +17,9 @@ func newPrometheusXorChunk() *prometheusXorChunk {
 	return &prometheusXorChunk{}
 }

+// Add adds another sample to the chunk. While Add works, it is only implemented
+// to make tests work, and should not be used in production. In particular, it appends
+// all samples to a single chunk, and uses a new Appender for each Add.
 func (p *prometheusXorChunk) Add(m model.SamplePair) (Chunk, error) {
 	if p.chunk == nil {
 		p.chunk = chunkenc.NewXORChunk()
@@ -78,32 +80,7 @@ func (p *prometheusXorChunk) Slice(_, _ model.Time) Chunk {
 }

 func (p *prometheusXorChunk) Rebound(from, to model.Time) (Chunk, error) {
-	if p.chunk == nil {
-		return p, nil
-	}
-
-	nc := chunkenc.NewXORChunk()
-	app, err := nc.Appender()
-	if err != nil {
-		return nil, err
-	}
-
-	it := p.chunk.Iterator(nil)
-	for ok := it.Seek(int64(from)); ok; ok = it.Next() {
-		t, v := it.At()
-		if t <= int64(to) {
-			app.Append(t, v)
-		} else {
-			break
-		}
-	}
-
-	nc.Compact()
-	if nc.NumSamples() == 0 {
-		return nil, ErrSliceNoDataInRange
-	}
-
-	return &prometheusXorChunk{chunk: nc}, nil
+	return nil, errors.New("rebound not supported by PrometheusXorChunk")
 }

 func (p *prometheusXorChunk) Len() int {