From 8f09e3684a7754818692b18a7f86f51c46f06214 Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Wed, 27 Mar 2024 14:16:40 -0700 Subject: [PATCH] fix: return and respect cursor errors (#24791) (#24846) (#24847) ArrayCursors were ignoring errors, which led to panics when nil cursors were operated on. This fix passes errors back up the stack and uses them to enforce healthy cursor creation. Closes https://github.com/influxdata/influxdb/issues/24789 --------- Co-authored-by: Stuart Carnie (cherry picked from commit fe6c64b21ed7e0757375e57b8eca21e9c05f3c89) closes https://github.com/influxdata/influxdb/issues/24836 (cherry picked from commit 49d0bef3ea6ea5e2836844cd059159bd246cc5bb) closes https://github.com/influxdata/influxdb/issues/24826 --- storage/reads/array_cursor.gen.go | 40 +++++---- storage/reads/array_cursor.gen.go.tmpl | 10 ++- storage/reads/array_cursor.go | 5 +- tsdb/engine/tsm1/array_cursor.gen.go | 90 ++++++++++++++----- tsdb/engine/tsm1/array_cursor.gen.go.tmpl | 20 +++-- tsdb/engine/tsm1/array_cursor_iterator.gen.go | 85 ++++++++++++------ .../tsm1/array_cursor_iterator.gen.go.tmpl | 17 ++-- tsdb/engine/tsm1/array_cursor_iterator.go | 10 +-- tsdb/engine/tsm1/array_cursor_test.go | 61 ++++++++++--- 9 files changed, 245 insertions(+), 93 deletions(-) diff --git a/storage/reads/array_cursor.gen.go b/storage/reads/array_cursor.gen.go index 63b45cda913..50bfe8d3ab5 100644 --- a/storage/reads/array_cursor.gen.go +++ b/storage/reads/array_cursor.gen.go @@ -315,13 +315,15 @@ func (c *floatMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.FloatArrayCursor next, ok = cur.(cursors.FloatArrayCursor) if !ok { @@ -1196,13 +1198,15 @@ func (c *integerMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.IntegerArrayCursor next, ok = cur.(cursors.IntegerArrayCursor) if !ok { @@ -2077,13 +2081,15 @@ func (c *unsignedMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.UnsignedArrayCursor next, ok = cur.(cursors.UnsignedArrayCursor) if !ok { @@ -2958,13 +2964,15 @@ func (c *stringMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.StringArrayCursor next, ok = cur.(cursors.StringArrayCursor) if !ok { @@ -3384,13 +3392,15 @@ func (c *booleanMultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) + cur, err = itr.Next(c.ctx, c.req) } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.BooleanArrayCursor next, ok = cur.(cursors.BooleanArrayCursor) if !ok { diff --git a/storage/reads/array_cursor.gen.go.tmpl b/storage/reads/array_cursor.gen.go.tmpl index e41ab7e9748..57cee9c6cb3 100644 --- a/storage/reads/array_cursor.gen.go.tmpl +++ b/storage/reads/array_cursor.gen.go.tmpl @@ -263,13 +263,15 @@ func (c *{{.name}}MultiShardArrayCursor) nextArrayCursor() bool { var itr cursors.CursorIterator var cur cursors.Cursor - for cur == nil && len(c.itrs) > 0 { + var err error + for cur == nil && len(c.itrs) > 0 && err == nil { itr, c.itrs = c.itrs[0], c.itrs[1:] - cur, _ = itr.Next(c.ctx, c.req) - } + cur, err = itr.Next(c.ctx, c.req) + } + c.err = err var ok bool - if cur != nil { + if cur != nil && err == nil { var next cursors.{{.Name}}ArrayCursor next, ok = cur.(cursors.{{.Name}}ArrayCursor) if !ok { diff --git a/storage/reads/array_cursor.go b/storage/reads/array_cursor.go index 5ae1f1eb8de..215aab54dba 100644 --- a/storage/reads/array_cursor.go +++ b/storage/reads/array_cursor.go @@ -113,12 +113,13 @@ func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor { var shard cursors.CursorIterator var cur cursors.Cursor + var err error for cur == nil && len(row.Query) > 0 { shard, row.Query = row.Query[0], row.Query[1:] - cur, _ = shard.Next(m.ctx, &m.req) + cur, err = shard.Next(m.ctx, &m.req) } - if cur == nil { + if cur == nil || err != nil { return nil } diff --git a/tsdb/engine/tsm1/array_cursor.gen.go b/tsdb/engine/tsm1/array_cursor.gen.go index 7b786d31626..8f10b41bb6f 100644 --- a/tsdb/engine/tsm1/array_cursor.gen.go +++ b/tsdb/engine/tsm1/array_cursor.gen.go @@ -39,7 +39,8 @@ func newFloatArrayAscendingCursor() *floatArrayAscendingCursor { return c } -func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -47,10 +48,14 @@ func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, t }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *floatArrayAscendingCursor) Err() error { return nil } @@ -182,7 +187,8 @@ func newFloatArrayDescendingCursor() *floatArrayDescendingCursor { return c } -func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -194,11 +200,15 @@ func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *floatArrayDescendingCursor) Err() error { return nil } @@ -321,7 +331,8 @@ func newIntegerArrayAscendingCursor() *integerArrayAscendingCursor { return c } -func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -329,10 +340,14 @@ func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *integerArrayAscendingCursor) Err() error { return nil } @@ -464,7 +479,8 @@ func newIntegerArrayDescendingCursor() *integerArrayDescendingCursor { return c } -func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -476,11 +492,15 @@ func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *integerArrayDescendingCursor) Err() error { return nil } @@ -603,7 +623,8 @@ func newUnsignedArrayAscendingCursor() *unsignedArrayAscendingCursor { return c } -func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -611,10 +632,14 @@ func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *unsignedArrayAscendingCursor) Err() error { return nil } @@ -746,7 +771,8 @@ func newUnsignedArrayDescendingCursor() *unsignedArrayDescendingCursor { return c } -func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -758,11 +784,15 @@ func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Value c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *unsignedArrayDescendingCursor) Err() error { return nil } @@ -885,7 +915,8 @@ func newStringArrayAscendingCursor() *stringArrayAscendingCursor { return c } -func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -893,10 +924,14 @@ func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *stringArrayAscendingCursor) Err() error { return nil } @@ -1028,7 +1063,8 @@ func newStringArrayDescendingCursor() *stringArrayDescendingCursor { return c } -func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -1040,11 +1076,15 @@ func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *stringArrayDescendingCursor) Err() error { return nil } @@ -1167,7 +1207,8 @@ func newBooleanArrayAscendingCursor() *booleanArrayAscendingCursor { return c } -func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { @@ -1175,10 +1216,14 @@ func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *booleanArrayAscendingCursor) Err() error { return nil } @@ -1310,7 +1355,8 @@ func newBooleanArrayDescendingCursor() *booleanArrayDescendingCursor { return c } -func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -1322,11 +1368,15 @@ func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *booleanArrayDescendingCursor) Err() error { return nil } diff --git a/tsdb/engine/tsm1/array_cursor.gen.go.tmpl b/tsdb/engine/tsm1/array_cursor.gen.go.tmpl index 02a209eeea1..ffab04922ed 100644 --- a/tsdb/engine/tsm1/array_cursor.gen.go.tmpl +++ b/tsdb/engine/tsm1/array_cursor.gen.go.tmpl @@ -38,18 +38,23 @@ func new{{$Type}}() *{{$type}} { return c } -func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { -c.end = end +func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error + c.end = end c.cache.values = cacheValues c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool { return c.cache.values[i].UnixNano() >= seek }) c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] >= seek }) + return nil } func (c *{{$type}}) Err() error { return nil } @@ -184,7 +189,8 @@ func new{{$Type}}() *{{$type}} { return c } -func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) { +func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error { + var err error // Search for the time value greater than the seek time (not included) // and then move our position back one which will include the values in // our time range. @@ -196,11 +202,15 @@ func (c *{{$type}}) reset(seek, end int64, cacheValues Values, tsmKeyCursor *Key c.cache.pos-- c.tsm.keyCursor = tsmKeyCursor - c.tsm.values, _ = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + c.tsm.values, err = c.tsm.keyCursor.Read{{.Name}}ArrayBlock(c.tsm.buf) + if err != nil { + return err + } c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool { return c.tsm.values.Timestamps[i] > seek }) c.tsm.pos-- + return nil } func (c *{{$type}}) Err() error { return nil } diff --git a/tsdb/engine/tsm1/array_cursor_iterator.gen.go b/tsdb/engine/tsm1/array_cursor_iterator.gen.go index 7cddd84f864..d0e27d0d9ae 100644 --- a/tsdb/engine/tsm1/array_cursor_iterator.gen.go +++ b/tsdb/engine/tsm1/array_cursor_iterator.gen.go @@ -15,7 +15,8 @@ import ( ) // buildFloatArrayCursor creates an array cursor for a float field. -func (q *arrayCursorIterator) buildFloatArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.FloatArrayCursor { +func (q *arrayCursorIterator) buildFloatArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.FloatArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -23,19 +24,26 @@ func (q *arrayCursorIterator) buildFloatArrayCursor(ctx context.Context, name [] if q.asc.Float == nil { q.asc.Float = newFloatArrayAscendingCursor() } - q.asc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.Float + err = q.asc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.Float, nil } else { if q.desc.Float == nil { q.desc.Float = newFloatArrayDescendingCursor() } - q.desc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.Float + err = q.desc.Float.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.Float, nil } } // buildIntegerArrayCursor creates an array cursor for a integer field. -func (q *arrayCursorIterator) buildIntegerArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.IntegerArrayCursor { +func (q *arrayCursorIterator) buildIntegerArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.IntegerArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -43,19 +51,26 @@ func (q *arrayCursorIterator) buildIntegerArrayCursor(ctx context.Context, name if q.asc.Integer == nil { q.asc.Integer = newIntegerArrayAscendingCursor() } - q.asc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.Integer + err = q.asc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.Integer, nil } else { if q.desc.Integer == nil { q.desc.Integer = newIntegerArrayDescendingCursor() } - q.desc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.Integer + err = q.desc.Integer.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.Integer, nil } } // buildUnsignedArrayCursor creates an array cursor for a unsigned field. -func (q *arrayCursorIterator) buildUnsignedArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.UnsignedArrayCursor { +func (q *arrayCursorIterator) buildUnsignedArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.UnsignedArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -63,19 +78,26 @@ func (q *arrayCursorIterator) buildUnsignedArrayCursor(ctx context.Context, name if q.asc.Unsigned == nil { q.asc.Unsigned = newUnsignedArrayAscendingCursor() } - q.asc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.Unsigned + err = q.asc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.Unsigned, nil } else { if q.desc.Unsigned == nil { q.desc.Unsigned = newUnsignedArrayDescendingCursor() } - q.desc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.Unsigned + err = q.desc.Unsigned.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.Unsigned, nil } } // buildStringArrayCursor creates an array cursor for a string field. -func (q *arrayCursorIterator) buildStringArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.StringArrayCursor { +func (q *arrayCursorIterator) buildStringArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.StringArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -83,19 +105,26 @@ func (q *arrayCursorIterator) buildStringArrayCursor(ctx context.Context, name [ if q.asc.String == nil { q.asc.String = newStringArrayAscendingCursor() } - q.asc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.String + err = q.asc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.String, nil } else { if q.desc.String == nil { q.desc.String = newStringArrayDescendingCursor() } - q.desc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.String + err = q.desc.String.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.String, nil } } // buildBooleanArrayCursor creates an array cursor for a boolean field. -func (q *arrayCursorIterator) buildBooleanArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.BooleanArrayCursor { +func (q *arrayCursorIterator) buildBooleanArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.BooleanArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -103,13 +132,19 @@ func (q *arrayCursorIterator) buildBooleanArrayCursor(ctx context.Context, name if q.asc.Boolean == nil { q.asc.Boolean = newBooleanArrayAscendingCursor() } - q.asc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.Boolean + err = q.asc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.Boolean, nil } else { if q.desc.Boolean == nil { q.desc.Boolean = newBooleanArrayDescendingCursor() } - q.desc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.Boolean + err = q.desc.Boolean.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.Boolean, nil } } diff --git a/tsdb/engine/tsm1/array_cursor_iterator.gen.go.tmpl b/tsdb/engine/tsm1/array_cursor_iterator.gen.go.tmpl index 84cf6a35d69..108b08eb74c 100644 --- a/tsdb/engine/tsm1/array_cursor_iterator.gen.go.tmpl +++ b/tsdb/engine/tsm1/array_cursor_iterator.gen.go.tmpl @@ -11,7 +11,8 @@ import ( {{range .}} // build{{.Name}}ArrayCursor creates an array cursor for a {{.name}} field. -func (q *arrayCursorIterator) build{{.Name}}ArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) tsdb.{{.Name}}ArrayCursor { +func (q *arrayCursorIterator) build{{.Name}}ArrayCursor(ctx context.Context, name []byte, tags models.Tags, field string, opt query.IteratorOptions) (tsdb.{{.Name}}ArrayCursor, error) { + var err error key := q.seriesFieldKeyBytes(name, tags, field) cacheValues := q.e.Cache.Values(key) keyCursor := q.e.KeyCursor(ctx, key, opt.SeekTime(), opt.Ascending) @@ -19,14 +20,20 @@ func (q *arrayCursorIterator) build{{.Name}}ArrayCursor(ctx context.Context, nam if q.asc.{{.Name}} == nil { q.asc.{{.Name}} = new{{.Name}}ArrayAscendingCursor() } - q.asc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.asc.{{.Name}} + err = q.asc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.asc.{{.Name}}, nil } else { if q.desc.{{.Name}} == nil { q.desc.{{.Name}} = new{{.Name}}ArrayDescendingCursor() } - q.desc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) - return q.desc.{{.Name}} + err = q.desc.{{.Name}}.reset(opt.SeekTime(), opt.StopTime(), cacheValues, keyCursor) + if err != nil { + return nil, err + } + return q.desc.{{.Name}}, nil } } diff --git a/tsdb/engine/tsm1/array_cursor_iterator.go b/tsdb/engine/tsm1/array_cursor_iterator.go index 3a4d130847e..4b7e860c276 100644 --- a/tsdb/engine/tsm1/array_cursor_iterator.go +++ b/tsdb/engine/tsm1/array_cursor_iterator.go @@ -62,15 +62,15 @@ func (q *arrayCursorIterator) Next(ctx context.Context, r *tsdb.CursorRequest) ( // Return appropriate cursor based on type. switch f.Type { case influxql.Float: - return q.buildFloatArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildFloatArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) case influxql.Integer: - return q.buildIntegerArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildIntegerArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) case influxql.Unsigned: - return q.buildUnsignedArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildUnsignedArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) case influxql.String: - return q.buildStringArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildStringArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) case influxql.Boolean: - return q.buildBooleanArrayCursor(ctx, r.Name, r.Tags, r.Field, opt), nil + return q.buildBooleanArrayCursor(ctx, r.Name, r.Tags, r.Field, opt) default: panic(fmt.Sprintf("unreachable: %T", f.Type)) } diff --git a/tsdb/engine/tsm1/array_cursor_test.go b/tsdb/engine/tsm1/array_cursor_test.go index abb27dad8a1..67924af0c96 100644 --- a/tsdb/engine/tsm1/array_cursor_test.go +++ b/tsdb/engine/tsm1/array_cursor_test.go @@ -28,6 +28,14 @@ func MustTempFile(dir string) *os.File { return f } +func MustTempDir() string { + dir, err := os.MkdirTemp("", "tsm1-test") + if err != nil { + panic(fmt.Sprintf("failed to create temp dir: %v", err)) + } + return dir +} + func newFiles(dir string, values ...keyValues) ([]string, error) { var files []string @@ -62,6 +70,35 @@ func newFiles(dir string, values ...keyValues) ([]string, error) { return files, nil } +func TestCursor_ResetFail(t *testing.T) { + t.Run("bad block", func(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + fs := NewFileStore(dir, tsdb.EngineTags{}) + + const START, END = 10, 1 + + data := []keyValues{ + // Write a single data point with timestamp equal to END + {"m,_field=v#!~#v", []Value{NewIntegerValue(1, 1)}}, + } + + files, err := newFiles(dir, data...) + if err != nil { + t.Fatalf("unexpected error creating files: %v", err) + } + + _ = fs.Replace(nil, files) + + kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) + defer kc.Close() + // Open a float cursor for an integer block + cur := newFloatArrayDescendingCursor() + err = cur.reset(START, END, nil, kc) + assert.ErrorContains(t, err, "invalid block", "expected invalid block") + }) +} + func TestDescendingCursor_SinglePointStartTime(t *testing.T) { t.Run("cache", func(t *testing.T) { dir := t.TempDir() @@ -74,7 +111,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) { cur := newIntegerArrayDescendingCursor() t.Cleanup(cur.Close) // Include a cached value with timestamp equal to END - cur.reset(START, END, Values{NewIntegerValue(1, 1)}, kc) + assert.NoError(t, cur.reset(START, END, Values{NewIntegerValue(1, 1)}, kc), "unexpected error resetting cursor") var got []int64 ar := cur.Next() @@ -110,7 +147,7 @@ func TestDescendingCursor_SinglePointStartTime(t *testing.T) { t.Cleanup(kc.Close) cur := newIntegerArrayDescendingCursor() t.Cleanup(cur.Close) - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") var got []int64 ar := cur.Next() @@ -158,7 +195,7 @@ func TestFileStore_DuplicatePoints(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, true) t.Cleanup(kc.Close) cur := newFloatArrayAscendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") t.Cleanup(cur.Close) var got []int64 @@ -178,7 +215,7 @@ func TestFileStore_DuplicatePoints(t *testing.T) { kc := fs.KeyCursor(context.Background(), []byte("m,_field=v#!~#v"), START, false) t.Cleanup(kc.Close) cur := newFloatArrayDescendingCursor() - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") t.Cleanup(cur.Close) var got []int64 @@ -255,7 +292,7 @@ func TestFileStore_MergeBlocksLargerThat1000_SecondEntirelyContained(t *testing. t.Cleanup(kc.Close) cur := newFloatArrayAscendingCursor() t.Cleanup(cur.Close) - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeTs(1000, 800, 10) exp = append(exp, makeTs(1005, 400, 10)...) @@ -279,7 +316,7 @@ func TestFileStore_MergeBlocksLargerThat1000_SecondEntirelyContained(t *testing. t.Cleanup(kc.Close) cur := newFloatArrayDescendingCursor() t.Cleanup(cur.Close) - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeTs(1000, 800, 10) exp = append(exp, makeTs(1005, 400, 10)...) @@ -360,7 +397,7 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing t.Cleanup(kc.Close) cur := newFloatArrayAscendingCursor() t.Cleanup(cur.Close) - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1000, 3500, 10, 1.01) a2 := makeArray(4005, 3500, 5, 2.01) @@ -388,7 +425,7 @@ func TestFileStore_MergeBlocksLargerThat1000_MultipleBlocksInEachFile(t *testing t.Cleanup(kc.Close) cur := newFloatArrayDescendingCursor() t.Cleanup(cur.Close) - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1000, 3500, 10, 1.01) a2 := makeArray(4005, 3500, 5, 2.01) @@ -456,7 +493,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) { t.Cleanup(kc.Close) cur := newFloatArrayAscendingCursor() t.Cleanup(cur.Close) - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1000, 100, 1, 1.01) @@ -482,7 +519,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) { t.Cleanup(kc.Close) cur := newFloatArrayAscendingCursor() t.Cleanup(cur.Close) - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1050, 50, 1, 1.01) a2 := makeArray(1100, 50, 1, 2.01) @@ -510,7 +547,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) { t.Cleanup(kc.Close) cur := newFloatArrayDescendingCursor() t.Cleanup(cur.Close) - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1000, 100, 1, 1.01) sort.Sort(sort.Reverse(&FloatArray{exp})) @@ -537,7 +574,7 @@ func TestFileStore_SeekBoundaries(t *testing.T) { t.Cleanup(kc.Close) cur := newFloatArrayDescendingCursor() t.Cleanup(cur.Close) - cur.reset(START, END, nil, kc) + assert.NoError(t, cur.reset(START, END, nil, kc), "unexpected error resetting cursor") exp := makeArray(1050, 50, 1, 1.01) a2 := makeArray(1100, 50, 1, 2.01)