diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0d204cdae72..d8a4c2c5b46 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -78,6 +78,7 @@ defaults:
 ```
 * [BUGFIX] Moved empty root span substitution from `querier` to `query-frontend`. [#2671](https://github.com/grafana/tempo/issues/2671) (@galalen)
 * [BUGFIX] Correctly propagate ingester errors on the query path [#2935](https://github.com/grafana/tempo/issues/2935) (@joe-elliott)
+* [BUGFIX] Fix issue where ingester doesn't stop query after timeout [#3031](https://github.com/grafana/tempo/pull/3031) (@mdisibio)
 * [BUGFIX] Fix cases where empty filter {} wouldn't return expected results [#2498](https://github.com/grafana/tempo/issues/2498) (@mdisibio)
 
 # v2.2.3 / 2023-09-13
diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go
index abbbc363b80..58e253ab1e2 100644
--- a/modules/ingester/ingester_test.go
+++ b/modules/ingester/ingester_test.go
@@ -483,6 +483,7 @@ func defaultIngesterTestConfig() Config {
 		nil,
 	)
 
+	cfg.FlushOpTimeout = 99999 * time.Hour
 	cfg.FlushCheckPeriod = 99999 * time.Hour
 	cfg.MaxTraceIdle = 99999 * time.Hour
 	cfg.ConcurrentFlushes = 1
diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go
index 90a41b4c551..30068015da3 100644
--- a/modules/ingester/instance_search.go
+++ b/modules/ingester/instance_search.go
@@ -75,19 +75,14 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
 	// collect results from all the goroutines via sr.Results channel.
 	// range loop will exit when sr.Results channel is closed.
 	for result := range sr.Results() {
-		// exit early and Propagate error upstream
-		if sr.Error() != nil {
-			return nil, sr.Error()
-		}
-
-		combiner.AddMetadata(result)
 		if combiner.Count() >= maxResults {
 			sr.Close() // signal pending workers to exit
-			break
+			continue
 		}
+
+		combiner.AddMetadata(result)
 	}
 
-	// can happen when we have only error, and no results
 	if sr.Error() != nil {
 		return nil, sr.Error()
 	}
diff --git a/pkg/search/results.go b/pkg/search/results.go
index 23f1b765f29..61317a742a3 100644
--- a/pkg/search/results.go
+++ b/pkg/search/results.go
@@ -61,6 +61,7 @@ func (sr *Results) AddResult(ctx context.Context, r *tempopb.TraceSearchMetadata
 func (sr *Results) SetError(err error) {
 	if !errors.Is(err, common.ErrUnsupported) { // ignore common.Unsupported
 		sr.error.Store(err)
+		sr.Close()
 	}
 }
 
diff --git a/pkg/search/results_test.go b/pkg/search/results_test.go
index 8e568f7cfda..33b4d0ba22d 100644
--- a/pkg/search/results_test.go
+++ b/pkg/search/results_test.go
@@ -28,11 +28,14 @@ func TestResultsDoesNotRace(t *testing.T) {
 			sr := NewResults()
 			defer sr.Close()
 
-			for i := 0; i < 100; i++ {
+			workers := 10
+			results := 10_000
+
+			for i := 0; i < workers; i++ {
 				sr.StartWorker()
 				go func() {
 					defer sr.FinishWorker()
-					for j := 0; j < 10_000; j++ {
+					for j := 0; j < results; j++ {
 						if sr.AddResult(ctx, &tempopb.TraceSearchMetadata{}) {
 							break
 						}
@@ -46,28 +49,29 @@ func TestResultsDoesNotRace(t *testing.T) {
 
 			sr.AllWorkersStarted()
 
-			var resultsCount int
-			var err error
+			resultsCount := 0
 			for range sr.Results() {
-				if sr.Error() != nil {
-					err = sr.Error() // capture err to assert below
-					break // exit early
-				}
 				resultsCount++
 			}
 
+			// Check error after results channel is closed which
+			// means all workers have exited.
+			err := sr.Error()
+
 			if tc.error {
 				require.Error(t, err)
 				if tc.consumeResults {
 					// in case of error, we will bail out early
-					require.NotEqual(t, 10_000_00, resultsCount)
-					// will read at-least something by the time we have first error
-					require.NotEqual(t, 0, resultsCount)
+					// and not all results are read
+					require.Less(t, resultsCount, workers*results)
+					// But we always get at least 1 result before
+					// the error
+					require.Greater(t, resultsCount, 0)
 				}
 			} else {
 				require.NoError(t, err)
 				if tc.consumeResults {
-					require.Equal(t, 10_000_00, resultsCount)
+					require.Equal(t, workers*results, resultsCount)
 				}
 			}
 		})
diff --git a/tempodb/backend/local/local.go b/tempodb/backend/local/local.go
index 38a7b11d05a..169288222a3 100644
--- a/tempodb/backend/local/local.go
+++ b/tempodb/backend/local/local.go
@@ -41,7 +41,11 @@ func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor,
 }
 
 // Write implements backend.Writer
-func (rw *Backend) Write(_ context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error {
+func (rw *Backend) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error {
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
 	blockFolder := rw.rootPath(keypath)
 	err := os.MkdirAll(blockFolder, os.ModePerm)
 	if err != nil {
@@ -64,6 +68,10 @@ func (rw *Backend) Write(_ context.Context, name string, keypath backend.KeyPath
 
 // Append implements backend.Writer
 func (rw *Backend) Append(ctx context.Context, name string, keypath backend.KeyPath, tracker backend.AppendTracker, buffer []byte) (backend.AppendTracker, error) {
+	if err := ctx.Err(); err != nil {
+		return nil, err
+	}
+
 	span, _ := opentracing.StartSpanFromContext(ctx, "local.Append", opentracing.Tags{
 		"len": len(buffer),
 	})
@@ -95,7 +103,11 @@ func (rw *Backend) Append(ctx context.Context, name string, keypath backend.KeyP
 }
 
 // CloseAppend implements backend.Writer
-func (rw *Backend) CloseAppend(_ context.Context, tracker backend.AppendTracker) error {
+func (rw *Backend) CloseAppend(ctx context.Context, tracker backend.AppendTracker) error {
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
 	if tracker == nil {
 		return nil
 	}
@@ -104,13 +116,21 @@ func (rw *Backend) CloseAppend(_ context.Context, tracker backend.AppendTracker)
 	return dst.Close()
 }
 
-func (rw *Backend) Delete(_ context.Context, name string, keypath backend.KeyPath, _ bool) error {
+func (rw *Backend) Delete(ctx context.Context, name string, keypath backend.KeyPath, _ bool) error {
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
 	path := rw.rootPath(append(keypath, name))
 	return os.RemoveAll(path)
 }
 
 // List implements backend.Reader
-func (rw *Backend) List(_ context.Context, keypath backend.KeyPath) ([]string, error) {
+func (rw *Backend) List(ctx context.Context, keypath backend.KeyPath) ([]string, error) {
+	if err := ctx.Err(); err != nil {
+		return nil, err
+	}
+
 	path := rw.rootPath(keypath)
 	folders, err := os.ReadDir(path)
 	if err != nil {
@@ -129,7 +149,11 @@ func (rw *Backend) List(_ context.Context, keypath backend.KeyPath) ([]string, e
 }
 
 // Read implements backend.Reader
-func (rw *Backend) Read(_ context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) {
+func (rw *Backend) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) {
+	if err := ctx.Err(); err != nil {
+		return nil, -1, err
+	}
+
 	filename := rw.objectFileName(keypath, name)
 
 	f, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
@@ -148,6 +172,10 @@ func (rw *Backend) Read(_ context.Context, name string, keypath backend.KeyPath,
 
 // ReadRange implements backend.Reader
 func (rw *Backend) ReadRange(ctx context.Context, name string, keypath backend.KeyPath, offset uint64, buffer []byte, _ bool) error {
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
 	span, _ := opentracing.StartSpanFromContext(ctx, "local.ReadRange", opentracing.Tags{
 		"len":    len(buffer),
 		"offset": offset,
diff --git a/tempodb/encoding/vparquet2/readers.go b/tempodb/encoding/vparquet2/readers.go
index 645d7d73227..4090c31ba5d 100644
--- a/tempodb/encoding/vparquet2/readers.go
+++ b/tempodb/encoding/vparquet2/readers.go
@@ -135,18 +135,22 @@ func (r *cachedReaderAt) ReadAt(p []byte, off int64) (int, error) {
 
 // walReaderAt is wrapper over io.ReaderAt, and is used to measure the total bytes read when searching walBlock.
 type walReaderAt struct {
-	r io.ReaderAt
-
+	ctx       context.Context
+	r         io.ReaderAt
 	bytesRead atomic.Uint64
 }
 
 var _ io.ReaderAt = (*walReaderAt)(nil)
 
-func newWalReaderAt(r io.ReaderAt) *walReaderAt {
-	return &walReaderAt{r, atomic.Uint64{}}
+func newWalReaderAt(ctx context.Context, r io.ReaderAt) *walReaderAt {
+	return &walReaderAt{ctx, r, atomic.Uint64{}}
 }
 
 func (wr *walReaderAt) ReadAt(p []byte, off int64) (int, error) {
+	if err := wr.ctx.Err(); err != nil {
+		return 0, err
+	}
+
 	// parquet-go will call ReadAt when reading data from a parquet file.
 	n, err := wr.r.ReadAt(p, off)
 	// ReadAt can read less than len(p) bytes in some cases
diff --git a/tempodb/encoding/vparquet2/wal_block.go b/tempodb/encoding/vparquet2/wal_block.go
index 0e096b7efdf..5d535f805e5 100644
--- a/tempodb/encoding/vparquet2/wal_block.go
+++ b/tempodb/encoding/vparquet2/wal_block.go
@@ -112,7 +112,7 @@ func openWALBlock(filename, path string, ingestionSlack, _ time.Duration) (commo
 		path := filepath.Join(dir, f.Name())
 		page := newWalBlockFlush(path, common.NewIDMap[int64]())
 
-		file, err := page.file()
+		file, err := page.file(context.Background())
 		if err != nil {
 			warning = fmt.Errorf("error opening file info: %s: %w", page.path, err)
 			continue
@@ -208,8 +208,13 @@ func newWalBlockFlush(path string, ids *common.IDMap[int64]) *walBlockFlush {
 }
 
 // file() opens the parquet file and returns it. previously this method cached the file on first open
-// but the memory cost of this was quite high. so instead we open it fresh every time
-func (w *walBlockFlush) file() (*pageFile, error) {
+// but the memory cost of this was quite high. so instead we open it fresh every time. This
+// also allows it to take the context for the caller.
+func (w *walBlockFlush) file(ctx context.Context) (*pageFile, error) {
+	if err := ctx.Err(); err != nil {
+		return nil, err
+	}
+
 	file, err := os.OpenFile(w.path, os.O_RDONLY, 0o644)
 	if err != nil {
 		return nil, fmt.Errorf("error opening file: %w", err)
@@ -220,7 +225,7 @@ func (w *walBlockFlush) file() (*pageFile, error) {
 	}
 	size := info.Size()
 
-	wr := newWalReaderAt(file)
+	wr := newWalReaderAt(ctx, file)
 	pf, err := parquet.OpenFile(wr, size, parquet.SkipBloomFilters(true), parquet.SkipPageIndex(true), parquet.FileSchema(walSchema))
 	if err != nil {
 		return nil, fmt.Errorf("error opening parquet file: %w", err)
@@ -232,7 +237,7 @@ func (w *walBlockFlush) file() (*pageFile, error) {
 }
 
 func (w *walBlockFlush) rowIterator() (*rowIterator, error) {
-	file, err := w.file()
+	file, err := w.file(context.Background())
 	if err != nil {
 		return nil, err
 	}
@@ -492,12 +497,12 @@ func (b *walBlock) Clear() error {
 	return errs.Err()
 }
 
-func (b *walBlock) FindTraceByID(_ context.Context, id common.ID, opts common.SearchOptions) (*tempopb.Trace, error) {
+func (b *walBlock) FindTraceByID(ctx context.Context, id common.ID, opts common.SearchOptions) (*tempopb.Trace, error) {
 	trs := make([]*tempopb.Trace, 0)
 
 	for _, page := range b.flushed {
 		if rowNumber, ok := page.ids.Get(id); ok {
-			file, err := page.file()
+			file, err := page.file(ctx)
 			if err != nil {
 				return nil, fmt.Errorf("error opening file %s: %w", page.path, err)
 			}
@@ -543,7 +548,7 @@ func (b *walBlock) Search(ctx context.Context, req *tempopb.SearchRequest, _ com
 	}
 
 	for i, blockFlush := range b.readFlushes() {
-		file, err := blockFlush.file()
+		file, err := blockFlush.file(ctx)
 		if err != nil {
 			return nil, fmt.Errorf("error opening file %s: %w", blockFlush.path, err)
 		}
@@ -569,7 +574,7 @@ func (b *walBlock) Search(ctx context.Context, req *tempopb.SearchRequest, _ com
 
 func (b *walBlock) SearchTags(ctx context.Context, scope traceql.AttributeScope, cb common.TagCallback, _ common.SearchOptions) error {
 	for i, blockFlush := range b.readFlushes() {
-		file, err := blockFlush.file()
+		file, err := blockFlush.file(ctx)
 		if err != nil {
 			return fmt.Errorf("error opening file %s: %w", blockFlush.path, err)
 		}
@@ -603,7 +608,7 @@ func (b *walBlock) SearchTagValues(ctx context.Context, tag string, cb common.Ta
 
 func (b *walBlock) SearchTagValuesV2(ctx context.Context, tag traceql.Attribute, cb common.TagCallbackV2, _ common.SearchOptions) error {
 	for i, blockFlush := range b.readFlushes() {
-		file, err := blockFlush.file()
+		file, err := blockFlush.file(ctx)
 		if err != nil {
 			return fmt.Errorf("error opening file %s: %w", blockFlush.path, err)
 		}
@@ -632,7 +637,7 @@ func (b *walBlock) Fetch(ctx context.Context, req traceql.FetchSpansRequest, opt
 	readers := make([]*walReaderAt, 0, len(blockFlushes))
 	iters := make([]traceql.SpansetIterator, 0, len(blockFlushes))
 	for _, page := range blockFlushes {
-		file, err := page.file()
+		file, err := page.file(ctx)
 		if err != nil {
 			return traceql.FetchSpansResponse{}, fmt.Errorf("error opening file %s: %w", page.path, err)
 		}
diff --git a/tempodb/encoding/vparquet3/readers.go b/tempodb/encoding/vparquet3/readers.go
index be32e3f528c..5f6b93bd150 100644
--- a/tempodb/encoding/vparquet3/readers.go
+++ b/tempodb/encoding/vparquet3/readers.go
@@ -135,18 +135,22 @@ func (r *cachedReaderAt) ReadAt(p []byte, off int64) (int, error) {
 
 // walReaderAt is wrapper over io.ReaderAt, and is used to measure the total bytes read when searching walBlock.
 type walReaderAt struct {
-	r io.ReaderAt
-
+	ctx       context.Context
+	r         io.ReaderAt
 	bytesRead atomic.Uint64
 }
 
 var _ io.ReaderAt = (*walReaderAt)(nil)
 
-func newWalReaderAt(r io.ReaderAt) *walReaderAt {
-	return &walReaderAt{r, atomic.Uint64{}}
+func newWalReaderAt(ctx context.Context, r io.ReaderAt) *walReaderAt {
+	return &walReaderAt{ctx, r, atomic.Uint64{}}
 }
 
 func (wr *walReaderAt) ReadAt(p []byte, off int64) (int, error) {
+	if err := wr.ctx.Err(); err != nil {
+		return 0, err
+	}
+
 	// parquet-go will call ReadAt when reading data from a parquet file.
 	n, err := wr.r.ReadAt(p, off)
 	// ReadAt can read less than len(p) bytes in some cases
diff --git a/tempodb/encoding/vparquet3/wal_block.go b/tempodb/encoding/vparquet3/wal_block.go
index 24acc34b8e1..f7c628c699d 100644
--- a/tempodb/encoding/vparquet3/wal_block.go
+++ b/tempodb/encoding/vparquet3/wal_block.go
@@ -112,7 +112,7 @@ func openWALBlock(filename, path string, ingestionSlack, _ time.Duration) (commo
 		path := filepath.Join(dir, f.Name())
 		page := newWalBlockFlush(path, common.NewIDMap[int64]())
 
-		file, err := page.file()
+		file, err := page.file(context.Background())
 		if err != nil {
 			warning = fmt.Errorf("error opening file info: %s: %w", page.path, err)
 			continue
@@ -209,8 +209,13 @@ func newWalBlockFlush(path string, ids *common.IDMap[int64]) *walBlockFlush {
 }
 
 // file() opens the parquet file and returns it. previously this method cached the file on first open
-// but the memory cost of this was quite high. so instead we open it fresh every time
-func (w *walBlockFlush) file() (*pageFile, error) {
+// but the memory cost of this was quite high. so instead we open it fresh every time. This
+// also allows it to take the context for the caller.
+func (w *walBlockFlush) file(ctx context.Context) (*pageFile, error) {
+	if err := ctx.Err(); err != nil {
+		return nil, err
+	}
+
 	file, err := os.OpenFile(w.path, os.O_RDONLY, 0o644)
 	if err != nil {
 		return nil, fmt.Errorf("error opening file: %w", err)
@@ -221,7 +226,7 @@ func (w *walBlockFlush) file() (*pageFile, error) {
 	}
 	size := info.Size()
 
-	wr := newWalReaderAt(file)
+	wr := newWalReaderAt(ctx, file)
 	pf, err := parquet.OpenFile(wr, size, parquet.SkipBloomFilters(true), parquet.SkipPageIndex(true), parquet.FileSchema(walSchema))
 	if err != nil {
 		return nil, fmt.Errorf("error opening parquet file: %w", err)
@@ -233,7 +238,7 @@ func (w *walBlockFlush) file() (*pageFile, error) {
 }
 
 func (w *walBlockFlush) rowIterator() (*rowIterator, error) {
-	file, err := w.file()
+	file, err := w.file(context.Background())
 	if err != nil {
 		return nil, err
 	}
@@ -500,12 +505,12 @@ func (b *walBlock) Clear() error {
 	return errs.Err()
 }
 
-func (b *walBlock) FindTraceByID(_ context.Context, id common.ID, opts common.SearchOptions) (*tempopb.Trace, error) {
+func (b *walBlock) FindTraceByID(ctx context.Context, id common.ID, opts common.SearchOptions) (*tempopb.Trace, error) {
 	trs := make([]*tempopb.Trace, 0)
 
 	for _, page := range b.flushed {
 		if rowNumber, ok := page.ids.Get(id); ok {
-			file, err := page.file()
+			file, err := page.file(ctx)
 			if err != nil {
 				return nil, fmt.Errorf("error opening file %s: %w", page.path, err)
 			}
@@ -551,7 +556,7 @@ func (b *walBlock) Search(ctx context.Context, req *tempopb.SearchRequest, _ com
 	}
 
 	for i, blockFlush := range b.readFlushes() {
-		file, err := blockFlush.file()
+		file, err := blockFlush.file(ctx)
 		if err != nil {
 			return nil, fmt.Errorf("error opening file %s: %w", blockFlush.path, err)
 		}
@@ -577,7 +582,7 @@ func (b *walBlock) Search(ctx context.Context, req *tempopb.SearchRequest, _ com
 
 func (b *walBlock) SearchTags(ctx context.Context, scope traceql.AttributeScope, cb common.TagCallback, _ common.SearchOptions) error {
 	for i, blockFlush := range b.readFlushes() {
-		file, err := blockFlush.file()
+		file, err := blockFlush.file(ctx)
 		if err != nil {
 			return fmt.Errorf("error opening file %s: %w", blockFlush.path, err)
 		}
@@ -611,7 +616,7 @@ func (b *walBlock) SearchTagValues(ctx context.Context, tag string, cb common.Ta
 
 func (b *walBlock) SearchTagValuesV2(ctx context.Context, tag traceql.Attribute, cb common.TagCallbackV2, _ common.SearchOptions) error {
 	for i, blockFlush := range b.readFlushes() {
-		file, err := blockFlush.file()
+		file, err := blockFlush.file(ctx)
 		if err != nil {
 			return fmt.Errorf("error opening file %s: %w", blockFlush.path, err)
 		}
@@ -640,7 +645,7 @@ func (b *walBlock) Fetch(ctx context.Context, req traceql.FetchSpansRequest, opt
 	readers := make([]*walReaderAt, 0, len(blockFlushes))
 	iters := make([]traceql.SpansetIterator, 0, len(blockFlushes))
 	for _, page := range blockFlushes {
-		file, err := page.file()
+		file, err := page.file(ctx)
		if err != nil {
			return traceql.FetchSpansResponse{}, fmt.Errorf("error opening file %s: %w", page.path, err)
		}
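
The patch threads one idea through every layer: once a query's context has timed out or been canceled, stop doing work as early as possible. Results.SetError now also closes the result channel so collectors drain and exit, and the local backend plus the WAL readers return ctx.Err() before doing any further I/O. The snippet below is a minimal, self-contained sketch of that reader-side pattern only; ctxReaderAt and the example values are illustrative names and are not part of the Tempo patch.

package main

import (
	"context"
	"fmt"
	"io"
	"strings"
	"time"
)

// ctxReaderAt mirrors the walReaderAt pattern in this patch: every ReadAt
// first checks whether the caller's context is already canceled or past its
// deadline, so a timed-out search stops issuing reads immediately.
type ctxReaderAt struct {
	ctx context.Context
	r   io.ReaderAt
}

func (cr *ctxReaderAt) ReadAt(p []byte, off int64) (int, error) {
	// Bail out before touching the underlying reader if the query is done.
	if err := cr.ctx.Err(); err != nil {
		return 0, err
	}
	return cr.r.ReadAt(p, off)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	time.Sleep(2 * time.Millisecond) // let the deadline expire

	r := &ctxReaderAt{ctx: ctx, r: strings.NewReader("some parquet bytes")}
	buf := make([]byte, 4)
	_, err := r.ReadAt(buf, 0)
	fmt.Println(err) // context deadline exceeded
}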