Make concurrentFetchers change its concurrency dynamically (#9437)
* Make concurrentFetchers change its concurrency dynamically

Signed-off-by: gotjosh <[email protected]>

* address review comments

Signed-off-by: gotjosh <[email protected]>

* `make doc`

Signed-off-by: gotjosh <[email protected]>

* inline the stop method

Signed-off-by: gotjosh <[email protected]>

* Fix panic when creating concurrent fetchers fails

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Disabled by default

Signed-off-by: gotjosh <[email protected]>

* we don't need to handle the context in start

Signed-off-by: gotjosh <[email protected]>

* don't store concurrency or records per fetch

Signed-off-by: gotjosh <[email protected]>

* add validation to the flags

Signed-off-by: gotjosh <[email protected]>

* Ensure we don't leak any goroutines.

Signed-off-by: gotjosh <[email protected]>

* remove concurrent and recordsperfetch from the main struct

Signed-off-by: gotjosh <[email protected]>

---------

Signed-off-by: gotjosh <[email protected]>
Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: Dimitar Dimitrov <[email protected]>
gotjosh and dimitarvdimitrov authored Sep 26, 2024
1 parent 861563d commit e74927d
Showing 8 changed files with 228 additions and 63 deletions.
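Before the per-file diffs, here is a minimal sketch of the behavior this commit introduces: fetch with higher concurrency while the ingester catches up, then switch to lighter settings for ongoing consumption. The `fetcher` interface and its `Update` method are hypothetical stand-ins for Mimir's `concurrentFetchers` (whose real API is not part of this diff); only the config field names come from the commit:

```go
package ingestsketch

import "context"

// kafkaConfig is a trimmed copy of the KafkaConfig fields added by this commit.
type kafkaConfig struct {
	StartupFetchConcurrency int
	StartupRecordsPerFetch  int
	OngoingFetchConcurrency int
	OngoingRecordsPerFetch  int
}

// fetcher is a hypothetical stand-in for concurrentFetchers.
type fetcher interface {
	// Update changes the fetch concurrency and records-per-fetch at runtime.
	Update(ctx context.Context, concurrency, recordsPerFetch int)
}

// run starts with the startup settings and switches to the ongoing settings
// once the ingester has caught up reading its partition.
func run(ctx context.Context, f fetcher, cfg kafkaConfig, caughtUp <-chan struct{}) {
	if cfg.StartupFetchConcurrency <= 0 {
		return // concurrent fetching is disabled by default
	}
	f.Update(ctx, cfg.StartupFetchConcurrency, cfg.StartupRecordsPerFetch)
	select {
	case <-caughtUp:
		f.Update(ctx, cfg.OngoingFetchConcurrency, cfg.OngoingRecordsPerFetch)
	case <-ctx.Done():
	}
}
```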
36 changes: 28 additions & 8 deletions cmd/mimir/config-descriptor.json
@@ -6650,22 +6650,42 @@
 },
 {
   "kind": "field",
-  "name": "fetch_concurrency",
+  "name": "startup_fetch_concurrency",
   "required": false,
-  "desc": "The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup.",
+  "desc": "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.",
   "fieldValue": null,
-  "fieldDefaultValue": 1,
-  "fieldFlag": "ingest-storage.kafka.fetch-concurrency",
+  "fieldDefaultValue": 0,
+  "fieldFlag": "ingest-storage.kafka.startup-fetch-concurrency",
   "fieldType": "int"
 },
 {
   "kind": "field",
-  "name": "records_per_fetch",
+  "name": "startup_records_per_fetch",
   "required": false,
-  "desc": "The number of records to fetch from Kafka in a single request.",
+  "desc": "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0.",
   "fieldValue": null,
-  "fieldDefaultValue": 128,
-  "fieldFlag": "ingest-storage.kafka.records-per-fetch",
+  "fieldDefaultValue": 2500,
+  "fieldFlag": "ingest-storage.kafka.startup-records-per-fetch",
   "fieldType": "int"
 },
+{
+  "kind": "field",
+  "name": "ongoing_fetch_concurrency",
+  "required": false,
+  "desc": "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.",
+  "fieldValue": null,
+  "fieldDefaultValue": 0,
+  "fieldFlag": "ingest-storage.kafka.ongoing-fetch-concurrency",
+  "fieldType": "int"
+},
+{
+  "kind": "field",
+  "name": "ongoing_records_per_fetch",
+  "required": false,
+  "desc": "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0.",
+  "fieldValue": null,
+  "fieldDefaultValue": 30,
+  "fieldFlag": "ingest-storage.kafka.ongoing-records-per-fetch",
+  "fieldType": "int"
+},
 {
12 changes: 8 additions & 4 deletions cmd/mimir/help-all.txt.tmpl
@@ -1363,8 +1363,6 @@ Usage of ./cmd/mimir/mimir:
       How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s)
   -ingest-storage.kafka.dial-timeout duration
       The maximum time allowed to open a connection to a Kafka broker. (default 2s)
-  -ingest-storage.kafka.fetch-concurrency int
-      The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup. (default 1)
   -ingest-storage.kafka.ingestion-concurrency int
       The number of concurrent ingestion streams to the TSDB head. 0 to disable.
   -ingest-storage.kafka.ingestion-concurrency-batch-size int
@@ -1375,12 +1373,18 @@ Usage of ./cmd/mimir/mimir:
       How long to retry a failed request to get the last produced offset. (default 10s)
   -ingest-storage.kafka.max-consumer-lag-at-startup duration
       The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
+  -ingest-storage.kafka.ongoing-fetch-concurrency int
+      The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.
+  -ingest-storage.kafka.ongoing-records-per-fetch int
+      The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
   -ingest-storage.kafka.producer-max-buffered-bytes int
       The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
   -ingest-storage.kafka.producer-max-record-size-bytes int
       The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-  -ingest-storage.kafka.records-per-fetch int
-      The number of records to fetch from Kafka in a single request. (default 128)
+  -ingest-storage.kafka.startup-fetch-concurrency int
+      The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
+  -ingest-storage.kafka.startup-records-per-fetch int
+      The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500)
   -ingest-storage.kafka.target-consumer-lag-at-startup duration
       The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
   -ingest-storage.kafka.topic string
12 changes: 8 additions & 4 deletions cmd/mimir/help.txt.tmpl
@@ -433,8 +433,6 @@ Usage of ./cmd/mimir/mimir:
       How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s)
   -ingest-storage.kafka.dial-timeout duration
       The maximum time allowed to open a connection to a Kafka broker. (default 2s)
-  -ingest-storage.kafka.fetch-concurrency int
-      The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup. (default 1)
   -ingest-storage.kafka.ingestion-concurrency int
       The number of concurrent ingestion streams to the TSDB head. 0 to disable.
   -ingest-storage.kafka.ingestion-concurrency-batch-size int
@@ -445,12 +443,18 @@ Usage of ./cmd/mimir/mimir:
       How long to retry a failed request to get the last produced offset. (default 10s)
   -ingest-storage.kafka.max-consumer-lag-at-startup duration
       The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
+  -ingest-storage.kafka.ongoing-fetch-concurrency int
+      The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.
+  -ingest-storage.kafka.ongoing-records-per-fetch int
+      The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
   -ingest-storage.kafka.producer-max-buffered-bytes int
       The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
   -ingest-storage.kafka.producer-max-record-size-bytes int
       The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-  -ingest-storage.kafka.records-per-fetch int
-      The number of records to fetch from Kafka in a single request. (default 128)
+  -ingest-storage.kafka.startup-fetch-concurrency int
+      The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
+  -ingest-storage.kafka.startup-records-per-fetch int
+      The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500)
   -ingest-storage.kafka.target-consumer-lag-at-startup duration
       The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
   -ingest-storage.kafka.topic string
31 changes: 23 additions & 8 deletions docs/sources/mimir/configure/configuration-parameters/index.md
@@ -3860,14 +3860,29 @@ kafka:
   # CLI flag: -ingest-storage.kafka.wait-strong-read-consistency-timeout
   [wait_strong_read_consistency_timeout: <duration> | default = 20s]
 
-  # The number of concurrent fetch requests that the ingester sends to Kafka
-  # when catching up during startup.
-  # CLI flag: -ingest-storage.kafka.fetch-concurrency
-  [fetch_concurrency: <int> | default = 1]
-
-  # The number of records to fetch from Kafka in a single request.
-  # CLI flag: -ingest-storage.kafka.records-per-fetch
-  [records_per_fetch: <int> | default = 128]
+  # The number of concurrent fetch requests that the ingester makes when reading
+  # data from Kafka during startup. 0 to disable.
+  # CLI flag: -ingest-storage.kafka.startup-fetch-concurrency
+  [startup_fetch_concurrency: <int> | default = 0]
+
+  # The number of records per fetch request that the ingester makes when reading
+  # data from Kafka during startup. Depends on
+  # ingest-storage.kafka.startup-fetch-concurrency being greater than 0.
+  # CLI flag: -ingest-storage.kafka.startup-records-per-fetch
+  [startup_records_per_fetch: <int> | default = 2500]
+
+  # The number of concurrent fetch requests that the ingester makes when reading
+  # data continuously from Kafka after startup. Is disabled unless
+  # ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be
+  # greater than 0.
+  # CLI flag: -ingest-storage.kafka.ongoing-fetch-concurrency
+  [ongoing_fetch_concurrency: <int> | default = 0]
+
+  # The number of records per fetch request that the ingester makes when reading
+  # data continuously from Kafka after startup. Depends on
+  # ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0.
+  # CLI flag: -ingest-storage.kafka.ongoing-records-per-fetch
+  [ongoing_records_per_fetch: <int> | default = 30]
 
   # When enabled, the fetch request MaxBytes field is computed using the
   # compressed size of previous records. When disabled, MaxBytes is computed
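Read together, the four settings above pair up: each records-per-fetch value only takes effect when its concurrency counterpart is greater than 0, and ongoing concurrency must be set whenever startup concurrency is. The YAML keys map onto the struct fields via the yaml tags shown in the config.go diff below; a sketch of that mapping, assuming gopkg.in/yaml.v3 (the YAML library is an assumption of this sketch, not part of the diff):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3" // assumed YAML library for this sketch
)

// kafkaFetchConfig mirrors the yaml tags added to KafkaConfig in this commit.
type kafkaFetchConfig struct {
	StartupFetchConcurrency int `yaml:"startup_fetch_concurrency"`
	StartupRecordsPerFetch  int `yaml:"startup_records_per_fetch"`
	OngoingFetchConcurrency int `yaml:"ongoing_fetch_concurrency"`
	OngoingRecordsPerFetch  int `yaml:"ongoing_records_per_fetch"`
}

func main() {
	// One valid combination: heavy fetching during catch-up, light afterwards.
	doc := []byte(`
startup_fetch_concurrency: 4
startup_records_per_fetch: 2500
ongoing_fetch_concurrency: 2
ongoing_records_per_fetch: 30
`)
	var cfg kafkaFetchConfig
	if err := yaml.Unmarshal(doc, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {StartupFetchConcurrency:4 StartupRecordsPerFetch:2500 ...}
}
```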
27 changes: 23 additions & 4 deletions pkg/storage/ingest/config.go
@@ -93,8 +93,11 @@ type KafkaConfig struct {
 	// Used when logging unsampled client errors. Set from ingester's ErrorSampleRate.
 	FallbackClientErrorSampleRate int64 `yaml:"-"`
 
-	FetchConcurrency int `yaml:"fetch_concurrency"`
-	RecordsPerFetch  int `yaml:"records_per_fetch"`
+	StartupFetchConcurrency int `yaml:"startup_fetch_concurrency"`
+	StartupRecordsPerFetch  int `yaml:"startup_records_per_fetch"`
+	OngoingFetchConcurrency int `yaml:"ongoing_fetch_concurrency"`
+	OngoingRecordsPerFetch  int `yaml:"ongoing_records_per_fetch"`
+
 	UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"`
 	IngestionConcurrency              int  `yaml:"ingestion_concurrency"`
 	IngestionConcurrencyBatchSize     int  `yaml:"ingestion_concurrency_batch_size"`
@@ -132,8 +135,12 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
 	f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")
 
 	f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.")
-	f.IntVar(&cfg.FetchConcurrency, prefix+".fetch-concurrency", 1, "The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup.")
-	f.IntVar(&cfg.RecordsPerFetch, prefix+".records-per-fetch", 128, "The number of records to fetch from Kafka in a single request.")
+
+	f.IntVar(&cfg.StartupFetchConcurrency, prefix+".startup-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.")
+	f.IntVar(&cfg.StartupRecordsPerFetch, prefix+".startup-records-per-fetch", 2500, "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on "+prefix+".startup-fetch-concurrency being greater than 0.")
+	f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. It must be greater than 0.")
+	f.IntVar(&cfg.OngoingRecordsPerFetch, prefix+".ongoing-records-per-fetch", 30, "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on "+prefix+".ongoing-fetch-concurrency being greater than 0.")
+
 	f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
 	f.IntVar(&cfg.IngestionConcurrency, prefix+".ingestion-concurrency", 0, "The number of concurrent ingestion streams to the TSDB head. 0 to disable.")
 	f.IntVar(&cfg.IngestionConcurrencyBatchSize, prefix+".ingestion-concurrency-batch-size", 128, "The number of timeseries to batch together before ingesting into TSDB. This is only used when ingestion-concurrency is greater than 0.")
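A quick way to see the new flags and their defaults in action. The RegisterFlagsWithPrefix signature comes from the hunk above; the import path is an assumption of this sketch:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/mimir/pkg/storage/ingest" // assumed import path for this package
)

func main() {
	var cfg ingest.KafkaConfig
	fs := flag.NewFlagSet("mimir", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix("ingest-storage.kafka", fs)

	// Enable concurrent fetching: 4 fetchers while catching up, 2 afterwards.
	if err := fs.Parse([]string{
		"-ingest-storage.kafka.startup-fetch-concurrency=4",
		"-ingest-storage.kafka.ongoing-fetch-concurrency=2",
	}); err != nil {
		panic(err)
	}

	// Records-per-fetch keep their registered defaults: 2500 at startup, 30 ongoing.
	fmt.Println(cfg.StartupFetchConcurrency, cfg.StartupRecordsPerFetch) // 4 2500
	fmt.Println(cfg.OngoingFetchConcurrency, cfg.OngoingRecordsPerFetch) // 2 30
}
```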
@@ -172,6 +179,18 @@ func (cfg *KafkaConfig) Validate() error {
 		return ErrInvalidMaxConsumerLagAtStartup
 	}
 
+	if cfg.StartupFetchConcurrency < 0 {
+		return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater or equal to 0")
+	}
+
+	if cfg.StartupFetchConcurrency > 0 && cfg.OngoingFetchConcurrency <= 0 {
+		return fmt.Errorf("ingest-storage.kafka.ongoing-fetch-concurrency must be greater than 0 when startup-fetch-concurrency is greater than 0")
+	}
+
+	if cfg.StartupRecordsPerFetch <= 0 || cfg.OngoingRecordsPerFetch <= 0 {
+		return fmt.Errorf("ingest-storage.kafka.startup-records-per-fetch and ingest-storage.kafka.ongoing-records-per-fetch must be greater than 0")
+	}
+
 	return nil
 }
 
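The validation rules added above, restated as a standalone check so the accepted combinations are easy to test. This is a sketch mirroring the Validate additions, not Mimir code:

```go
package main

import "fmt"

// checkFetchConcurrency mirrors the checks added to KafkaConfig.Validate above.
func checkFetchConcurrency(startupConc, ongoingConc, startupRecs, ongoingRecs int) error {
	if startupConc < 0 {
		return fmt.Errorf("startup-fetch-concurrency must be greater or equal to 0")
	}
	if startupConc > 0 && ongoingConc <= 0 {
		return fmt.Errorf("ongoing-fetch-concurrency must be greater than 0 when startup-fetch-concurrency is greater than 0")
	}
	if startupRecs <= 0 || ongoingRecs <= 0 {
		return fmt.Errorf("startup-records-per-fetch and ongoing-records-per-fetch must be greater than 0")
	}
	return nil
}

func main() {
	fmt.Println(checkFetchConcurrency(0, 0, 2500, 30)) // <nil>: feature disabled, defaults pass
	fmt.Println(checkFetchConcurrency(4, 0, 2500, 30)) // error: ongoing concurrency must be > 0
	fmt.Println(checkFetchConcurrency(4, 2, 2500, 30)) // <nil>: valid combination
}
```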
(Diffs for the remaining 3 changed files are not shown.)