diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json
index 43f1dc5a01a..24745a25cb9 100644
--- a/cmd/mimir/config-descriptor.json
+++ b/cmd/mimir/config-descriptor.json
@@ -6650,22 +6650,42 @@
       },
       {
         "kind": "field",
-        "name": "fetch_concurrency",
+        "name": "startup_fetch_concurrency",
         "required": false,
-        "desc": "The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup.",
+        "desc": "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.",
         "fieldValue": null,
-        "fieldDefaultValue": 1,
-        "fieldFlag": "ingest-storage.kafka.fetch-concurrency",
+        "fieldDefaultValue": 0,
+        "fieldFlag": "ingest-storage.kafka.startup-fetch-concurrency",
         "fieldType": "int"
       },
       {
         "kind": "field",
-        "name": "records_per_fetch",
+        "name": "startup_records_per_fetch",
         "required": false,
-        "desc": "The number of records to fetch from Kafka in a single request.",
+        "desc": "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0.",
         "fieldValue": null,
-        "fieldDefaultValue": 128,
-        "fieldFlag": "ingest-storage.kafka.records-per-fetch",
+        "fieldDefaultValue": 2500,
+        "fieldFlag": "ingest-storage.kafka.startup-records-per-fetch",
+        "fieldType": "int"
+      },
+      {
+        "kind": "field",
+        "name": "ongoing_fetch_concurrency",
+        "required": false,
+        "desc": "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.",
+        "fieldValue": null,
+        "fieldDefaultValue": 0,
+        "fieldFlag": "ingest-storage.kafka.ongoing-fetch-concurrency",
+        "fieldType": "int"
+      },
+      {
+        "kind": "field",
+        "name": "ongoing_records_per_fetch",
+        "required": false,
+        "desc": "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0.",
+        "fieldValue": null,
+        "fieldDefaultValue": 30,
+        "fieldFlag": "ingest-storage.kafka.ongoing-records-per-fetch",
         "fieldType": "int"
       },
       {
diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl
index 312c4e6aaad..fc15062fa11 100644
--- a/cmd/mimir/help-all.txt.tmpl
+++ b/cmd/mimir/help-all.txt.tmpl
@@ -1363,8 +1363,6 @@ Usage of ./cmd/mimir/mimir:
       How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s)
   -ingest-storage.kafka.dial-timeout duration
       The maximum time allowed to open a connection to a Kafka broker. (default 2s)
-  -ingest-storage.kafka.fetch-concurrency int
-      The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup. (default 1)
   -ingest-storage.kafka.ingestion-concurrency int
       The number of concurrent ingestion streams to the TSDB head. 0 to disable.
   -ingest-storage.kafka.ingestion-concurrency-batch-size int
@@ -1375,12 +1373,18 @@ Usage of ./cmd/mimir/mimir:
       How long to retry a failed request to get the last produced offset. (default 10s)
   -ingest-storage.kafka.max-consumer-lag-at-startup duration
       The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
+  -ingest-storage.kafka.ongoing-fetch-concurrency int
+      The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.
+  -ingest-storage.kafka.ongoing-records-per-fetch int
+      The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
   -ingest-storage.kafka.producer-max-buffered-bytes int
       The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
   -ingest-storage.kafka.producer-max-record-size-bytes int
       The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-  -ingest-storage.kafka.records-per-fetch int
-      The number of records to fetch from Kafka in a single request. (default 128)
+  -ingest-storage.kafka.startup-fetch-concurrency int
+      The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
+  -ingest-storage.kafka.startup-records-per-fetch int
+      The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500)
   -ingest-storage.kafka.target-consumer-lag-at-startup duration
       The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
   -ingest-storage.kafka.topic string
diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl
index 2cb5eafa456..0be12694cab 100644
--- a/cmd/mimir/help.txt.tmpl
+++ b/cmd/mimir/help.txt.tmpl
@@ -433,8 +433,6 @@ Usage of ./cmd/mimir/mimir:
       How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s)
   -ingest-storage.kafka.dial-timeout duration
       The maximum time allowed to open a connection to a Kafka broker. (default 2s)
-  -ingest-storage.kafka.fetch-concurrency int
-      The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup. (default 1)
   -ingest-storage.kafka.ingestion-concurrency int
       The number of concurrent ingestion streams to the TSDB head. 0 to disable.
   -ingest-storage.kafka.ingestion-concurrency-batch-size int
@@ -445,12 +443,18 @@ Usage of ./cmd/mimir/mimir:
       How long to retry a failed request to get the last produced offset. (default 10s)
   -ingest-storage.kafka.max-consumer-lag-at-startup duration
       The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
+  -ingest-storage.kafka.ongoing-fetch-concurrency int
+      The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be greater than 0.
+  -ingest-storage.kafka.ongoing-records-per-fetch int
+      The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0. (default 30)
   -ingest-storage.kafka.producer-max-buffered-bytes int
       The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
   -ingest-storage.kafka.producer-max-record-size-bytes int
       The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-  -ingest-storage.kafka.records-per-fetch int
-      The number of records to fetch from Kafka in a single request. (default 128)
+  -ingest-storage.kafka.startup-fetch-concurrency int
+      The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.
+  -ingest-storage.kafka.startup-records-per-fetch int
+      The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on ingest-storage.kafka.startup-fetch-concurrency being greater than 0. (default 2500)
   -ingest-storage.kafka.target-consumer-lag-at-startup duration
       The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s)
   -ingest-storage.kafka.topic string
diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md
index 9f52289edf1..725e183ded4 100644
--- a/docs/sources/mimir/configure/configuration-parameters/index.md
+++ b/docs/sources/mimir/configure/configuration-parameters/index.md
@@ -3860,14 +3860,29 @@ kafka:
   # CLI flag: -ingest-storage.kafka.wait-strong-read-consistency-timeout
   [wait_strong_read_consistency_timeout: <duration> | default = 20s]
 
-  # The number of concurrent fetch requests that the ingester sends to Kafka
-  # when catching up during startup.
-  # CLI flag: -ingest-storage.kafka.fetch-concurrency
-  [fetch_concurrency: <int> | default = 1]
-
-  # The number of records to fetch from Kafka in a single request.
-  # CLI flag: -ingest-storage.kafka.records-per-fetch
-  [records_per_fetch: <int> | default = 128]
+  # The number of concurrent fetch requests that the ingester makes when reading
+  # data from Kafka during startup. 0 to disable.
+  # CLI flag: -ingest-storage.kafka.startup-fetch-concurrency
+  [startup_fetch_concurrency: <int> | default = 0]
+
+  # The number of records per fetch request that the ingester makes when reading
+  # data from Kafka during startup. Depends on
+  # ingest-storage.kafka.startup-fetch-concurrency being greater than 0.
+  # CLI flag: -ingest-storage.kafka.startup-records-per-fetch
+  [startup_records_per_fetch: <int> | default = 2500]
+
+  # The number of concurrent fetch requests that the ingester makes when reading
+  # data continuously from Kafka after startup. Is disabled unless
+  # ingest-storage.kafka.startup-fetch-concurrency is greater than 0. It must be
+  # greater than 0.
+  # CLI flag: -ingest-storage.kafka.ongoing-fetch-concurrency
+  [ongoing_fetch_concurrency: <int> | default = 0]
+
+  # The number of records per fetch request that the ingester makes when reading
+  # data continuously from Kafka after startup. Depends on
+  # ingest-storage.kafka.ongoing-fetch-concurrency being greater than 0.
+  # CLI flag: -ingest-storage.kafka.ongoing-records-per-fetch
+  [ongoing_records_per_fetch: <int> | default = 30]
 
   # When enabled, the fetch request MaxBytes field is computed using the
   # compressed size of previous records. When disabled, MaxBytes is computed
diff --git a/pkg/storage/ingest/config.go b/pkg/storage/ingest/config.go
index 4fe14e80317..3a23d4c1faf 100644
--- a/pkg/storage/ingest/config.go
+++ b/pkg/storage/ingest/config.go
@@ -93,8 +93,11 @@ type KafkaConfig struct {
     // Used when logging unsampled client errors. Set from ingester's ErrorSampleRate.
     FallbackClientErrorSampleRate int64 `yaml:"-"`
 
-    FetchConcurrency int `yaml:"fetch_concurrency"`
-    RecordsPerFetch  int `yaml:"records_per_fetch"`
+    StartupFetchConcurrency int `yaml:"startup_fetch_concurrency"`
+    StartupRecordsPerFetch  int `yaml:"startup_records_per_fetch"`
+    OngoingFetchConcurrency int `yaml:"ongoing_fetch_concurrency"`
+    OngoingRecordsPerFetch  int `yaml:"ongoing_records_per_fetch"`
+
     UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"`
     IngestionConcurrency              int  `yaml:"ingestion_concurrency"`
     IngestionConcurrencyBatchSize     int  `yaml:"ingestion_concurrency_batch_size"`
@@ -132,8 +135,12 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
     f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")
     f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.")
 
-    f.IntVar(&cfg.FetchConcurrency, prefix+".fetch-concurrency", 1, "The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup.")
-    f.IntVar(&cfg.RecordsPerFetch, prefix+".records-per-fetch", 128, "The number of records to fetch from Kafka in a single request.")
+
+    f.IntVar(&cfg.StartupFetchConcurrency, prefix+".startup-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data from Kafka during startup. 0 to disable.")
+    f.IntVar(&cfg.StartupRecordsPerFetch, prefix+".startup-records-per-fetch", 2500, "The number of records per fetch request that the ingester makes when reading data from Kafka during startup. Depends on "+prefix+".startup-fetch-concurrency being greater than 0.")
+    f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. It must be greater than 0.")
+    f.IntVar(&cfg.OngoingRecordsPerFetch, prefix+".ongoing-records-per-fetch", 30, "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on "+prefix+".ongoing-fetch-concurrency being greater than 0.")
+
     f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
     f.IntVar(&cfg.IngestionConcurrency, prefix+".ingestion-concurrency", 0, "The number of concurrent ingestion streams to the TSDB head. 0 to disable.")
     f.IntVar(&cfg.IngestionConcurrencyBatchSize, prefix+".ingestion-concurrency-batch-size", 128, "The number of timeseries to batch together before ingesting into TSDB. This is only used when ingestion-concurrency is greater than 0.")
@@ -172,6 +179,18 @@ func (cfg *KafkaConfig) Validate() error {
         return ErrInvalidMaxConsumerLagAtStartup
     }
 
+    if cfg.StartupFetchConcurrency < 0 {
+        return fmt.Errorf("ingest-storage.kafka.startup-fetch-concurrency must be greater or equal to 0")
+    }
+
+    if cfg.StartupFetchConcurrency > 0 && cfg.OngoingFetchConcurrency <= 0 {
+        return fmt.Errorf("ingest-storage.kafka.ongoing-fetch-concurrency must be greater than 0 when startup-fetch-concurrency is greater than 0")
+    }
+
+    if cfg.StartupRecordsPerFetch <= 0 || cfg.OngoingRecordsPerFetch <= 0 {
+        return fmt.Errorf("ingest-storage.kafka.startup-records-per-fetch and ingest-storage.kafka.ongoing-records-per-fetch must be greater than 0")
+    }
+
     return nil
 }
diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go
index 7f0aa633b08..4be9840ebd4 100644
--- a/pkg/storage/ingest/reader.go
+++ b/pkg/storage/ingest/reader.go
@@ -65,7 +65,14 @@ type recordConsumer interface {
 }
 
 type fetcher interface {
-    pollFetches(context.Context) (kgo.Fetches, context.Context)
+    // PollFetches fetches records from Kafka and returns them.
+    PollFetches(context.Context) (kgo.Fetches, context.Context)
+
+    // Update updates the fetcher with the given concurrency and records.
+    Update(ctx context.Context, concurrency, records int)
+
+    // Stop stops the fetcher.
+    Stop()
 }
 
 type PartitionReader struct {
@@ -94,6 +101,14 @@ type PartitionReader struct {
     reg prometheus.Registerer
 }
 
+func (r *PartitionReader) Stop() {
+    // Given the partition reader has no concurrency it doesn't support stopping anything.
+}
+
+func (r *PartitionReader) Update(_ context.Context, _, _ int) {
+    // Given the partition reader has no concurrency it doesn't support updates.
+}
+
 type consumerFactoryFunc func() recordConsumer
 
 func (c consumerFactoryFunc) consumer() recordConsumer {
@@ -179,11 +194,12 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
         return errors.Wrap(err, "starting service manager")
     }
 
-    if r.kafkaCfg.FetchConcurrency > 1 {
-        r.fetcher, err = newConcurrentFetchers(ctx, r.client, r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.FetchConcurrency, r.kafkaCfg.RecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics)
+    if r.kafkaCfg.StartupFetchConcurrency > 0 {
+        f, err := newConcurrentFetchers(ctx, r.client, r.logger, r.kafkaCfg.Topic, r.partitionID, startOffset, r.kafkaCfg.StartupFetchConcurrency, r.kafkaCfg.StartupRecordsPerFetch, r.kafkaCfg.UseCompressedBytesAsFetchMaxBytes, r.concurrentFetchersMinBytesMaxWaitTime, offsetsClient, startOffsetReader, &r.metrics)
         if err != nil {
-            return errors.Wrap(err, "creating concurrent fetchers")
+            return errors.Wrap(err, "creating concurrent fetchers during startup")
         }
+        r.fetcher = f
     } else {
         r.fetcher = r
     }
@@ -215,6 +231,10 @@ func (r *PartitionReader) stopDependencies() error {
         }
     }
 
+    if r.fetcher != nil {
+        r.fetcher.Stop()
+    }
+
     if r.client != nil {
         r.client.Close()
     }
@@ -223,6 +243,8 @@ func (r *PartitionReader) run(ctx context.Context) error {
+    r.fetcher.Update(ctx, r.kafkaCfg.OngoingFetchConcurrency, r.kafkaCfg.OngoingRecordsPerFetch)
+
     for ctx.Err() == nil {
         err := r.processNextFetches(ctx, r.metrics.receiveDelayWhenRunning)
         if err != nil && !errors.Is(err, context.Canceled) {
@@ -235,7 +257,7 @@ func (r *PartitionReader) processNextFetches(ctx context.Context, delayObserver prometheus.Observer) error {
-    fetches, fetchCtx := r.fetcher.pollFetches(ctx)
+    fetches, fetchCtx := r.fetcher.PollFetches(ctx)
     // Propagate the fetching span to consuming the records.
     ctx = opentracing.ContextWithSpan(ctx, opentracing.SpanFromContext(fetchCtx))
     r.recordFetchesMetrics(fetches, delayObserver)
@@ -714,7 +736,7 @@ func (r *PartitionReader) waitReadConsistency(ctx context.Context, withOffset bo
     return err
 }
 
-func (r *PartitionReader) pollFetches(ctx context.Context) (result kgo.Fetches, ctx2 context.Context) {
+func (r *PartitionReader) PollFetches(ctx context.Context) (result kgo.Fetches, ctx2 context.Context) {
     defer func(start time.Time) {
         r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())
     }(time.Now())
@@ -878,6 +900,9 @@ func newEmptyFetchResult(ctx context.Context, err error) fetchResult {
 }
 
 type concurrentFetchers struct {
+    wg   sync.WaitGroup
+    done chan struct{}
+
     client      *kgo.Client
     logger      log.Logger
     partitionID int32
@@ -886,8 +911,6 @@ type concurrentFetchers struct {
     metrics *readerMetrics
     tracer  *kotel.Tracer
 
-    concurrency     int
-    recordsPerFetch int
     minBytesWaitTime time.Duration
 
     orderedFetches chan fetchResult
@@ -898,6 +921,12 @@ type concurrentFetchers struct {
     trackCompressedBytes bool
 }
 
+func (r *concurrentFetchers) Stop() {
+    close(r.done)
+
+    r.wg.Wait()
+}
+
 // newConcurrentFetchers creates a new concurrentFetchers. startOffset can be kafkaOffsetStart, kafkaOffsetEnd or a specific offset.
 func newConcurrentFetchers(
     ctx context.Context,
@@ -916,20 +945,20 @@ func newConcurrentFetchers(
 ) (*concurrentFetchers, error) {
     const noReturnedRecords = -1 // we still haven't returned the 0 offset.
+
     f := &concurrentFetchers{
         client:               client,
         logger:               logger,
-        concurrency:          concurrency,
         topicName:            topic,
         partitionID:          partition,
         metrics:              metrics,
-        recordsPerFetch:      recordsPerFetch,
         minBytesWaitTime:     minBytesWaitTime,
         lastReturnedRecord:   noReturnedRecords,
         startOffsets:         startOffsetsReader,
         trackCompressedBytes: trackCompressedBytes,
         tracer:               recordsTracer(),
         orderedFetches:       make(chan fetchResult),
+        done:                 make(chan struct{}),
     }
 
     var err error
@@ -955,12 +984,19 @@ func newConcurrentFetchers(
     }
     f.topicID = topics[topic].ID
 
-    go f.runFetchers(ctx, startOffset)
+    go f.start(ctx, startOffset, concurrency, recordsPerFetch)
 
     return f, nil
 }
 
-func (r *concurrentFetchers) pollFetches(ctx context.Context) (kgo.Fetches, context.Context) {
+func (r *concurrentFetchers) Update(ctx context.Context, concurrency, records int) {
+    r.Stop()
+    r.done = make(chan struct{})
+
+    go r.start(ctx, r.lastReturnedRecord, concurrency, records)
+}
+
+func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, context.Context) {
     waitStartTime := time.Now()
     select {
     case <-ctx.Done():
@@ -1129,8 +1165,9 @@ func sumRecordLengths(records []*kgo.Record) (sum int) {
     return sum
 }
 
-func (r *concurrentFetchers) runFetcher(ctx context.Context, fetchersWg *sync.WaitGroup, wants chan fetchWant, logger log.Logger) {
-    defer fetchersWg.Done()
+func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logger log.Logger) {
+    defer r.wg.Done()
+
     errBackoff := backoff.New(ctx, backoff.Config{
         MinBackoff: 250 * time.Millisecond,
         MaxBackoff: 2 * time.Second,
@@ -1154,10 +1191,21 @@ func (r *concurrentFetchers) runFetcher(ctx context.Context, fetchersWg *sync.Wa
             if f.Err != nil {
                 w = handleKafkaFetchErr(f.Err, w, errBackoff, r.startOffsets, r.client, attemptSpan)
             }
+
             if len(f.Records) == 0 {
                 // Typically if we had an error, then there wouldn't be any records.
                 // But it's hard to verify this for all errors from the Kafka API docs, so just to be sure, we process any records we might have received.
                 attemptSpan.Finish()
+
+                // There is a chance we've been told to stop even when we have no records.
+                select {
+                case <-r.done:
+                    wantSpan.Finish()
+                    close(w.result)
+                    return
+                default:
+                }
+
                 continue
             }
             // Next attempt will be from the last record onwards.
@@ -1168,6 +1216,11 @@ func (r *concurrentFetchers) runFetcher(ctx context.Context, fetchersWg *sync.Wa
             errBackoff.Reset()
 
             select {
+            case <-r.done:
+                wantSpan.Finish()
+                attemptSpan.Finish()
+                close(w.result)
+                return
             case w.result <- f:
                 previousResult = fetchResult{}
             case <-ctx.Done():
@@ -1176,6 +1229,11 @@
             // We've fetched all we were asked for the whole batch is ready, and we definitely have to wait to send on the channel now.
             f.startWaitingForConsumption()
             select {
+            case <-r.done:
+                wantSpan.Finish()
+                attemptSpan.Finish()
+                close(w.result)
+                return
             case w.result <- f:
                 previousResult = fetchResult{}
             case <-ctx.Done():
@@ -1189,20 +1247,18 @@
         }
     }
 }
 
-func (r *concurrentFetchers) runFetchers(ctx context.Context, startOffset int64) {
-    fetchersWg := &sync.WaitGroup{}
-    fetchersWg.Add(r.concurrency)
-    defer fetchersWg.Wait()
+func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concurrency, recordsPerFetch int) {
+    r.wg.Add(concurrency)
 
     wants := make(chan fetchWant)
     defer close(wants)
-    for i := 0; i < r.concurrency; i++ {
+    for i := 0; i < concurrency; i++ {
         logger := log.With(r.logger, "fetcher", i)
-        go r.runFetcher(ctx, fetchersWg, wants, logger)
+        go r.run(ctx, wants, logger)
     }
 
     var (
-        nextFetch      = fetchWantFrom(startOffset, r.recordsPerFetch)
+        nextFetch      = fetchWantFrom(startOffset, recordsPerFetch)
         nextResult     chan fetchResult
         pendingResults = list.New()
@@ -1211,6 +1267,9 @@ func (r *concurrentFetchers) runFetchers(ctx context.Context, startOffset int64)
     )
     nextFetch.bytesPerRecord = 10_000 // start with an estimation, we will update it as we consume
+    // We need to make sure we don't leak any goroutine given that start is called within a goroutine.
+    r.wg.Add(1)
+    defer r.wg.Done()
     for {
         refillBufferedResult := nextResult
         if readyBufferedResults != nil {
@@ -1219,6 +1278,8 @@
             refillBufferedResult = nil
         }
         select {
+        case <-r.done:
+            return
         case <-ctx.Done():
             return
 
@@ -1229,7 +1290,7 @@ func (r *concurrentFetchers) runFetchers(ctx context.Context, startOffset int64)
                 nextResult = pendingResults.Front().Value.(chan fetchResult)
                 pendingResults.Remove(pendingResults.Front())
             }
-            nextFetch = nextFetch.Next(r.recordsPerFetch)
+            nextFetch = nextFetch.Next(recordsPerFetch)
 
         case result, moreLeft := <-refillBufferedResult:
             if !moreLeft {
diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go
index 7a7725246cc..4e4669e8dbd 100644
--- a/pkg/storage/ingest/reader_test.go
+++ b/pkg/storage/ingest/reader_test.go
@@ -2159,7 +2159,7 @@ func TestConcurrentFetchers(t *testing.T) {
         fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch)
 
         // This should not block forever now
-        fetches, fetchCtx := fetchers.pollFetches(ctx)
+        fetches, fetchCtx := fetchers.PollFetches(ctx)
 
         assert.Zero(t, fetches.NumRecords())
         assert.Error(t, fetchCtx.Err(), "Expected context to be cancelled")
@@ -2179,7 +2179,7 @@ func TestConcurrentFetchers(t *testing.T) {
 
         fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch)
 
-        fetches, _ := fetchers.pollFetches(ctx)
+        fetches, _ := fetchers.PollFetches(ctx)
         assert.Equal(t, fetches.NumRecords(), 5)
     })
 
@@ -2197,7 +2197,7 @@ func TestConcurrentFetchers(t *testing.T) {
             produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i)))
         }
 
-        fetches, _ := fetchers.pollFetches(ctx)
+        fetches, _ := fetchers.PollFetches(ctx)
         assert.Equal(t, fetches.NumRecords(), 3)
     })
 
@@ -2221,7 +2221,7 @@ func TestConcurrentFetchers(t *testing.T) {
             defer wg.Done()
             consumedRecords := 0
             for consumedRecords < 10 {
-                fetches, _ := fetchers.pollFetches(ctx)
+                fetches, _ := fetchers.PollFetches(ctx)
                 time.Sleep(1000 * time.Millisecond) // Simulate slow processing
                 consumedRecords += fetches.NumRecords()
             }
@@ -2256,7 +2256,7 @@ func TestConcurrentFetchers(t *testing.T) {
             defer wg.Done()
             consumedRecords := 0
             for consumedRecords < 10 {
-                fetches, _ := fetchers.pollFetches(ctx)
+                fetches, _ := fetchers.PollFetches(ctx)
                 consumedRecords += fetches.NumRecords()
                 // no processing delay
             }
@@ -2284,7 +2284,7 @@ func TestConcurrentFetchers(t *testing.T) {
 
         var totalRecords int
         for totalRecords < 20 {
-            fetches, _ := fetchers.pollFetches(ctx)
+            fetches, _ := fetchers.PollFetches(ctx)
             totalRecords += fetches.NumRecords()
         }
 
@@ -2319,7 +2319,7 @@ func TestConcurrentFetchers(t *testing.T) {
         const expectedRecords = 5
         fetchedRecordsContents := make([]string, 0, expectedRecords)
         for len(fetchedRecordsContents) < expectedRecords {
-            fetches, _ := fetchers.pollFetches(ctx)
+            fetches, _ := fetchers.PollFetches(ctx)
             fetches.EachRecord(func(r *kgo.Record) {
                 fetchedRecordsContents = append(fetchedRecordsContents, string(r.Value))
             })
@@ -2358,7 +2358,7 @@ func TestConcurrentFetchers(t *testing.T) {
             // Poll for fetches and verify
             fetchedRecords := make([]string, 0, recordsPerRound)
             for len(fetchedRecords) < recordsPerRound {
-                fetches, _ := fetchers.pollFetches(ctx)
+                fetches, _ := fetchers.PollFetches(ctx)
                 fetches.EachRecord(func(r *kgo.Record) {
                     fetchedRecords = append(fetchedRecords, string(r.Value))
                     t.Log("fetched", r.Offset, string(r.Value))
@@ -2370,6 +2370,46 @@ func TestConcurrentFetchers(t *testing.T) {
         }
     })
 
+    t.Run("concurrency can be updated", func(t *testing.T) {
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+        rec1 := []byte("record-1")
+        rec2 := []byte("record-2")
+        rec3 := []byte("record-3")
+
+        _, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
+        client := newKafkaProduceClient(t, clusterAddr)
+        fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch)
+
+        produceRecordAndAssert := func(record []byte) {
+            producedOffset := produceRecord(ctx, t, client, topicName, partitionID, record)
+            // verify that the record is fetched.
+
+            var fetches kgo.Fetches
+            require.Eventually(t, func() bool {
+                fetches, _ = fetchers.PollFetches(ctx)
+                return len(fetches.Records()) == 1
+            }, 5*time.Second, 100*time.Millisecond)
+
+            require.Equal(t, fetches.Records()[0].Value, record)
+            require.Equal(t, fetches.Records()[0].Offset, producedOffset)
+        }
+
+        // Ensure that the fetchers work with the initial concurrency.
+        produceRecordAndAssert(rec1)
+
+        // Now, update the concurrency.
+        fetchers.Update(ctx, 1, 1)
+
+        // Ensure that the fetchers work with the updated concurrency.
+        produceRecordAndAssert(rec2)
+
+        // Update and verify again.
+        fetchers.Update(ctx, 10, 10)
+        produceRecordAndAssert(rec3)
+
+    })
+
 }
 
 func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Client, topic string, partition int32, startOffset int64, concurrency, recordsPerFetch int) *concurrentFetchers {
diff --git a/pkg/storage/ingest/writer_test.go b/pkg/storage/ingest/writer_test.go
index 83aba4b2a4b..63254fe0a33 100644
--- a/pkg/storage/ingest/writer_test.go
+++ b/pkg/storage/ingest/writer_test.go
@@ -6,7 +6,7 @@ import (
     "context"
     "errors"
     "fmt"
-    "math/rand/v2"
+    randv2 "math/rand/v2"
     "strings"
     "sync"
     "testing"
@@ -796,7 +796,7 @@ func TestWriter_WriteSync_HighConcurrencyOnKafkaClientBufferFull(t *testing.T) {
     createRandomWriteRequest := func() *mimirpb.WriteRequest {
         // It's important that each request has a different size to reproduce the deadlock.
-        metricName := strings.Repeat("x", rand.IntN(1000))
+        metricName := strings.Repeat("x", randv2.IntN(1000))
         series := []mimirpb.PreallocTimeseries{mockPreallocTimeseries(metricName)}
 
         return &mimirpb.WriteRequest{Timeseries: series, Metadata: nil, Source: mimirpb.API}
@@ -814,7 +814,7 @@ func TestWriter_WriteSync_HighConcurrencyOnKafkaClientBufferFull(t *testing.T) {
     // Throttle a very short (random) time to increase chances of hitting race conditions.
     cluster.ControlKey(int16(kmsg.Produce), func(_ kmsg.Request) (kmsg.Response, error, bool) {
-        time.Sleep(time.Duration(rand.Int64N(int64(time.Millisecond))))
+        time.Sleep(time.Duration(randv2.Int64N(int64(time.Millisecond))))
         return nil, nil, false
     })
 
@@ -1098,8 +1098,10 @@ func createTestKafkaConfig(clusterAddr, topicName string) KafkaConfig {
     cfg.Address = clusterAddr
     cfg.Topic = topicName
     cfg.WriteTimeout = 2 * time.Second
-    cfg.FetchConcurrency = 2
-    cfg.RecordsPerFetch = 2
+    cfg.StartupFetchConcurrency = 2
+    cfg.StartupRecordsPerFetch = 2
+    cfg.OngoingRecordsPerFetch = 2
+    cfg.OngoingFetchConcurrency = 2
 
     return cfg
 }
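
A minimal configuration sketch showing how the new options fit together once this change lands. The concrete values and the ingest_storage/kafka nesting are illustrative assumptions, not part of the patch; the hard rules come from KafkaConfig.Validate() above: startup-fetch-concurrency must be greater than or equal to 0, ongoing-fetch-concurrency must be greater than 0 whenever startup-fetch-concurrency is greater than 0, and both records-per-fetch values must be greater than 0.

ingest_storage:
  kafka:
    # Concurrent fetching while the ingester replays the partition at startup.
    # 0 (the default) keeps concurrent fetching disabled.
    startup_fetch_concurrency: 16      # illustrative value
    startup_records_per_fetch: 2500    # must be > 0
    # Must be > 0 because startup_fetch_concurrency is > 0; used for the
    # continuous consumption that follows startup.
    ongoing_fetch_concurrency: 2       # illustrative value
    ongoing_records_per_fetch: 30      # must be > 0

With a configuration like this, PartitionReader.start creates the concurrent fetchers with the startup settings, and run later calls fetcher.Update with the ongoing settings; that switch-over is what the new "concurrency can be updated" test exercises.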