Stream chunks from blocks-ingester to querier (cortexproject#3889)
Signed-off-by: Peter Štibraný <[email protected]>
pstibrany authored and harry671003 committed Mar 11, 2021
1 parent de6b5f7 commit c333577
Showing 16 changed files with 604 additions and 72 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -89,6 +89,7 @@
* `cortex_bucket_store_chunk_pool_requested_bytes_total`
* `cortex_bucket_store_chunk_pool_returned_bytes_total`
* [ENHANCEMENT] Alertmanager: load alertmanager configurations from object storage concurrently, and only load necessary configurations, speeding up the configuration synchronization process and executing fewer "GET object" operations against the storage when sharding is enabled. #3898
* [ENHANCEMENT] Blocks storage: Ingester can now stream entire chunks instead of individual samples to the querier. At the moment this feature must be explicitly enabled with the `-ingester.stream-chunks-when-using-blocks` flag or the `ingester_stream_chunks_when_using_blocks` (boolean) field in the runtime config file; these configuration options are temporary and will be removed once the feature is stable. #3889
* [BUGFIX] Cortex: Fixed an issue where fatal errors and various log messages were not logged. #3778
* [BUGFIX] HA Tracker: don't track an intentionally aborted CAS operation as an error in the `cortex_kv_request_duration_seconds` metric. #3745
* [BUGFIX] Querier / ruler: do not log "error removing stale clients" if the ring is empty. #3761
4 changes: 4 additions & 0 deletions docs/configuration/v1-guarantees.md
@@ -72,3 +72,7 @@ Currently experimental features are:
- HA Tracker: cleanup of old replicas from KV Store.
- Ruler storage: backend client configuration options using config fields similar to the blocks storage backend clients.
- Alertmanager storage: backend client configuration options using config fields similar to the blocks storage backend clients.
- Ruler storage: backend client configuration options using config fields similar to the TSDB object storage clients.
- Flags for configuring whether the blocks ingester streams samples or chunks are temporary, and will be removed once the feature is tested (see the sketch after this list):
- `-ingester.stream-chunks-when-using-blocks` CLI flag
- `ingester_stream_chunks_when_using_blocks` (boolean) field in runtime config file
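For illustration (a sketch, not part of the guarantees list), enabling the feature through the runtime configuration file would look like this; the runtime config manager reloads the file periodically, so no ingester restart is needed:

# Sketch of a runtime config file entry. The field name matches the
# `ingester_stream_chunks_when_using_blocks` field documented above.
ingester_stream_chunks_when_using_blocks: true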
10 changes: 10 additions & 0 deletions integration/querier_streaming_mixed_ingester_test.go
@@ -5,6 +5,7 @@ package integration
import (
	"context"
	"flag"
	"fmt"
	"strings"
	"testing"
	"time"
@@ -21,6 +22,14 @@ import (
)

func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) {
	for _, streamChunks := range []bool{false, true} {
		t.Run(fmt.Sprintf("%v", streamChunks), func(t *testing.T) {
			testQuerierWithStreamingBlocksAndChunksIngesters(t, streamChunks)
		})
	}
}

func testQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T, streamChunks bool) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()
@@ -33,6 +42,7 @@ func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) {
		"-store-gateway.sharding-enabled": "false",
		"-querier.ingester-streaming":     "true",
	})
	blockFlags["-ingester.stream-chunks-when-using-blocks"] = fmt.Sprintf("%v", streamChunks)

	// Start dependencies.
	consul := e2edb.NewConsul()
11 changes: 7 additions & 4 deletions pkg/chunk/encoding/chunk_test.go
@@ -29,7 +29,7 @@ import (

func TestLen(t *testing.T) {
	chunks := []Chunk{}
	for _, encoding := range []Encoding{DoubleDelta, Varbit, Bigchunk, PrometheusXorChunk} {
		c, err := NewForEncoding(encoding)
		if err != nil {
			t.Fatal(err)
		}
@@ -63,6 +63,7 @@ func TestChunk(t *testing.T) {
		{DoubleDelta, 989},
		{Varbit, 2048},
		{Bigchunk, 4096},
		{PrometheusXorChunk, 2048},
	} {
		for samples := tc.maxSamples / 10; samples < tc.maxSamples; samples += tc.maxSamples / 10 {

@@ -87,9 +88,11 @@ func TestChunk(t *testing.T) {
				testChunkBatch(t, tc.encoding, samples)
			})

			// PrometheusXorChunk does not implement Rebound, so skip that subtest for it.
			if tc.encoding != PrometheusXorChunk {
				t.Run(fmt.Sprintf("testChunkRebound/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
					testChunkRebound(t, tc.encoding, samples)
				})
			}
		}
	}
}
8 changes: 8 additions & 0 deletions pkg/chunk/encoding/factory.go
@@ -52,6 +52,8 @@
	Varbit
	// Bigchunk encoding
	Bigchunk
	// PrometheusXorChunk is a wrapper around a Prometheus XOR-encoded chunk.
	PrometheusXorChunk
)

type encoding struct {
@@ -78,6 +80,12 @@ var encodings = map[Encoding]encoding{
			return newBigchunk()
		},
	},
	PrometheusXorChunk: {
		Name: "PrometheusXorChunk",
		New: func() Chunk {
			return newPrometheusXorChunk()
		},
	},
}

// Set implements flag.Value.
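As a quick illustration of the lookup above, a hypothetical in-package test (not part of this commit; it assumes the usual testing and require imports) constructs the new chunk through the factory:

func TestNewPrometheusXorChunkViaFactory(t *testing.T) {
	// NewForEncoding resolves the encoding through the encodings map shown above.
	c, err := NewForEncoding(PrometheusXorChunk)
	require.NoError(t, err)
	require.Equal(t, PrometheusXorChunk, c.Encoding())
	require.Equal(t, 0, c.Len()) // no underlying Prometheus chunk until samples are added
}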
151 changes: 151 additions & 0 deletions pkg/chunk/encoding/prometheus_chunk.go
@@ -0,0 +1,151 @@
package encoding

import (
	"io"

	"github.com/pkg/errors"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// Wrapper around a Prometheus chunk.
type prometheusXorChunk struct {
	chunk chunkenc.Chunk
}

func newPrometheusXorChunk() *prometheusXorChunk {
	return &prometheusXorChunk{}
}
// Add adds another sample to the chunk. While Add works, it is only implemented
// to make tests work, and should not be used in production. In particular, it appends
// all samples to a single chunk, and uses a new Appender for each Add.
func (p *prometheusXorChunk) Add(m model.SamplePair) (Chunk, error) {
	if p.chunk == nil {
		p.chunk = chunkenc.NewXORChunk()
	}

	app, err := p.chunk.Appender()
	if err != nil {
		return nil, err
	}

	app.Append(int64(m.Timestamp), float64(m.Value))
	return nil, nil
}

func (p *prometheusXorChunk) NewIterator(iterator Iterator) Iterator {
	if p.chunk == nil {
		return errorIterator("Prometheus chunk is not set")
	}

	if pit, ok := iterator.(*prometheusChunkIterator); ok {
		pit.c = p.chunk
		pit.it = p.chunk.Iterator(pit.it)
		return pit
	}

	return &prometheusChunkIterator{c: p.chunk, it: p.chunk.Iterator(nil)}
}

func (p *prometheusXorChunk) Marshal(i io.Writer) error {
	if p.chunk == nil {
		return errors.New("chunk data not set")
	}
	_, err := i.Write(p.chunk.Bytes())
	return err
}

func (p *prometheusXorChunk) UnmarshalFromBuf(bytes []byte) error {
	c, err := chunkenc.FromData(chunkenc.EncXOR, bytes)
	if err != nil {
		return errors.Wrap(err, "failed to create Prometheus chunk from bytes")
	}

	p.chunk = c
	return nil
}

func (p *prometheusXorChunk) Encoding() Encoding {
	return PrometheusXorChunk
}

func (p *prometheusXorChunk) Utilization() float64 {
	// Used for reporting when the chunk is used to store new data.
	return 0
}

func (p *prometheusXorChunk) Slice(_, _ model.Time) Chunk {
	return p
}

func (p *prometheusXorChunk) Rebound(from, to model.Time) (Chunk, error) {
	return nil, errors.New("Rebound not supported by PrometheusXorChunk")
}

func (p *prometheusXorChunk) Len() int {
	if p.chunk == nil {
		return 0
	}
	return p.chunk.NumSamples()
}

func (p *prometheusXorChunk) Size() int {
	if p.chunk == nil {
		return 0
	}
	return len(p.chunk.Bytes())
}

type prometheusChunkIterator struct {
	c  chunkenc.Chunk // we need the chunk, because FindAtOrAfter needs to start with a fresh iterator.
	it chunkenc.Iterator
}

func (p *prometheusChunkIterator) Scan() bool {
	return p.it.Next()
}

func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) bool {
	// FindAtOrAfter must return the OLDEST value at the given time. That means we need to
	// start with a fresh iterator, otherwise we cannot guarantee OLDEST.
	p.it = p.c.Iterator(p.it)
	return p.it.Seek(int64(time))
}

func (p *prometheusChunkIterator) Value() model.SamplePair {
	ts, val := p.it.At()
	return model.SamplePair{
		Timestamp: model.Time(ts),
		Value:     model.SampleValue(val),
	}
}

func (p *prometheusChunkIterator) Batch(size int) Batch {
	var batch Batch
	j := 0
	for j < size {
		t, v := p.it.At()
		batch.Timestamps[j] = t
		batch.Values[j] = v
		j++
		if j < size && !p.it.Next() {
			break
		}
	}
	batch.Index = 0
	batch.Length = j
	return batch
}

func (p *prometheusChunkIterator) Err() error {
	return p.it.Err()
}

type errorIterator string

func (e errorIterator) Scan() bool                         { return false }
func (e errorIterator) FindAtOrAfter(time model.Time) bool { return false }
func (e errorIterator) Value() model.SamplePair            { panic("no values") }
func (e errorIterator) Batch(size int) Batch               { panic("no values") }
func (e errorIterator) Err() error                         { return errors.New(string(e)) }
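To make the wrapper's lifecycle concrete, here is a hypothetical in-package round-trip sketch (not part of this commit; it assumes the usual bytes, testing, model, and require imports): samples go in via Add, the raw XOR bytes come out via Marshal, and UnmarshalFromBuf rebuilds an equivalent chunk.

func TestPrometheusXorChunkRoundTrip(t *testing.T) {
	c := newPrometheusXorChunk()
	for i := 0; i < 100; i++ {
		// Add returns (nil, nil) on success; the overflow chunk is never used here.
		_, err := c.Add(model.SamplePair{Timestamp: model.Time(i * 1000), Value: model.SampleValue(i)})
		require.NoError(t, err)
	}

	// Marshal writes the chunk's raw XOR-encoded bytes.
	var buf bytes.Buffer
	require.NoError(t, c.Marshal(&buf))

	// UnmarshalFromBuf rebuilds a chunk from those bytes.
	decoded := newPrometheusXorChunk()
	require.NoError(t, decoded.UnmarshalFromBuf(buf.Bytes()))
	require.Equal(t, 100, decoded.Len())

	// The first sample survives the round trip.
	it := decoded.NewIterator(nil)
	require.True(t, it.Scan())
	require.Equal(t, model.Time(0), it.Value().Timestamp)
	require.NoError(t, it.Err())
}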
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
@@ -415,6 +415,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
	t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
	t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy
	t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
	t.Cfg.Ingester.StreamTypeFn = ingesterChunkStreaming(t.RuntimeConfig)
	t.tsdbIngesterConfig()

	t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Overrides, t.Store, prometheus.DefaultRegisterer, util_log.Logger)
26 changes: 26 additions & 0 deletions pkg/cortex/runtime_config.go
@@ -7,6 +7,7 @@ import (

	"gopkg.in/yaml.v2"

	"github.com/cortexproject/cortex/pkg/ingester"
	"github.com/cortexproject/cortex/pkg/ring/kv"
	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
@@ -24,6 +25,8 @@ type runtimeConfigValues struct {
	TenantLimits map[string]*validation.Limits `yaml:"overrides"`

	Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"`

	IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"`
}

// runtimeConfigTenantLimits provides per-tenant limit overrides based on a runtimeconfig.Manager
@@ -98,6 +101,29 @@ func multiClientRuntimeConfigChannel(manager *runtimeconfig.Manager) func() <-ch
		return outCh
	}
}

func ingesterChunkStreaming(manager *runtimeconfig.Manager) func() ingester.QueryStreamType {
	if manager == nil {
		return nil
	}

	return func() ingester.QueryStreamType {
		val := manager.GetConfig()
		if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil {
			if cfg.IngesterChunkStreaming == nil {
				return ingester.QueryStreamDefault
			}

			if *cfg.IngesterChunkStreaming {
				return ingester.QueryStreamChunks
			}
			return ingester.QueryStreamSamples
		}

		return ingester.QueryStreamDefault
	}
}

func runtimeConfigHandler(runtimeCfgManager *runtimeconfig.Manager, defaultLimits validation.Limits) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		cfg, ok := runtimeCfgManager.GetConfig().(*runtimeConfigValues)
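A hypothetical sketch (not part of this commit; it assumes the usual testing and require imports) of how the YAML field feeds the override above: unmarshalling a runtime config document populates IngesterChunkStreaming, which ingesterChunkStreaming then maps onto a QueryStreamType.

func TestIngesterChunkStreamingField(t *testing.T) {
	// Absent field: the pointer stays nil, which ingesterChunkStreaming maps to QueryStreamDefault.
	var cfg runtimeConfigValues
	require.NoError(t, yaml.Unmarshal([]byte("{}"), &cfg))
	require.Nil(t, cfg.IngesterChunkStreaming)

	// Present field: true maps to ingester.QueryStreamChunks, false to ingester.QueryStreamSamples.
	require.NoError(t, yaml.Unmarshal([]byte("ingester_stream_chunks_when_using_blocks: true"), &cfg))
	require.NotNil(t, cfg.IngesterChunkStreaming)
	require.True(t, *cfg.IngesterChunkStreaming)
}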
8 changes: 6 additions & 2 deletions pkg/ingester/ingester.go
@@ -79,8 +79,11 @@ type Config struct {
	ActiveSeriesMetricsIdleTimeout time.Duration `yaml:"active_series_metrics_idle_timeout"`

	// Use blocks storage.
	BlocksStorageEnabled        bool                     `yaml:"-"`
	BlocksStorageConfig         tsdb.BlocksStorageConfig `yaml:"-"`
	StreamChunksWhenUsingBlocks bool                     `yaml:"-"`
	// Runtime-override for the type of streaming query to use (chunks or samples).
	StreamTypeFn func() QueryStreamType `yaml:"-"`

	// Injected at runtime and read from the distributor config, required
	// to accurately apply global limits.
@@ -114,6 +117,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.ActiveSeriesMetricsEnabled, "ingester.active-series-metrics-enabled", false, "Enable tracking of active series and export them as metrics.")
	f.DurationVar(&cfg.ActiveSeriesMetricsUpdatePeriod, "ingester.active-series-metrics-update-period", 1*time.Minute, "How often to update active series metrics.")
	f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.")
	f.BoolVar(&cfg.StreamChunksWhenUsingBlocks, "ingester.stream-chunks-when-using-blocks", false, "Stream chunks when using blocks. This is an experimental feature and not yet tested. Once ready, it will be made the default and this config option will be removed.")
}

// Ingester deals with "in flight" chunks. Based on Prometheus 1.x
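This excerpt does not show how the ingester combines the two knobs. A hypothetical helper (not code from this commit) illustrating the intended precedence, where the runtime override wins when set and the CLI flag applies otherwise, could look like:

func (cfg *Config) streamChunks() bool {
	// StreamTypeFn is the runtime-config override wired up in modules.go.
	if cfg.StreamTypeFn != nil {
		switch cfg.StreamTypeFn() {
		case QueryStreamChunks:
			return true
		case QueryStreamSamples:
			return false
		}
		// QueryStreamDefault falls through to the CLI flag below.
	}
	return cfg.StreamChunksWhenUsingBlocks
}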
