Skip to content

Commit

Permalink
Distributor: start remote timeout on first callback (#6972)
Browse files Browse the repository at this point in the history
* Distributor: start remote timeout on first callback

It may take a while to calculate which instance receives each of the
series (see: grafana/dskit#454).

It isn't fair (and is useless) to reach the callback with no TTL remaining.

This changes the code to start the timeout once the first callback is
called.

Signed-off-by: Oleg Zaytsev <[email protected]>

* Update CHANGELOG.md

Signed-off-by: Oleg Zaytsev <[email protected]>

* Refactor using sync.OnceValues

Signed-off-by: Oleg Zaytsev <[email protected]>

---------

Signed-off-by: Oleg Zaytsev <[email protected]>
  • Loading branch information
colega authored Dec 21, 2023
1 parent 7ca78fd commit 324843d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* [ENHANCEMENT] Query-Frontend and Query-Scheduler: split tenant query request queues by query component with `query-frontend.additional-query-queue-dimensions-enabled` and `query-scheduler.additional-query-queue-dimensions-enabled`. #6772
* [ENHANCEMENT] Store-gateway: include more information about lazy index-header loading in traces. #6922
* [ENHANCEMENT] Distributor: support disabling metric relabel rules per-tenant via the flag `-distributor.metric-relabeling-enabled` or associated YAML. #6970
* [ENHANCEMENT] Distributor: `-distributor.remote-timeout` is now measured from the moment the first ingester push request is sent. #6972
* [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451
* [BUGFIX] Fix issue where queries can fail or omit OOO samples if OOO head compaction occurs between creating a querier and reading chunks #6766
* [BUGFIX] Fix issue where concatenatingChunkIterator can obscure errors #6766
Expand Down
27 changes: 19 additions & 8 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1305,8 +1305,17 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
// Get a subring if tenant has shuffle shard size configured.
subRing := d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))

// Use an independent context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), d.cfg.RemoteTimeout)
if d.cfg.WriteRequestsBufferPoolingEnabled {
slabPool := pool.NewFastReleasingSlabPool[byte](&d.writeRequestBytePool, writeRequestSlabPoolSize)
ctx = ingester_client.WithSlabPool(ctx, slabPool)
}

// Use an independent context to make sure all ingesters get samples even if we return early.
// It will still take a while to calculate which ingester gets which series,
// so we'll start the remote timeout once the first callback is called.
remoteRequestContext := sync.OnceValues(func() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.WithoutCancel(ctx), d.cfg.RemoteTimeout)
})

// All tokens, stored in order: series, metadata.
keys := make([]uint32, len(seriesKeys)+len(metadataKeys))
Expand All @@ -1318,11 +1327,6 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
// so set this flag false and pass cleanup() to DoBatch.
cleanupInDefer = false

if d.cfg.WriteRequestsBufferPoolingEnabled {
slabPool := pool.NewFastReleasingSlabPool[byte](&d.writeRequestBytePool, writeRequestSlabPoolSize)
localCtx = ingester_client.WithSlabPool(localCtx, slabPool)
}

err = ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, subRing, keys,
func(ingester ring.InstanceDesc, indexes []int) error {
var timeseriesCount, metadataCount int
Expand All @@ -1345,6 +1349,9 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
}
}

// Do not cancel the remoteRequestContext in this callback:
// there are more callbacks using it at the same time.
localCtx, _ := remoteRequestContext()
var err error
if d.cfg.IngestStorageConfig.Enabled {
err = d.sendToStorage(localCtx, userID, ingester, timeseries, metadata, req.Source)
Expand All @@ -1358,7 +1365,11 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
return err
},
ring.DoBatchOptions{
Cleanup: func() { pushReq.CleanUp(); cancel() },
Cleanup: func() {
pushReq.CleanUp()
_, cancel := remoteRequestContext()
cancel()
},
IsClientError: isClientError,
Go: d.ingesterDoBatchPushWorkers,
},
Expand Down

0 comments on commit 324843d

Please sign in to comment.