Stream chunks from blocks-ingester to querier #3889

Merged · 13 commits · Mar 4, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -89,6 +89,7 @@
* `cortex_bucket_store_chunk_pool_requested_bytes_total`
* `cortex_bucket_store_chunk_pool_returned_bytes_total`
* [ENHANCEMENT] Alertmanager: load alertmanager configurations from object storage concurrently, and only load necessary configurations, speeding up the configuration synchronization process and executing fewer "GET object" operations against the storage when sharding is enabled. #3898
* [ENHANCEMENT] Blocks storage: Ingester can now stream entire chunks instead of individual samples to the querier. At the moment this feature must be explicitly enabled, either with the `-ingester.stream-chunks-when-using-blocks` flag or the `ingester_stream_chunks_when_using_blocks` (boolean) field in the runtime config file; these configuration options are temporary and will be removed once the feature is stable. #3889
* [BUGFIX] Cortex: Fixed issue where fatal errors and various log messages were not logged. #3778
* [BUGFIX] HA Tracker: don't track as error in the `cortex_kv_request_duration_seconds` metric a CAS operation intentionally aborted. #3745
* [BUGFIX] Querier / ruler: do not log "error removing stale clients" if the ring is empty. #3761
4 changes: 4 additions & 0 deletions docs/configuration/v1-guarantees.md
@@ -72,3 +72,7 @@ Currently experimental features are:
- HA Tracker: cleanup of old replicas from KV Store.
- Ruler storage: backend client configuration options using config fields similar to the blocks storage backend clients.
- Alertmanager storage: backend client configuration options using config fields similar to the blocks storage backend clients.
- Ruler storage: backend client configuration options using config fields similar to the TSDB object storage clients.
- Flags for configuring whether the blocks-ingester streams samples or chunks are temporary, and will be removed once the feature is tested:
- `-ingester.stream-chunks-when-using-blocks` CLI flag
- `ingester_stream_chunks_when_using_blocks` (boolean) field in the runtime config file (a sketch of such a file follows below)
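
For illustration only, a minimal runtime config file enabling the feature could look like the sketch below. The `ingester_stream_chunks_when_using_blocks` field name and its top-level placement come from this PR; the `overrides` section and the tenant limit shown are assumptions added purely for context.

# hypothetical-runtime-config.yaml, reloaded periodically by Cortex
overrides:
  tenant-a:
    ingestion_rate: 10000   # assumed per-tenant limit, shown for context only
ingester_stream_chunks_when_using_blocks: true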
10 changes: 10 additions & 0 deletions integration/querier_streaming_mixed_ingester_test.go
@@ -5,6 +5,7 @@ package integration
import (
"context"
"flag"
"fmt"
"strings"
"testing"
"time"
@@ -21,6 +22,14 @@ import (
)

func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) {
for _, streamChunks := range []bool{false, true} {
t.Run(fmt.Sprintf("%v", streamChunks), func(t *testing.T) {
testQuerierWithStreamingBlocksAndChunksIngesters(t, streamChunks)
})
}
}

func testQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T, streamChunks bool) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()
@@ -33,6 +42,7 @@ func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) {
"-store-gateway.sharding-enabled": "false",
"-querier.ingester-streaming": "true",
})
blockFlags["-ingester.stream-chunks-when-using-blocks"] = fmt.Sprintf("%v", streamChunks)

// Start dependencies.
consul := e2edb.NewConsul()
11 changes: 7 additions & 4 deletions pkg/chunk/encoding/chunk_test.go
@@ -29,7 +29,7 @@ import (

func TestLen(t *testing.T) {
chunks := []Chunk{}
for _, encoding := range []Encoding{DoubleDelta, Varbit, Bigchunk} {
for _, encoding := range []Encoding{DoubleDelta, Varbit, Bigchunk, PrometheusXorChunk} {
c, err := NewForEncoding(encoding)
if err != nil {
t.Fatal(err)
@@ -63,6 +63,7 @@ func TestChunk(t *testing.T) {
{DoubleDelta, 989},
{Varbit, 2048},
{Bigchunk, 4096},
{PrometheusXorChunk, 2048},
} {
for samples := tc.maxSamples / 10; samples < tc.maxSamples; samples += tc.maxSamples / 10 {

@@ -87,9 +88,11 @@
testChunkBatch(t, tc.encoding, samples)
})

t.Run(fmt.Sprintf("testChunkRebound/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
testChunkRebound(t, tc.encoding, samples)
})
if tc.encoding != PrometheusXorChunk {
t.Run(fmt.Sprintf("testChunkRebound/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
testChunkRebound(t, tc.encoding, samples)
})
}
}
}
}
8 changes: 8 additions & 0 deletions pkg/chunk/encoding/factory.go
@@ -52,6 +52,8 @@
Varbit
// Bigchunk encoding
Bigchunk
// PrometheusXorChunk is a wrapper around Prometheus XOR-encoded chunk.
PrometheusXorChunk
)

type encoding struct {
@@ -78,6 +80,12 @@ var encodings = map[Encoding]encoding{
return newBigchunk()
},
},
PrometheusXorChunk: {
Name: "PrometheusXorChunk",
New: func() Chunk {
return newPrometheusXorChunk()
},
},
}

// Set implements flag.Value.
151 changes: 151 additions & 0 deletions pkg/chunk/encoding/prometheus_chunk.go
@@ -0,0 +1,151 @@
package encoding

import (
"io"

"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// prometheusXorChunk is a wrapper around a Prometheus XOR-encoded chunk.
type prometheusXorChunk struct {
chunk chunkenc.Chunk
}

func newPrometheusXorChunk() *prometheusXorChunk {
return &prometheusXorChunk{}
}

// Add adds another sample to the chunk. While Add works, it is only implemented
// to make tests work, and should not be used in production. In particular, it appends
// all samples to a single chunk, and uses a new Appender for each Add.
func (p *prometheusXorChunk) Add(m model.SamplePair) (Chunk, error) {
if p.chunk == nil {
p.chunk = chunkenc.NewXORChunk()
}

app, err := p.chunk.Appender()
if err != nil {
return nil, err
}

app.Append(int64(m.Timestamp), float64(m.Value))
return nil, nil
}

func (p *prometheusXorChunk) NewIterator(iterator Iterator) Iterator {
if p.chunk == nil {
return errorIterator("Prometheus chunk is not set")
}

if pit, ok := iterator.(*prometheusChunkIterator); ok {
pit.c = p.chunk
pit.it = p.chunk.Iterator(pit.it)
return pit
}

return &prometheusChunkIterator{c: p.chunk, it: p.chunk.Iterator(nil)}
}

func (p *prometheusXorChunk) Marshal(i io.Writer) error {
if p.chunk == nil {
return errors.New("chunk data not set")
}
_, err := i.Write(p.chunk.Bytes())
return err
}

func (p *prometheusXorChunk) UnmarshalFromBuf(bytes []byte) error {
c, err := chunkenc.FromData(chunkenc.EncXOR, bytes)
if err != nil {
return errors.Wrap(err, "failed to create Prometheus chunk from bytes")
}

p.chunk = c
return nil
}
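
As a usage illustration (not part of this PR's diff), a test-style round trip through Marshal and UnmarshalFromBuf might look as follows; the helper name is hypothetical and a `bytes` import is assumed.

// roundTripPrometheusChunk is a hypothetical sketch: append one sample,
// serialize the chunk, then rebuild it from the raw XOR-encoded bytes.
func roundTripPrometheusChunk() error {
	c := newPrometheusXorChunk()
	if _, err := c.Add(model.SamplePair{Timestamp: 1000, Value: 42}); err != nil {
		return err
	}

	var buf bytes.Buffer
	if err := c.Marshal(&buf); err != nil {
		return err
	}

	// chunkenc.FromData (called inside UnmarshalFromBuf) validates the bytes.
	out := newPrometheusXorChunk()
	if err := out.UnmarshalFromBuf(buf.Bytes()); err != nil {
		return err
	}
	_ = out.Len() // 1 sample survives the round trip
	return nil
}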

func (p *prometheusXorChunk) Encoding() Encoding {
return PrometheusXorChunk
}

func (p *prometheusXorChunk) Utilization() float64 {
// Used for reporting when the chunk is used to store new data.
return 0
}

func (p *prometheusXorChunk) Slice(_, _ model.Time) Chunk {
return p
}

func (p *prometheusXorChunk) Rebound(from, to model.Time) (Chunk, error) {
return nil, errors.New("Rebound not supported by PrometheusXorChunk")
}

func (p *prometheusXorChunk) Len() int {
if p.chunk == nil {
return 0
}
return p.chunk.NumSamples()
}

func (p *prometheusXorChunk) Size() int {
if p.chunk == nil {
return 0
}
return len(p.chunk.Bytes())
}

type prometheusChunkIterator struct {
c chunkenc.Chunk // we need the chunk, because FindAtOrAfter needs to start with a fresh iterator.
it chunkenc.Iterator
}

func (p *prometheusChunkIterator) Scan() bool {
return p.it.Next()
}

func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) bool {
// FindAtOrAfter must return the OLDEST value at the given time. That means we need to start
// with a fresh iterator, otherwise we cannot guarantee OLDEST.
p.it = p.c.Iterator(p.it)
return p.it.Seek(int64(time))
}
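
To make the reset-before-seek behaviour concrete, a hypothetical caller (not in this diff) could look like this, assuming `c` is a populated Chunk:

// Seek to the first sample at or after t. Because FindAtOrAfter rebuilds the
// underlying iterator, this also works after the iterator has advanced past t.
it := c.NewIterator(nil)
if it.FindAtOrAfter(model.Time(1500)) {
	sp := it.Value() // oldest sample with Timestamp >= 1500
	_ = sp
}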

func (p *prometheusChunkIterator) Value() model.SamplePair {
ts, val := p.it.At()
return model.SamplePair{
Timestamp: model.Time(ts),
Value: model.SampleValue(val),
}
}

func (p *prometheusChunkIterator) Batch(size int) Batch {
var batch Batch
j := 0
for j < size {
t, v := p.it.At()
batch.Timestamps[j] = t
batch.Values[j] = v
j++
if j < size && !p.it.Next() {
break
}
}
batch.Index = 0
batch.Length = j
return batch
}
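
A hypothetical consumer loop could drain the iterator in batches as below; BatchSize is assumed to be the encoding package's fixed capacity of the Batch arrays.

// Scan positions the iterator on the next sample; Batch then copies up to
// `size` samples, advancing the iterator internally as it fills.
it := c.NewIterator(nil)
for it.Scan() {
	b := it.Batch(BatchSize) // size must not exceed the arrays' capacity
	for i := 0; i < b.Length; i++ {
		handleSample(b.Timestamps[i], b.Values[i]) // handleSample is a placeholder
	}
}
if err := it.Err(); err != nil {
	// handle iteration error
}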

func (p *prometheusChunkIterator) Err() error {
return p.it.Err()
}

type errorIterator string

func (e errorIterator) Scan() bool { return false }
func (e errorIterator) FindAtOrAfter(time model.Time) bool { return false }
func (e errorIterator) Value() model.SamplePair { panic("no values") }
func (e errorIterator) Batch(size int) Batch { panic("no values") }
func (e errorIterator) Err() error { return errors.New(string(e)) }
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
@@ -415,6 +415,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy
t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
t.Cfg.Ingester.StreamTypeFn = ingesterChunkStreaming(t.RuntimeConfig)
t.tsdbIngesterConfig()

t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Overrides, t.Store, prometheus.DefaultRegisterer, util_log.Logger)
26 changes: 26 additions & 0 deletions pkg/cortex/runtime_config.go
@@ -7,6 +7,7 @@ import (

"gopkg.in/yaml.v2"

"github.com/cortexproject/cortex/pkg/ingester"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
@@ -24,6 +25,8 @@ type runtimeConfigValues struct {
TenantLimits map[string]*validation.Limits `yaml:"overrides"`

Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"`

IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"`
}

// runtimeConfigTenantLimits provides per-tenant limit overrides based on a runtimeconfig.Manager
@@ -98,6 +101,29 @@ func multiClientRuntimeConfigChannel(manager *runtimeconfig.Manager) func() <-ch
return outCh
}
}

func ingesterChunkStreaming(manager *runtimeconfig.Manager) func() ingester.QueryStreamType {
if manager == nil {
return nil
}

return func() ingester.QueryStreamType {
val := manager.GetConfig()
if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil {
if cfg.IngesterChunkStreaming == nil {
return ingester.QueryStreamDefault
}

if *cfg.IngesterChunkStreaming {
return ingester.QueryStreamChunks
}
return ingester.QueryStreamSamples
}

return ingester.QueryStreamDefault
}
}

func runtimeConfigHandler(runtimeCfgManager *runtimeconfig.Manager, defaultLimits validation.Limits) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
cfg, ok := runtimeCfgManager.GetConfig().(*runtimeConfigValues)
8 changes: 6 additions & 2 deletions pkg/ingester/ingester.go
@@ -79,8 +79,11 @@ type Config struct {
ActiveSeriesMetricsIdleTimeout time.Duration `yaml:"active_series_metrics_idle_timeout"`

// Use blocks storage.
BlocksStorageEnabled bool `yaml:"-"`
BlocksStorageConfig tsdb.BlocksStorageConfig `yaml:"-"`
BlocksStorageEnabled bool `yaml:"-"`
BlocksStorageConfig tsdb.BlocksStorageConfig `yaml:"-"`
StreamChunksWhenUsingBlocks bool `yaml:"-"`
// Runtime-override for type of streaming query to use (chunks or samples).
StreamTypeFn func() QueryStreamType `yaml:"-"`

// Injected at runtime and read from the distributor config, required
// to accurately apply global limits.
@@ -114,6 +117,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ActiveSeriesMetricsEnabled, "ingester.active-series-metrics-enabled", false, "Enable tracking of active series and export them as metrics.")
f.DurationVar(&cfg.ActiveSeriesMetricsUpdatePeriod, "ingester.active-series-metrics-update-period", 1*time.Minute, "How often to update active series metrics.")
f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.")
f.BoolVar(&cfg.StreamChunksWhenUsingBlocks, "ingester.stream-chunks-when-using-blocks", false, "Stream chunks when using blocks. This is an experimental feature and is not yet tested. Once ready, it will be made the default and this config option will be removed.")
}
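
How the runtime override and the CLI flag combine is decided in the ingester's query-stream path, which is not shown in this excerpt. The helper below is a hedged sketch of the plausible precedence, using only names that appear in this diff:

// effectiveStreamType is a hypothetical helper: a non-default runtime-config
// value wins; otherwise -ingester.stream-chunks-when-using-blocks decides.
func effectiveStreamType(cfg Config) QueryStreamType {
	if cfg.StreamTypeFn != nil {
		if t := cfg.StreamTypeFn(); t != QueryStreamDefault {
			return t
		}
	}
	if cfg.StreamChunksWhenUsingBlocks {
		return QueryStreamChunks
	}
	return QueryStreamSamples
}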

// Ingester deals with "in flight" chunks. Based on Prometheus 1.x