Skip to content

Commit a9345d0

Browse files
authored
chore(blooms): Clean up settings and make reasonable defaults (#12483)
Signed-off-by: Christian Haudum <[email protected]>
1 parent f9350d6 commit a9345d0

File tree

11 files changed

+196
-170
lines changed

11 files changed

+196
-170
lines changed

docs/sources/configure/_index.md

+138-123
Large diffs are not rendered by default.

pkg/bloomcompactor/bloomcompactor.go

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/grafana/loki/v3/pkg/storage/config"
2323
"github.com/grafana/loki/v3/pkg/storage/stores"
2424
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
25+
utillog "github.com/grafana/loki/v3/pkg/util/log"
2526
util_ring "github.com/grafana/loki/v3/pkg/util/ring"
2627
)
2728

@@ -72,6 +73,7 @@ func New(
7273
logger log.Logger,
7374
r prometheus.Registerer,
7475
) (*Compactor, error) {
76+
utillog.WarnExperimentalUse("Bloom Compactor", logger)
7577
c := &Compactor{
7678
cfg: cfg,
7779
schemaCfg: schemaCfg,

pkg/bloomcompactor/config.go

+4
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
6868
}
6969

7070
func (cfg *Config) Validate() error {
71+
if !cfg.Enabled {
72+
return nil
73+
}
74+
7175
if err := cfg.RetentionConfig.Validate(); err != nil {
7276
return err
7377
}

pkg/bloomgateway/bloomgateway.go

+2
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ import (
6262
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
6363
"github.com/grafana/loki/v3/pkg/util"
6464
"github.com/grafana/loki/v3/pkg/util/constants"
65+
utillog "github.com/grafana/loki/v3/pkg/util/log"
6566
)
6667

6768
var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")
@@ -108,6 +109,7 @@ func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int {
108109

109110
// New returns a new instance of the Bloom Gateway.
110111
func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
112+
utillog.WarnExperimentalUse("Bloom Gateway", logger)
111113
g := &Gateway{
112114
cfg: cfg,
113115
logger: logger,

pkg/bloomgateway/config.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,27 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
2525
// RegisterFlagsWithPrefix registers flags for the Bloom Gateway configuration with a common prefix.
2626
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
2727
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the bloom gateway component globally.")
28-
f.IntVar(&cfg.WorkerConcurrency, prefix+"worker-concurrency", 4, "Number of workers to use for filtering chunks concurrently.")
29-
f.IntVar(&cfg.BlockQueryConcurrency, prefix+"block-query-concurrency", 4, "Number of blocks processed concurrently on a single worker.")
28+
f.IntVar(&cfg.WorkerConcurrency, prefix+"worker-concurrency", 4, "Number of workers to use for filtering chunks concurrently. Usually set to 1x number of CPU cores.")
29+
f.IntVar(&cfg.BlockQueryConcurrency, prefix+"block-query-concurrency", 8, "Number of blocks processed concurrently on a single worker. Usually set to 2x number of CPU cores.")
3030
f.IntVar(&cfg.MaxOutstandingPerTenant, prefix+"max-outstanding-per-tenant", 1024, "Maximum number of outstanding tasks per tenant.")
3131
f.IntVar(&cfg.NumMultiplexItems, prefix+"num-multiplex-tasks", 512, "How many tasks are multiplexed at once.")
3232
// TODO(chaudum): Figure out what the better place is for registering flags
3333
// -bloom-gateway.client.* or -bloom-gateway-client.*
3434
cfg.Client.RegisterFlags(f)
3535
}
3636

37+
func (cfg *Config) Validate() error {
38+
if !cfg.Enabled {
39+
return nil
40+
}
41+
if err := cfg.Client.Validate(); err != nil {
42+
return err
43+
}
44+
return nil
45+
}
46+
3747
type Limits interface {
3848
CacheLimits
3949
BloomGatewayShardSize(tenantID string) int
4050
BloomGatewayEnabled(tenantID string) bool
41-
BloomGatewayBlocksDownloadingParallelism(tenantID string) int
4251
}

pkg/loki/loki.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ type Config struct {
9090
Ingester ingester.Config `yaml:"ingester,omitempty"`
9191
Pattern pattern.Config `yaml:"pattern_ingester,omitempty"`
9292
IndexGateway indexgateway.Config `yaml:"index_gateway"`
93-
BloomCompactor bloomcompactor.Config `yaml:"bloom_compactor"`
94-
BloomGateway bloomgateway.Config `yaml:"bloom_gateway"`
93+
BloomCompactor bloomcompactor.Config `yaml:"bloom_compactor,omitempty" category:"experimental"`
94+
BloomGateway bloomgateway.Config `yaml:"bloom_gateway,omitempty" category:"experimental"`
9595
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
9696
ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
9797
SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"`
@@ -269,6 +269,9 @@ func (c *Config) Validate() error {
269269
if err := c.BloomCompactor.Validate(); err != nil {
270270
return errors.Wrap(err, "invalid bloom_compactor config")
271271
}
272+
if err := c.BloomGateway.Validate(); err != nil {
273+
return errors.Wrap(err, "invalid bloom_gateway config")
274+
}
272275

273276
if err := c.Pattern.Validate(); err != nil {
274277
return errors.Wrap(err, "invalid pattern_ingester config")

pkg/storage/factory.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ type Config struct {
336336
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
337337
BoltDBShipperConfig boltdb.IndexCfg `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."`
338338
TSDBShipperConfig indexshipper.Config `yaml:"tsdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in a prometheus TSDB-like format. Required fields only required when TSDB is defined in config."`
339-
BloomShipperConfig bloomshipperconfig.Config `yaml:"bloom_shipper" doc:"description=Configures the bloom shipper component, which contains the store abstraction to fetch bloom filters from and put them to object storage."`
339+
BloomShipperConfig bloomshipperconfig.Config `yaml:"bloom_shipper" category:"experimental" doc:"description=Experimental: Configures the bloom shipper component, which contains the store abstraction to fetch bloom filters from and put them to object storage."`
340340

341341
// Config for using AsyncStore when using async index stores like `boltdb-shipper`.
342342
// It is required for getting chunk ids of recently flushed chunks from the ingesters.

pkg/storage/stores/shipper/bloomshipper/config/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
2828
f.Var(&c.WorkingDirectory, prefix+"shipper.working-directory", "Working directory to store downloaded bloom blocks. Supports multiple directories, separated by comma.")
2929
_ = c.MaxQueryPageSize.Set("64MiB") // default should match the one set in pkg/storage/bloom/v1/bloom.go
3030
f.Var(&c.MaxQueryPageSize, prefix+"max-query-page-size", "Maximum size of bloom pages that should be queried. Larger pages than this limit are skipped when querying blooms to limit memory usage.")
31-
f.IntVar(&c.DownloadParallelism, prefix+"download-parallelism", 16, "The amount of maximum concurrent bloom blocks downloads.")
31+
f.IntVar(&c.DownloadParallelism, prefix+"download-parallelism", 8, "The amount of maximum concurrent bloom blocks downloads. Usually set to 2x number of CPU cores.")
3232
c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour)
3333
c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f)
3434

pkg/storage/stores/shipper/bloomshipper/shipper.go

-4
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@ type Shipper struct {
1919
store Store
2020
}
2121

22-
type Limits interface {
23-
BloomGatewayBlocksDownloadingParallelism(tenantID string) int
24-
}
25-
2622
func NewShipper(client Store) *Shipper {
2723
return &Shipper{store: client}
2824
}

pkg/validation/limits.go

+23-28
Original file line numberDiff line numberDiff line change
@@ -194,18 +194,18 @@ type Limits struct {
194194

195195
IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"`
196196

197-
BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"`
198-
BloomGatewayEnabled bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering"`
199-
200-
BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
201-
BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
202-
BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"`
203-
BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"`
204-
BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"`
205-
BloomBlockEncoding string `yaml:"bloom_block_encoding" json:"bloom_block_encoding"`
206-
BloomGatewayBlocksDownloadingParallelism int `yaml:"bloom_gateway_blocks_downloading_parallelism" json:"bloom_gateway_blocks_downloading_parallelism"`
207-
BloomGatewayCacheKeyInterval time.Duration `yaml:"bloom_gateway_cache_key_interval" json:"bloom_gateway_cache_key_interval"`
208-
BloomCompactorMaxBlockSize flagext.ByteSize `yaml:"bloom_compactor_max_block_size" json:"bloom_compactor_max_block_size"`
197+
BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size" category:"experimental"`
198+
BloomGatewayEnabled bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering" category:"experimental"`
199+
BloomGatewayCacheKeyInterval time.Duration `yaml:"bloom_gateway_cache_key_interval" json:"bloom_gateway_cache_key_interval" category:"experimental"`
200+
201+
BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size" category:"experimental"`
202+
BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction" category:"experimental"`
203+
BloomCompactorMaxBlockSize flagext.ByteSize `yaml:"bloom_compactor_max_block_size" json:"bloom_compactor_max_block_size" category:"experimental"`
204+
205+
BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length" category:"experimental"`
206+
BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip" category:"experimental"`
207+
BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate" category:"experimental"`
208+
BloomBlockEncoding string `yaml:"bloom_block_encoding" json:"bloom_block_encoding" category:"experimental"`
209209

210210
AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."`
211211
MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."`
@@ -358,21 +358,20 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
358358

359359
f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.")
360360

361-
f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 0, "The shard size defines how many bloom gateways should be used by a tenant for querying.")
362-
f.BoolVar(&l.BloomGatewayEnabled, "bloom-gateway.enable-filtering", false, "Whether to use the bloom gateway component in the read path to filter chunks.")
363-
364-
f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 0, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")
365-
f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Whether to compact chunks into bloom filters.")
366-
f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.")
367-
f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 1, "Skip factor for the n-grams created when computing blooms from log lines.")
368-
f.Float64Var(&l.BloomFalsePositiveRate, "bloom-compactor.false-positive-rate", 0.01, "Scalable Bloom Filter desired false-positive rate.")
369-
f.StringVar(&l.BloomBlockEncoding, "bloom-compactor.block-encoding", "none", "Compression algorithm for bloom block pages.")
370-
f.IntVar(&l.BloomGatewayBlocksDownloadingParallelism, "bloom-gateway.blocks-downloading-parallelism", 50, "Maximum number of blocks will be downloaded in parallel by the Bloom Gateway.")
371-
f.DurationVar(&l.BloomGatewayCacheKeyInterval, "bloom-gateway.cache-key-interval", 15*time.Minute, "Interval for computing the cache key in the Bloom Gateway.")
361+
f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 0, "Experimental. The shard size defines how many bloom gateways should be used by a tenant for querying.")
362+
f.BoolVar(&l.BloomGatewayEnabled, "bloom-gateway.enable-filtering", false, "Experimental. Whether to use the bloom gateway component in the read path to filter chunks.")
363+
364+
f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 0, "Experimental. The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")
365+
f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Experimental. Whether to compact chunks into bloom filters.")
366+
f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Experimental. Length of the n-grams created when computing blooms from log lines.")
367+
f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 1, "Experimental. Skip factor for the n-grams created when computing blooms from log lines.")
368+
f.Float64Var(&l.BloomFalsePositiveRate, "bloom-compactor.false-positive-rate", 0.01, "Experimental. Scalable Bloom Filter desired false-positive rate.")
369+
f.StringVar(&l.BloomBlockEncoding, "bloom-compactor.block-encoding", "none", "Experimental. Compression algorithm for bloom block pages.")
370+
f.DurationVar(&l.BloomGatewayCacheKeyInterval, "bloom-gateway.cache-key-interval", 15*time.Minute, "Experimental. Interval for computing the cache key in the Bloom Gateway.")
372371
_ = l.BloomCompactorMaxBlockSize.Set(defaultBloomCompactorMaxBlockSize)
373372
f.Var(&l.BloomCompactorMaxBlockSize, "bloom-compactor.max-block-size",
374373
fmt.Sprintf(
375-
"The maximum bloom block size. A value of 0 sets an unlimited size. Default is %s. The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.",
374+
"Experimental. The maximum bloom block size. A value of 0 sets an unlimited size. Default is %s. The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.",
376375
defaultBloomCompactorMaxBlockSize,
377376
),
378377
)
@@ -938,10 +937,6 @@ func (o *Overrides) BloomGatewayShardSize(userID string) int {
938937
return o.getOverridesForUser(userID).BloomGatewayShardSize
939938
}
940939

941-
func (o *Overrides) BloomGatewayBlocksDownloadingParallelism(userID string) int {
942-
return o.getOverridesForUser(userID).BloomGatewayBlocksDownloadingParallelism
943-
}
944-
945940
func (o *Overrides) BloomGatewayCacheKeyInterval(userID string) time.Duration {
946941
return o.getOverridesForUser(userID).BloomGatewayCacheKeyInterval
947942
}

tools/doc-generator/parse/root_blocks.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,6 @@ var (
101101
StructType: []reflect.Type{reflect.TypeOf(indexgateway.Config{})},
102102
Desc: "The index_gateway block configures the Loki index gateway server, responsible for serving index queries without the need to constantly interact with the object store.",
103103
},
104-
{
105-
Name: "bloom_gateway",
106-
StructType: []reflect.Type{reflect.TypeOf(bloomgateway.Config{})},
107-
Desc: "The bloom_gateway block configures the Loki bloom gateway server, responsible for serving queries for filtering chunks based on filter expressions.",
108-
},
109104
{
110105
Name: "storage_config",
111106
StructType: []reflect.Type{reflect.TypeOf(storage.Config{})},
@@ -129,7 +124,12 @@ var (
129124
{
130125
Name: "bloom_compactor",
131126
StructType: []reflect.Type{reflect.TypeOf(bloomcompactor.Config{})},
132-
Desc: "The bloom_compactor block configures the Loki bloom compactor server, responsible for compacting stream indexes into bloom filters and merging them as bloom blocks",
127+
Desc: "Experimental: The bloom_compactor block configures the Loki bloom compactor server, responsible for compacting stream indexes into bloom filters and merging them as bloom blocks.",
128+
},
129+
{
130+
Name: "bloom_gateway",
131+
StructType: []reflect.Type{reflect.TypeOf(bloomgateway.Config{})},
132+
Desc: "Experimental: The bloom_gateway block configures the Loki bloom gateway server, responsible for serving queries for filtering chunks based on filter expressions.",
133133
},
134134
{
135135
Name: "limits_config",
@@ -197,7 +197,7 @@ When a memberlist config with atleast 1 join_members is defined, kvstore of type
197197
{
198198
Name: "grpc_client",
199199
StructType: []reflect.Type{reflect.TypeOf(grpcclient.Config{})},
200-
Desc: "The grpc_client block configures the gRPC client used to communicate between two Loki components.",
200+
Desc: "The grpc_client block configures the gRPC client used to communicate between a client and server component in Loki.",
201201
},
202202
// TLS config
203203
{
@@ -209,7 +209,7 @@ When a memberlist config with atleast 1 join_members is defined, kvstore of type
209209
{
210210
Name: "cache_config",
211211
StructType: []reflect.Type{reflect.TypeOf(cache.Config{})},
212-
Desc: "The cache block configures the cache backend.",
212+
Desc: "The cache_config block configures the cache backend for a specific Loki component.",
213213
},
214214
// Schema periodic config
215215
{

0 commit comments

Comments
 (0)