Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester Search honor ctx errors #3031

Merged
merged 8 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ defaults:
```
* [BUGFIX] Moved empty root span substitution from `querier` to `query-frontend`. [#2671](https://github.com/grafana/tempo/issues/2671) (@galalen)
* [BUGFIX] Correctly propagate ingester errors on the query path [#2935](https://github.com/grafana/tempo/issues/2935) (@joe-elliott)
* [BUGFIX] Fix issue where ingester doesn't stop query after timeout [#3031](https://github.com/grafana/tempo/pull/3031) (@mdisibio)
* [BUGFIX] Fix cases where empty filter {} wouldn't return expected results [#2498](https://github.com/grafana/tempo/issues/2498) (@mdisibio)

# v2.2.3 / 2023-09-13
Expand Down
1 change: 1 addition & 0 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ func defaultIngesterTestConfig() Config {
nil,
)

cfg.FlushOpTimeout = 99999 * time.Hour
cfg.FlushCheckPeriod = 99999 * time.Hour
cfg.MaxTraceIdle = 99999 * time.Hour
cfg.ConcurrentFlushes = 1
Expand Down
15 changes: 7 additions & 8 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,14 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
// collect results from all the goroutines via sr.Results channel.
// range loop will exit when sr.Results channel is closed.
for result := range sr.Results() {
// exit early and Propagate error upstream
if sr.Error() != nil {
return nil, sr.Error()
}

combiner.AddMetadata(result)
if combiner.Count() >= maxResults {
sr.Close() // signal pending workers to exit
break
continue
}

combiner.AddMetadata(result)
}

// can happen when we have only error, and no results
if sr.Error() != nil {
return nil, sr.Error()
}
Expand Down Expand Up @@ -141,6 +136,10 @@ func (i *instance) searchBlock(ctx context.Context, req *tempopb.SearchRequest,
resp, err = e.Search(ctx, req, opts)
}

if err != nil {
	// NOTE(review): leftover debug print — remove before merge. It writes
	// raw "!!!!!!" output to stdout on every failed block search; if this
	// diagnostic is needed, route it through the leveled logger
	// (level.Debug/level.Warn) like the ErrUnsupported case below.
	fmt.Println("!!!!!! searchBlock bailing", blockID, err)
}

if errors.Is(err, common.ErrUnsupported) {
level.Warn(log.Logger).Log("msg", "block does not support search", "blockID", blockID)
return
Expand Down
1 change: 1 addition & 0 deletions pkg/search/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (sr *Results) AddResult(ctx context.Context, r *tempopb.TraceSearchMetadata
// SetError records err as the terminal error for this search and closes
// the results so pending workers are signalled to stop. The sentinel
// common.ErrUnsupported is deliberately ignored: a block that does not
// support search is skipped, not treated as a failure.
func (sr *Results) SetError(err error) {
	if errors.Is(err, common.ErrUnsupported) { // ignore common.ErrUnsupported
		return
	}
	sr.error.Store(err)
	sr.Close()
}

Expand Down
28 changes: 16 additions & 12 deletions pkg/search/results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ func TestResultsDoesNotRace(t *testing.T) {
sr := NewResults()
defer sr.Close()

for i := 0; i < 100; i++ {
workers := 10
results := 10_000

for i := 0; i < workers; i++ {
sr.StartWorker()
go func() {
defer sr.FinishWorker()
for j := 0; j < 10_000; j++ {
for j := 0; j < results; j++ {
if sr.AddResult(ctx, &tempopb.TraceSearchMetadata{}) {
break
}
Expand All @@ -46,28 +49,29 @@ func TestResultsDoesNotRace(t *testing.T) {

sr.AllWorkersStarted()

var resultsCount int
var err error
resultsCount := 0
for range sr.Results() {
if sr.Error() != nil {
err = sr.Error() // capture err to assert below
break // exit early
}
resultsCount++
}

// Check error after results channel is closed which
// means all workers have exited.
err := sr.Error()

if tc.error {
require.Error(t, err)
if tc.consumeResults {
// in case of error, we will bail out early
require.NotEqual(t, 10_000_00, resultsCount)
// will read at-least something by the time we have first error
require.NotEqual(t, 0, resultsCount)
// and not all results are read
require.Less(t, resultsCount, workers*results)
// But we always get at least 1 result before
// the error
require.Greater(t, resultsCount, 0)
}
} else {
require.NoError(t, err)
if tc.consumeResults {
require.Equal(t, 10_000_00, resultsCount)
require.Equal(t, workers*results, resultsCount)
}
}
})
Expand Down
38 changes: 33 additions & 5 deletions tempodb/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ func New(cfg *Config) (backend.RawReader, backend.RawWriter, backend.Compactor,
}

// Write implements backend.Writer
func (rw *Backend) Write(_ context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error {
func (rw *Backend) Write(ctx context.Context, name string, keypath backend.KeyPath, data io.Reader, _ int64, _ bool) error {
if err := ctx.Err(); err != nil {
return err
}

blockFolder := rw.rootPath(keypath)
err := os.MkdirAll(blockFolder, os.ModePerm)
if err != nil {
Expand All @@ -64,6 +68,10 @@ func (rw *Backend) Write(_ context.Context, name string, keypath backend.KeyPath

// Append implements backend.Writer
func (rw *Backend) Append(ctx context.Context, name string, keypath backend.KeyPath, tracker backend.AppendTracker, buffer []byte) (backend.AppendTracker, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

span, _ := opentracing.StartSpanFromContext(ctx, "local.Append", opentracing.Tags{
"len": len(buffer),
})
Expand Down Expand Up @@ -95,7 +103,11 @@ func (rw *Backend) Append(ctx context.Context, name string, keypath backend.KeyP
}

// CloseAppend implements backend.Writer
func (rw *Backend) CloseAppend(_ context.Context, tracker backend.AppendTracker) error {
func (rw *Backend) CloseAppend(ctx context.Context, tracker backend.AppendTracker) error {
if err := ctx.Err(); err != nil {
return err
}

if tracker == nil {
return nil
}
Expand All @@ -104,13 +116,21 @@ func (rw *Backend) CloseAppend(_ context.Context, tracker backend.AppendTracker)
return dst.Close()
}

func (rw *Backend) Delete(_ context.Context, name string, keypath backend.KeyPath, _ bool) error {
func (rw *Backend) Delete(ctx context.Context, name string, keypath backend.KeyPath, _ bool) error {
if err := ctx.Err(); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @zalegrala heads up you will want this context logic in the Blocks() method you added to the local backend

return err
}

path := rw.rootPath(append(keypath, name))
return os.RemoveAll(path)
}

// List implements backend.Reader
func (rw *Backend) List(_ context.Context, keypath backend.KeyPath) ([]string, error) {
func (rw *Backend) List(ctx context.Context, keypath backend.KeyPath) ([]string, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

path := rw.rootPath(keypath)
folders, err := os.ReadDir(path)
if err != nil {
Expand All @@ -129,7 +149,11 @@ func (rw *Backend) List(_ context.Context, keypath backend.KeyPath) ([]string, e
}

// Read implements backend.Reader
func (rw *Backend) Read(_ context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) {
func (rw *Backend) Read(ctx context.Context, name string, keypath backend.KeyPath, _ bool) (io.ReadCloser, int64, error) {
if err := ctx.Err(); err != nil {
return nil, -1, err
}

filename := rw.objectFileName(keypath, name)

f, err := os.OpenFile(filename, os.O_RDONLY, 0o644)
Expand All @@ -148,6 +172,10 @@ func (rw *Backend) Read(_ context.Context, name string, keypath backend.KeyPath,

// ReadRange implements backend.Reader
func (rw *Backend) ReadRange(ctx context.Context, name string, keypath backend.KeyPath, offset uint64, buffer []byte, _ bool) error {
if err := ctx.Err(); err != nil {
return err
}

span, _ := opentracing.StartSpanFromContext(ctx, "local.ReadRange", opentracing.Tags{
"len": len(buffer),
"offset": offset,
Expand Down
12 changes: 8 additions & 4 deletions tempodb/encoding/vparquet2/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,22 @@ func (r *cachedReaderAt) ReadAt(p []byte, off int64) (int, error) {

// walReaderAt is wrapper over io.ReaderAt, and is used to measure the total bytes read when searching walBlock.
type walReaderAt struct {
r io.ReaderAt

ctx context.Context
r io.ReaderAt
bytesRead atomic.Uint64
}

var _ io.ReaderAt = (*walReaderAt)(nil)

func newWalReaderAt(r io.ReaderAt) *walReaderAt {
return &walReaderAt{r, atomic.Uint64{}}
// newWalReaderAt returns a walReaderAt that delegates to r, checking ctx
// before reads; the file-level comment notes it also measures total
// bytes read via the bytesRead counter (zero-initialized here).
func newWalReaderAt(ctx context.Context, r io.ReaderAt) *walReaderAt {
	return &walReaderAt{
		ctx: ctx,
		r:   r,
	}
}

func (wr *walReaderAt) ReadAt(p []byte, off int64) (int, error) {
if err := wr.ctx.Err(); err != nil {
return 0, err
}

// parquet-go will call ReadAt when reading data from a parquet file.
n, err := wr.r.ReadAt(p, off)
// ReadAt can read less than len(p) bytes in some cases
Expand Down
27 changes: 16 additions & 11 deletions tempodb/encoding/vparquet2/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func openWALBlock(filename, path string, ingestionSlack, _ time.Duration) (commo
path := filepath.Join(dir, f.Name())
page := newWalBlockFlush(path, common.NewIDMap[int64]())

file, err := page.file()
file, err := page.file(context.Background())
if err != nil {
warning = fmt.Errorf("error opening file info: %s: %w", page.path, err)
continue
Expand Down Expand Up @@ -208,8 +208,13 @@ func newWalBlockFlush(path string, ids *common.IDMap[int64]) *walBlockFlush {
}

// file() opens the parquet file and returns it. previously this method cached the file on first open
// but the memory cost of this was quite high. so instead we open it fresh every time
func (w *walBlockFlush) file() (*pageFile, error) {
// but the memory cost of this was quite high. so instead we open it fresh every time. This
// also allows it to take the context for the caller.
func (w *walBlockFlush) file(ctx context.Context) (*pageFile, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

file, err := os.OpenFile(w.path, os.O_RDONLY, 0o644)
if err != nil {
return nil, fmt.Errorf("error opening file: %w", err)
Expand All @@ -220,7 +225,7 @@ func (w *walBlockFlush) file() (*pageFile, error) {
}
size := info.Size()

wr := newWalReaderAt(file)
wr := newWalReaderAt(ctx, file)
pf, err := parquet.OpenFile(wr, size, parquet.SkipBloomFilters(true), parquet.SkipPageIndex(true), parquet.FileSchema(walSchema))
if err != nil {
return nil, fmt.Errorf("error opening parquet file: %w", err)
Expand All @@ -232,7 +237,7 @@ func (w *walBlockFlush) file() (*pageFile, error) {
}

func (w *walBlockFlush) rowIterator() (*rowIterator, error) {
file, err := w.file()
file, err := w.file(context.Background())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -492,12 +497,12 @@ func (b *walBlock) Clear() error {
return errs.Err()
}

func (b *walBlock) FindTraceByID(_ context.Context, id common.ID, opts common.SearchOptions) (*tempopb.Trace, error) {
func (b *walBlock) FindTraceByID(ctx context.Context, id common.ID, opts common.SearchOptions) (*tempopb.Trace, error) {
trs := make([]*tempopb.Trace, 0)

for _, page := range b.flushed {
if rowNumber, ok := page.ids.Get(id); ok {
file, err := page.file()
file, err := page.file(ctx)
if err != nil {
return nil, fmt.Errorf("error opening file %s: %w", page.path, err)
}
Expand Down Expand Up @@ -543,7 +548,7 @@ func (b *walBlock) Search(ctx context.Context, req *tempopb.SearchRequest, _ com
}

for i, blockFlush := range b.readFlushes() {
file, err := blockFlush.file()
file, err := blockFlush.file(ctx)
if err != nil {
return nil, fmt.Errorf("error opening file %s: %w", blockFlush.path, err)
}
Expand All @@ -569,7 +574,7 @@ func (b *walBlock) Search(ctx context.Context, req *tempopb.SearchRequest, _ com

func (b *walBlock) SearchTags(ctx context.Context, scope traceql.AttributeScope, cb common.TagCallback, _ common.SearchOptions) error {
for i, blockFlush := range b.readFlushes() {
file, err := blockFlush.file()
file, err := blockFlush.file(ctx)
if err != nil {
return fmt.Errorf("error opening file %s: %w", blockFlush.path, err)
}
Expand Down Expand Up @@ -603,7 +608,7 @@ func (b *walBlock) SearchTagValues(ctx context.Context, tag string, cb common.Ta

func (b *walBlock) SearchTagValuesV2(ctx context.Context, tag traceql.Attribute, cb common.TagCallbackV2, _ common.SearchOptions) error {
for i, blockFlush := range b.readFlushes() {
file, err := blockFlush.file()
file, err := blockFlush.file(ctx)
if err != nil {
return fmt.Errorf("error opening file %s: %w", blockFlush.path, err)
}
Expand Down Expand Up @@ -632,7 +637,7 @@ func (b *walBlock) Fetch(ctx context.Context, req traceql.FetchSpansRequest, opt
readers := make([]*walReaderAt, 0, len(blockFlushes))
iters := make([]traceql.SpansetIterator, 0, len(blockFlushes))
for _, page := range blockFlushes {
file, err := page.file()
file, err := page.file(ctx)
if err != nil {
return traceql.FetchSpansResponse{}, fmt.Errorf("error opening file %s: %w", page.path, err)
}
Expand Down
12 changes: 8 additions & 4 deletions tempodb/encoding/vparquet3/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,22 @@ func (r *cachedReaderAt) ReadAt(p []byte, off int64) (int, error) {

// walReaderAt is wrapper over io.ReaderAt, and is used to measure the total bytes read when searching walBlock.
type walReaderAt struct {
r io.ReaderAt

ctx context.Context
r io.ReaderAt
bytesRead atomic.Uint64
}

var _ io.ReaderAt = (*walReaderAt)(nil)

func newWalReaderAt(r io.ReaderAt) *walReaderAt {
return &walReaderAt{r, atomic.Uint64{}}
// newWalReaderAt returns a walReaderAt that delegates to r, checking ctx
// before reads; the file-level comment notes it also measures total
// bytes read via the bytesRead counter (zero-initialized here).
func newWalReaderAt(ctx context.Context, r io.ReaderAt) *walReaderAt {
	return &walReaderAt{
		ctx: ctx,
		r:   r,
	}
}

func (wr *walReaderAt) ReadAt(p []byte, off int64) (int, error) {
if err := wr.ctx.Err(); err != nil {
return 0, err
}

// parquet-go will call ReadAt when reading data from a parquet file.
n, err := wr.r.ReadAt(p, off)
// ReadAt can read less than len(p) bytes in some cases
Expand Down
Loading