[Ingester] Create one goroutine per tenant to flush traces to disk #4483

Merged · 13 commits · Jan 17, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
## main / unreleased

* [CHANGE] **BREAKING CHANGE** Removed `internal_error` as a reason from `tempo_discarded_spans_total`. [#4554](https://github.com/grafana/tempo/pull/4554) (@joe-elliott)
* [ENHANCEMENT] Prevent queries in the ingester from blocking flushing traces to disk and memory spikes. [#4483](https://github.com/grafana/tempo/pull/4483) (@joe-elliott)
* [BUGFIX] Choose a default step for a gRPC streaming query range request if none is provided. [#4546](https://github.com/grafana/tempo/pull/4546) (@joe-elliott)
Fix an issue where the tempo-cli was not correctly dumping exemplar results.
* [BUGFIX] Fix performance bottleneck and file cleanup in block builder [#4550](https://github.com/grafana/tempo/pull/4550) (@mdisibio)
61 changes: 39 additions & 22 deletions modules/ingester/flush.go
@@ -72,20 +72,6 @@ const (
opKindFlush
)

// Flush triggers a flush of all in memory traces to disk. This is called
// by the lifecycler on shutdown and will put our traces in the WAL to be
// replayed.
func (i *Ingester) Flush() {
@joe-elliott (Member, Author) commented on Dec 20, 2024:

This is the old way we flushed traces to disk on shutdown. It was difficult to find and was driven through an obsolete ring mechanic ("FlushTransferer"), so I removed it and moved the logic into "stopping". I think the new way is more discoverable and clear.

https://github.com/grafana/tempo/pull/4483/files#diff-b17cd433ae9859f67f0056e452237ddf70a44e9821c6128c8990005eaf7decd1R176

instances := i.getInstances()

for _, instance := range instances {
err := instance.CutCompleteTraces(0, true)
if err != nil {
level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to cut complete traces on shutdown", "err", err)
}
}
}

// ShutdownHandler handles a graceful shutdown for an ingester. It does the following things in order
// * Stop incoming writes by exiting from the ring
// * Flush all blocks to backend
@@ -124,9 +110,9 @@ func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request) {
return
}
level.Info(log.Logger).Log("msg", "flushing instance", "instance", instance.instanceID)
i.sweepInstance(instance, true)
i.cutOneInstanceToWal(instance, true)
} else {
i.sweepAllInstances(true)
i.cutAllInstancesToWal()
@joe-elliott (Member, Author) commented on lines +113 to +115:

Renamed these funcs for clarity.

}

w.WriteHeader(http.StatusNoContent)
@@ -151,16 +137,47 @@ func (o *flushOp) Priority() int64 {
return -o.at.Unix()
}

// sweepAllInstances periodically schedules series for flushing and garbage collects instances with no series
func (i *Ingester) sweepAllInstances(immediate bool) {
// cutToWalLoop kicks off a goroutine for the passed instance that will periodically cut traces to WAL.
// it signals completion through cutToWalWg, waits for cutToWalStart and stops on cutToWalStop.
func (i *Ingester) cutToWalLoop(instance *instance) {
@joe-elliott (Member, Author):

This is the new per-tenant loop that drives flushing live traces to disk.
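
To see the moving parts in one place, here is a minimal, self-contained sketch of the coordination pattern this loop relies on: a start gate closed once WAL replay finishes, a stop channel closed on shutdown, and a WaitGroup that shutdown waits on. The names (`coordinator`, `runLoop`) are simplified stand-ins for illustration, not the actual Tempo code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// coordinator is a simplified stand-in for the ingester's cutToWalStart /
// cutToWalStop / cutToWalWg trio.
type coordinator struct {
	start chan struct{} // closed once it is safe to begin cutting to WAL
	stop  chan struct{} // closed on shutdown
	wg    sync.WaitGroup
}

// runLoop mirrors the shape of cutToWalLoop: wait for start (or bail out if
// stop wins the race), then do periodic work until stop is closed.
func (c *coordinator) runLoop(tenant string) {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()

		select {
		case <-c.start:
		case <-c.stop:
			return
		}

		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				fmt.Println("cut traces to WAL for", tenant)
			case <-c.stop:
				return
			}
		}
	}()
}

func main() {
	c := &coordinator{start: make(chan struct{}), stop: make(chan struct{})}
	c.runLoop("tenant-a")
	c.runLoop("tenant-b")

	close(c.start) // analogous to the end of starting(): WAL replay done, begin cutting
	time.Sleep(30 * time.Millisecond)

	close(c.stop) // analogous to the top of stopping(): ask every loop to exit
	c.wg.Wait()   // stopping() blocks here before the final cut/flush
}
```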

i.cutToWalWg.Add(1)

go func() {
defer i.cutToWalWg.Done()

// wait for the signal to start. we need the wal to be completely replayed
// before we start cutting to WAL
select {
case <-i.cutToWalStart:
case <-i.cutToWalStop:
return
}

// ticker
ticker := time.NewTicker(i.cfg.FlushCheckPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
i.cutOneInstanceToWal(instance, false)
case <-i.cutToWalStop:
return
}
}
}()
}

// cutAllInstancesToWal periodically schedules series for flushing and garbage collects instances with no series
func (i *Ingester) cutAllInstancesToWal() {
instances := i.getInstances()

for _, instance := range instances {
i.sweepInstance(instance, immediate)
i.cutOneInstanceToWal(instance, true)
}
}

func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
func (i *Ingester) cutOneInstanceToWal(instance *instance, immediate bool) {
// cut traces internally
err := instance.CutCompleteTraces(i.cfg.MaxTraceIdle, immediate)
if err != nil {
@@ -204,6 +221,7 @@ func (i *Ingester) flushLoop(j int) {
if o == nil {
return
}

op := o.(*flushOp)
op.attempts++

@@ -246,7 +264,7 @@ func handleAbandonedOp(op *flushOp) {
func (i *Ingester) handleComplete(ctx context.Context, op *flushOp) (retry bool, err error) {
ctx, sp := tracer.Start(ctx, "ingester.Complete", trace.WithAttributes(attribute.String("tenant", op.userID), attribute.String("blockID", op.blockID.String())))
defer sp.End()
withSpan(level.Info(log.Logger), sp).Log("msg", "flushing block", "tenant", op.userID, "block", op.blockID.String())
withSpan(level.Info(log.Logger), sp).Log("msg", "completing block", "tenant", op.userID, "block", op.blockID.String())

// No point in proceeding if shutdown has been initiated since
// we won't be able to queue up the next flush op
@@ -256,7 +274,6 @@ func (i *Ingester) handleComplete(ctx context.Context, op *flushOp) (retry bool,
}

start := time.Now()
level.Info(log.Logger).Log("msg", "completing block", "tenant", op.userID, "blockID", op.blockID)
@joe-elliott (Member, Author):

This was being logged twice, and the first message was actually wrong. Fixed the first and removed the second log.

instance, err := i.getOrCreateInstance(op.userID)
if err != nil {
return false, err
86 changes: 42 additions & 44 deletions modules/ingester/ingester.go
@@ -77,7 +77,11 @@ type Ingester struct {
flushQueues *flushqueues.ExclusiveQueues
flushQueuesDone sync.WaitGroup

limiter Limiter
// manages synchronous behavior with startCutToWal
cutToWalWg sync.WaitGroup
cutToWalStop chan struct{}
cutToWalStart chan struct{}
limiter Limiter

// Used by ingest storage when enabled
ingestPartitionLifecycler *ring.PartitionInstanceLifecycler
@@ -97,13 +101,16 @@ func New(cfg Config, store storage.Store, overrides overrides.Interface, reg pro
flushQueues: flushqueues.New(cfg.ConcurrentFlushes, metricFlushQueueLength),
replayJitter: true,
overrides: overrides,

cutToWalStart: make(chan struct{}),
cutToWalStop: make(chan struct{}),
}

i.pushErr.Store(ErrStarting)

i.local = store.WAL().LocalBackend()

lc, err := ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", cfg.OverrideRingKey, true, log.Logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
lc, err := ring.NewLifecycler(cfg.LifecyclerConfig, nil, "ingester", cfg.OverrideRingKey, true, log.Logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
@joe-elliott (Member, Author):

We no longer register ourselves as a "FlushTransferer".

https://github.com/grafana/dskit/blob/main/ring/lifecycler.go#L181

This is deprecated logic that we were only using to drive the flush-to-disk-on-shutdown behavior. Removed it in favor of just doing it clearly in the stopping func.
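
For context, the interface being dropped is small. The sketch below paraphrases FlushTransferer from the dskit ring package linked above; treat the exact signatures as approximate rather than authoritative.

```go
package ring // paraphrased sketch of grafana/dskit's ring package, not the authoritative source

import "context"

// FlushTransferer is (approximately) the interface the ingester no longer
// registers with the lifecycler.
type FlushTransferer interface {
	// Flush is called by the lifecycler on shutdown; the removed
	// Ingester.Flush in flush.go above was Tempo's implementation.
	Flush()
	// TransferOut hands state to another instance; Tempo's implementation
	// (removed further down) always returned ring.ErrTransferDisabled.
	TransferOut(ctx context.Context) error
}
```

Since Tempo only ever used Flush to cut traces to disk on shutdown, that behavior now lives in the stopping func instead, and the lifecycler is constructed with nil here.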

if err != nil {
return nil, fmt.Errorf("NewLifecycler failed: %w", err)
}
@@ -183,49 +190,38 @@ func (i *Ingester) starting(ctx context.Context) error {
}
}

// accept traces
i.pushErr.Store(nil)

// start flushing traces to wal
close(i.cutToWalStart)

return nil
}

func (i *Ingester) loop(ctx context.Context) error {
flushTicker := time.NewTicker(i.cfg.FlushCheckPeriod)
defer flushTicker.Stop()

for {
select {
case <-flushTicker.C:
i.sweepAllInstances(false)

case <-ctx.Done():
return nil

case err := <-i.subservicesWatcher.Chan():
return fmt.Errorf("ingester subservice failed: %w", err)
}
}
}

// complete the flushing
// ExclusiveQueues.activekeys keeps track of flush operations due for processing
// ExclusiveQueues.IsEmpty check uses ExclusiveQueues.activeKeys to determine if flushQueues is empty or not
// sweepAllInstances prepares remaining traces to be flushed by flushLoop routine, also updating ExclusiveQueues.activekeys with keys for new flush operations
// ExclusiveQueues.activeKeys is cleared of a flush operation when a processing of flush operation is either successful or doesn't return retry signal
// This ensures that i.flushQueues is empty only when all traces are flushed
func (i *Ingester) flushRemaining() {
i.sweepAllInstances(true)
for !i.flushQueues.IsEmpty() {
time.Sleep(100 * time.Millisecond)
select {
case <-ctx.Done():
return nil
case err := <-i.subservicesWatcher.Chan():
return fmt.Errorf("ingester subservice failed: %w", err)
}
}

// stopping is run when ingester is asked to stop
func (i *Ingester) stopping(_ error) error {
i.markUnavailable()

// flush any remaining traces
// signal all cutting to wal to stop and wait for all goroutines to finish
close(i.cutToWalStop)
i.cutToWalWg.Wait()

if i.cfg.FlushAllOnShutdown {
// force all in memory traces to be flushed to disk AND fully flush them to the backend
i.flushRemaining()
} else {
// force all in memory traces to be flushed to disk
i.cutAllInstancesToWal()
}

if i.flushQueues != nil {
@@ -238,6 +234,19 @@ func (i *Ingester) stopping(_ error) error {
return nil
}

// complete the flushing
// ExclusiveQueues.activekeys keeps track of flush operations due for processing
// ExclusiveQueues.IsEmpty check uses ExclusiveQueues.activeKeys to determine if flushQueues is empty or not
// sweepAllInstances prepares remaining traces to be flushed by flushLoop routine, also updating ExclusiveQueues.activekeys with keys for new flush operations
// ExclusiveQueues.activeKeys is cleared of a flush operation when a processing of flush operation is either successful or doesn't return retry signal
// This ensures that i.flushQueues is empty only when all traces are flushed
func (i *Ingester) flushRemaining() {
i.cutAllInstancesToWal()
for !i.flushQueues.IsEmpty() {
time.Sleep(100 * time.Millisecond)
}
}

func (i *Ingester) markUnavailable() {
// Lifecycler can be nil if the ingester is for a flusher.
if i.lifecycler != nil {
@@ -248,7 +257,7 @@ func (i *Ingester) markUnavailable() {
}

// This will prevent us accepting any more samples
i.stopIncomingRequests()
i.pushErr.Store(ErrShuttingDown)
}

// PushBytes implements tempopb.Pusher.PushBytes. Traces pushed to this endpoint are expected to be in the formats
@@ -376,6 +385,8 @@ func (i *Ingester) getOrCreateInstance(instanceID string) (*instance, error) {
return nil, err
}
i.instances[instanceID] = inst

i.cutToWalLoop(inst)
}
return inst, nil
}
@@ -399,19 +410,6 @@ func (i *Ingester) getInstances() []*instance {
return instances
}

// stopIncomingRequests implements ring.Lifecycler.
@joe-elliott (Member, Author):

Only called in one spot, so removed.

func (i *Ingester) stopIncomingRequests() {
i.instancesMtx.Lock()
defer i.instancesMtx.Unlock()

i.pushErr.Store(ErrShuttingDown)
}

// TransferOut implements ring.Lifecycler.
@joe-elliott (Member, Author):

Only existed to satisfy the FlushTransferer interface.

func (i *Ingester) TransferOut(context.Context) error {
return ring.ErrTransferDisabled
}

func (i *Ingester) replayWal() error {
level.Info(log.Logger).Log("msg", "beginning wal replay")
