Skip to content

Commit

Permalink
fix: panic index out of range for invalid series keys [Port to 2.7] (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jdockerty authored Jan 23, 2024
1 parent 09a9607 commit bfb09aa
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 8 deletions.
23 changes: 23 additions & 0 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) {
}

name, tags := ParseSeriesKey(key)
if len(name) == 0 {
continue
}
deleted := itr.sfile.IsDeleted(elem.SeriesID)
return &seriesElemAdapter{
name: name,
Expand Down Expand Up @@ -390,6 +393,9 @@ func (itr *seriesQueryAdapterIterator) Next() (*query.FloatPoint, error) {

// Convert to a key.
name, tags := ParseSeriesKey(seriesKey)
if len(name) == 0 {
continue
}
key := string(models.MakeKey(name, tags))

// Write auxiliary fields.
Expand Down Expand Up @@ -886,6 +892,9 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
}

name, tags := ParseSeriesKey(itr.keys[0])
if len(name) == 0 {
continue
}
itr.keys = itr.keys[1:]

// TODO(edd): It seems to me like this authorisation check should be
Expand Down Expand Up @@ -2327,6 +2336,9 @@ func (itr *measurementSeriesKeyByExprIterator) Next() ([]byte, error) {
}

name, tags := ParseSeriesKey(seriesKey)
if len(name) == 0 {
continue
}

// Check leftover filters. All fields that might be filtered default to zero values
if e.Expr != nil {
Expand Down Expand Up @@ -2428,6 +2440,11 @@ func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr)
}

name, tags := ParseSeriesKey(seriesKey)
// An invalid series key of 0 length should have been caught by the
// above check, but for extra safety we can pass over it here too.
if len(name) == 0 {
continue
}
keys = append(keys, models.MakeKey(name, tags))
}

Expand Down Expand Up @@ -2904,12 +2921,18 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, key

if auth != nil {
name, tags := ParseSeriesKey(buf)
if len(name) == 0 {
continue
}
if !auth.AuthorizeSeriesRead(database, name, tags) {
continue
}
}

_, buf = ReadSeriesKeyLen(buf)
if len(buf) == 0 {
continue
}
_, buf = ReadSeriesKeyMeasurement(buf)
tagN, buf := ReadSeriesKeyTagN(buf)
for i := 0; i < tagN; i++ {
Expand Down
3 changes: 3 additions & 0 deletions tsdb/index/tsi1/log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,9 @@ func (f *LogFile) execSeriesEntry(e *LogEntry) {

// Read key size.
_, remainder := tsdb.ReadSeriesKeyLen(seriesKey)
if len(remainder) == 0 {
return
}

// Read measurement name.
name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)
Expand Down
4 changes: 4 additions & 0 deletions tsdb/series_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsdb
import (
"bytes"
"errors"
"fmt"
"sort"
"sync"

Expand Down Expand Up @@ -112,6 +113,9 @@ func (cur *seriesCursor) Next() (*SeriesCursorRow, error) {
}

cur.row.Name, cur.row.Tags = ParseSeriesKey(cur.keys[cur.ofs])
if len(cur.row.Name) == 0 {
return nil, fmt.Errorf("series key was not valid: %+v", cur.keys[cur.ofs])
}
cur.ofs++

//if itr.opt.Authorizer != nil && !itr.opt.Authorizer.AuthorizeSeriesRead(itr.indexSet.Database(), name, tags) {
Expand Down
32 changes: 24 additions & 8 deletions tsdb/series_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags)
func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {
var name []byte
_, data = ReadSeriesKeyLen(data)
if len(data) == 0 {
return nil, dst
}
name, data = ReadSeriesKeyMeasurement(data)
tagN, data := ReadSeriesKeyTagN(data)

Expand All @@ -436,17 +439,30 @@ func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {

func CompareSeriesKeys(a, b []byte) int {
// Handle 'nil' keys.
if len(a) == 0 && len(b) == 0 {
return 0
} else if len(a) == 0 {
return -1
} else if len(b) == 0 {
return 1
// Values below and equal to 1 are considered invalid for key comparisons.
nilKeyHandler := func(a, b int) int {
if a == 0 && b == 0 {
return 0
} else if a == 0 {
return -1
} else if b == 0 {
return 1
}
return a + b
}

keyValidity := nilKeyHandler(len(a), len(b))
if keyValidity <= 1 {
return keyValidity
}

// Read total size.
_, a = ReadSeriesKeyLen(a)
_, b = ReadSeriesKeyLen(b)
szA, a := ReadSeriesKeyLen(a)
szB, b := ReadSeriesKeyLen(b)
keySizeValidity := nilKeyHandler(szA, szB)
if keySizeValidity <= 1 {
return keySizeValidity
}

// Read names.
name0, a := ReadSeriesKeyMeasurement(a)
Expand Down
51 changes: 51 additions & 0 deletions tsdb/series_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,57 @@ func TestParseSeriesKeyInto(t *testing.T) {
}
}

func TestParseSeriesKey(t *testing.T) {
tests := []struct {
name string
seriesKey []byte
expectedKey []byte
}{
{
name: "invalid zero length series key",
seriesKey: tsdb.AppendSeriesKey(nil, []byte{}, nil),
expectedKey: nil,
},
{
name: "invalid blank series key",
seriesKey: tsdb.AppendSeriesKey(nil, []byte(""), nil),
expectedKey: nil,
},
{
name: "invalid series key with tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte{}, models.NewTags(map[string]string{"tag1": "foo"})),
expectedKey: nil,
},
{
name: "valid series key with tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(map[string]string{"tag1": "foo"})),
expectedKey: []byte("foo"),
},
{
name: "valid series key with empty tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(map[string]string{})),
expectedKey: []byte("foo"),
},
{
name: "valid series key with nil tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), models.NewTags(nil)),
expectedKey: []byte("foo"),
},
{
name: "valid series key with no tags",
seriesKey: tsdb.AppendSeriesKey(nil, []byte("foo"), nil),
expectedKey: []byte("foo"),
},
}

for _, tt := range tests {
keyName, _ := tsdb.ParseSeriesKey(tt.seriesKey)
if res := bytes.Compare(keyName, tt.expectedKey); res != 0 {
t.Fatalf("invalid series key parsed for an %s: got %q, expected %q", tt.name, keyName, tt.expectedKey)
}
}
}

// Ensure that broken series files are closed
func TestSeriesFile_Open_WhenFileCorrupt_ShouldReturnErr(t *testing.T) {
f := NewBrokenSeriesFile(t, []byte{0, 0, 0, 0, 0})
Expand Down

0 comments on commit bfb09aa

Please sign in to comment.