Support sorting by time desc #3986

Merged: 13 commits, Sep 4, 2015
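This PR adds support for ORDER BY time DESC: the parser, which previously allowed only ORDER BY time ASC, now accepts either direction on the time field (for example, SELECT value FROM cpu ORDER BY time DESC), and the tsdb cursor layer gains a Direction type so cursors can be merged while iterating in reverse.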
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3901](https://github.com/influxdb/influxdb/pull/3901): Add consistency level option to influx cli Thanks @takayuki
- [#3876](https://github.com/influxdb/influxdb/pull/3876): Allow the following syntax in CQs: INTO "1hPolicy".:MEASUREMENT
- [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service
+- [#3986](https://github.com/influxdb/influxdb/pull/3986): Support sorting by time desc

### Bugfixes
- [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803.
56 changes: 56 additions & 0 deletions cmd/influxd/run/server_test.go
@@ -2201,6 +2201,14 @@ func TestServer_Query_Aggregates(t *testing.T) {
command: `SELECT sum(value)/2 FROM load`,
exp: `{"results":[{"series":[{"name":"load","columns":["time",""],"values":[["1970-01-01T00:00:00Z",75]]}]}]}`,
},

+// order by time desc
+&Query{
+name: "order by time desc",
[Inline review comment from a contributor on the test name above: maybe "aggregate order by time desc"?]
+params: url.Values{"db": []string{"db0"}},
+command: `SELECT max(value) FROM intmany where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:00Z' group by time(10s) order by time desc`,
+exp: `{"results":[{"series":[{"name":"intmany","columns":["time","max"],"values":[["2000-01-01T00:01:00Z",7],["2000-01-01T00:00:50Z",5],["2000-01-01T00:00:40Z",5],["2000-01-01T00:00:30Z",4],["2000-01-01T00:00:20Z",4],["2000-01-01T00:00:10Z",4],["2000-01-01T00:00:00Z",2]]}]}]}`,
+},
}...)

for i, query := range test.queries {
@@ -3877,3 +3885,51 @@ func TestServer_Query_EvilIdentifiers(t *testing.T) {
}
}
}

+func TestServer_Query_OrderByTime(t *testing.T) {
+t.Parallel()
+s := OpenServer(NewConfig(), "")
+defer s.Close()

+if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
+t.Fatal(err)
+}
+if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
+t.Fatal(err)
+}

+writes := []string{
+fmt.Sprintf(`cpu,host=server1 value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
+fmt.Sprintf(`cpu,host=server1 value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
+fmt.Sprintf(`cpu,host=server1 value=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()),
+}

+test := NewTest("db0", "rp0")
+test.write = strings.Join(writes, "\n")

+test.addQueries([]*Query{
+&Query{
+name: "order on points",
+params: url.Values{"db": []string{"db0"}},
+command: `select value from "cpu" ORDER BY time DESC`,
+exp: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:03Z",3],["2000-01-01T00:00:02Z",2],["2000-01-01T00:00:01Z",1]]}]}]}`,
+},
+}...)

+for i, query := range test.queries {
+if i == 0 {
+if err := test.init(s); err != nil {
+t.Fatalf("test init failed: %s", err)
+}
+}
+if query.skip {
+t.Logf("SKIP:: %s", query.name)
+continue
+}
+if err := query.Execute(s); err != nil {
+t.Error(query.Error(err))
+} else if !query.success() {
+t.Error(query.failureMessage())
+}
+}
+}
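Both test functions drive the standard query path: the url.Values above supply the db parameter, and the test harness presumably submits each command string as the q parameter of a /query request, per the 0.9-era HTTP API.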
2 changes: 1 addition & 1 deletion influxql/ast.go
@@ -276,7 +276,7 @@ type SortField struct {
// String returns a string representation of a sort field
func (field *SortField) String() string {
var buf bytes.Buffer
-if field.Name == "" {
+if field.Name != "" {
_, _ = buf.WriteString(field.Name)
_, _ = buf.WriteString(" ")
}
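The one-character fix above corrects an inverted condition: SortField.String previously wrote the field name only when it was empty, so a named sort field lost its name when the statement was rendered back to a string. Below is a minimal standalone sketch of the corrected method; the ASC/DESC tail is an assumption, since the diff does not show the rest of String:

package main

import (
	"bytes"
	"fmt"
)

// SortField mirrors influxql.SortField; only the parts exercised here are shown.
type SortField struct {
	Name      string
	Ascending bool
}

// String renders "name ASC" or "name DESC", omitting the name when empty.
// The Name check uses the corrected condition from this PR (!= rather than ==).
func (field *SortField) String() string {
	var buf bytes.Buffer
	if field.Name != "" {
		_, _ = buf.WriteString(field.Name)
		_, _ = buf.WriteString(" ")
	}
	if field.Ascending {
		_, _ = buf.WriteString("ASC")
	} else {
		_, _ = buf.WriteString("DESC")
	}
	return buf.String()
}

func main() {
	fmt.Println((&SortField{Name: "time", Ascending: false}).String()) // time DESC
	fmt.Println((&SortField{Ascending: true}).String())                // ASC
}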
42 changes: 23 additions & 19 deletions influxql/parser.go
@@ -1809,24 +1809,29 @@ func (p *Parser) parseOrderBy() (SortFields, error) {
func (p *Parser) parseSortFields() (SortFields, error) {
var fields SortFields

-// If first token is ASC or DESC, all fields are sorted.
-if tok, pos, lit := p.scanIgnoreWhitespace(); tok == ASC || tok == DESC {
-if tok == DESC {
-// Token must be ASC, until other sort orders are supported.
-return nil, errors.New("only ORDER BY time ASC supported at this time")
-}
-return append(fields, &SortField{Ascending: (tok == ASC)}), nil
-} else if tok != IDENT {
-return nil, newParseError(tokstr(tok, lit), []string{"identifier", "ASC", "DESC"}, pos)
-}
-p.unscan()
+tok, pos, lit := p.scanIgnoreWhitespace()

-// At least one field is required.
-field, err := p.parseSortField()
-if err != nil {
-return nil, err
+switch tok {
+// The first field after an order by may not have a field name (e.g. ORDER BY ASC)
+case ASC, DESC:
+fields = append(fields, &SortField{Ascending: (tok == ASC)})
+// If it's a token, parse it as a sort field. At least one is required.
+case IDENT:
+p.unscan()
+field, err := p.parseSortField()
+if err != nil {
+return nil, err
+}

+if lit != "time" {
+return nil, errors.New("only ORDER BY time supported at this time")
+}

+fields = append(fields, field)
+// Parse error...
+default:
+return nil, newParseError(tokstr(tok, lit), []string{"identifier", "ASC", "DESC"}, pos)
}
-fields = append(fields, field)

// Parse additional fields.
for {
@@ -1845,9 +1850,8 @@ func (p *Parser) parseSortFields() (SortFields, error) {
fields = append(fields, field)
}

-// First SortField must be time ASC, until other sort orders are supported.
-if len(fields) > 1 || fields[0].Name != "time" || !fields[0].Ascending {
-return nil, errors.New("only ORDER BY time ASC supported at this time")
+if len(fields) > 1 {
+return nil, errors.New("only ORDER BY time supported at this time")
}

return fields, nil
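After this change the parser accepts either direction, but only on the time field, and only a single sort field. A small sketch of the resulting behavior, assuming the package exposes the ParseStatement(string) helper used elsewhere in its tests:

package main

import (
	"fmt"

	"github.com/influxdb/influxdb/influxql"
)

func main() {
	for _, s := range []string{
		`SELECT value FROM cpu ORDER BY time DESC`,    // now accepted
		`SELECT value FROM cpu ORDER BY time ASC`,     // accepted, as before
		`SELECT value FROM cpu ORDER BY DESC`,         // accepted: bare direction applies to time
		`SELECT value FROM cpu ORDER BY field1`,       // rejected: only time is sortable
		`SELECT value FROM cpu ORDER BY time, field1`, // rejected: a single sort field only
	} {
		_, err := influxql.ParseStatement(s)
		fmt.Printf("%-46s -> err=%v\n", s, err)
	}
}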
23 changes: 10 additions & 13 deletions influxql/parser_test.go
@@ -76,8 +76,7 @@ func TestParser_ParseStatement(t *testing.T) {

// SELECT statement
{
-skip: true,
-s: fmt.Sprintf(`SELECT mean(field1), sum(field2) ,count(field3) AS field_x FROM myseries WHERE host = 'hosta.influxdb.org' and time > '%s' GROUP BY time(10h) ORDER BY ASC LIMIT 20 OFFSET 10;`, now.UTC().Format(time.RFC3339Nano)),
+s: fmt.Sprintf(`SELECT mean(field1), sum(field2) ,count(field3) AS field_x FROM myseries WHERE host = 'hosta.influxdb.org' and time > '%s' GROUP BY time(10h) ORDER BY DESC LIMIT 20 OFFSET 10;`, now.UTC().Format(time.RFC3339Nano)),
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
@@ -101,7 +100,7 @@ func TestParser_ParseStatement(t *testing.T) {
},
Dimensions: []*influxql.Dimension{{Expr: &influxql.Call{Name: "time", Args: []influxql.Expr{&influxql.DurationLiteral{Val: 10 * time.Hour}}}}},
SortFields: []*influxql.SortField{
-{Ascending: true},
+{Ascending: false},
},
Limit: 20,
Offset: 10,
@@ -587,17 +586,17 @@ func TestParser_ParseStatement(t *testing.T) {
// SHOW SERIES WHERE with ORDER BY and LIMIT
{
skip: true,
-s: `SHOW SERIES WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
+s: `SHOW SERIES WHERE region = 'order by desc' ORDER BY DESC, field1, field2 DESC LIMIT 10`,
stmt: &influxql.ShowSeriesStatement{
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
-RHS: &influxql.StringLiteral{Val: "uswest"},
+RHS: &influxql.StringLiteral{Val: "order by desc"},
},
SortFields: []*influxql.SortField{
-{Ascending: true},
-{Name: "field1"},
-{Name: "field2"},
+&influxql.SortField{Ascending: false},
+&influxql.SortField{Name: "field1", Ascending: true},
+&influxql.SortField{Name: "field2"},
},
Limit: 10,
},
@@ -1260,10 +1259,7 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT field1 FROM myseries ORDER BY /`, err: `found /, expected identifier, ASC, DESC at line 1, char 38`},
{s: `SELECT field1 FROM myseries ORDER BY 1`, err: `found 1, expected identifier, ASC, DESC at line 1, char 38`},
{s: `SELECT field1 FROM myseries ORDER BY time ASC,`, err: `found EOF, expected identifier at line 1, char 47`},
-{s: `SELECT field1 FROM myseries ORDER BY DESC`, err: `only ORDER BY time ASC supported at this time`},
-{s: `SELECT field1 FROM myseries ORDER BY field1`, err: `only ORDER BY time ASC supported at this time`},
-{s: `SELECT field1 FROM myseries ORDER BY time DESC`, err: `only ORDER BY time ASC supported at this time`},
-{s: `SELECT field1 FROM myseries ORDER BY time, field1`, err: `only ORDER BY time ASC supported at this time`},
+{s: `SELECT field1 FROM myseries ORDER BY time, field1`, err: `only ORDER BY time supported at this time`},
{s: `SELECT field1 AS`, err: `found EOF, expected identifier at line 1, char 18`},
{s: `SELECT field1 FROM foo group by time(1s)`, err: `GROUP BY requires at least one aggregate function`},
{s: `SELECT count(value), value FROM foo`, err: `mixing aggregate and non-aggregate queries is not supported`},
@@ -1440,7 +1436,8 @@ func TestParser_ParseStatement(t *testing.T) {
if !reflect.DeepEqual(tt.err, errstring(err)) {
t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
} else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
-t.Logf("\nexp=%s\ngot=%s\n", mustMarshalJSON(tt.stmt), mustMarshalJSON(stmt))
+t.Logf("\n# %s\nexp=%s\ngot=%s\n", tt.s, mustMarshalJSON(tt.stmt), mustMarshalJSON(stmt))
+t.Logf("\nSQL exp=%s\nSQL got=%s\n", tt.stmt.String(), stmt.String())
t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt)
}
}
47 changes: 41 additions & 6 deletions tsdb/cursor.go
@@ -5,20 +5,48 @@ import (
"container/heap"
)

+// Direction represents a cursor navigation direction.
+type Direction bool

+const (
+// Forward indicates that a cursor will move forward over its values.
+Forward Direction = true
+// Reverse indicates that a cursor will move backwards over its values.
+Reverse Direction = false
+)

+func (d Direction) String() string {
+if d.Forward() {
+return "forward"
+}
+return "reverse"
+}

+// Forward returns true if direction is forward
+func (d Direction) Forward() bool {
+return d == Forward
+}

+// Reverse returns true if direction is reverse
+func (d Direction) Reverse() bool {
+return d == Reverse
+}

// MultiCursor returns a single cursor that combines the results of all cursors in order.
//
// If the same key is returned from multiple cursors then the first cursor
// specified will take precedence. A key will only be returned once from the
// returned cursor.
-func MultiCursor(cursors ...Cursor) Cursor {
-return &multiCursor{cursors: cursors}
+func MultiCursor(d Direction, cursors ...Cursor) Cursor {
+return &multiCursor{cursors: cursors, direction: d}
}

// multiCursor represents a cursor that combines multiple cursors into one.
type multiCursor struct {
-cursors []Cursor
-heap cursorHeap
-prev []byte
+cursors []Cursor
+heap cursorHeap
+prev []byte
+direction Direction
}

// Seek moves the cursor to a given key.
@@ -48,6 +76,8 @@ func (mc *multiCursor) Seek(seek []byte) (key, value []byte) {
return mc.pop()
}

+func (mc *multiCursor) Direction() Direction { return mc.direction }

// Next returns the next key/value from the cursor.
func (mc *multiCursor) Next() (key, value []byte) { return mc.pop() }

@@ -90,7 +120,12 @@ type cursorHeap []*cursorHeapItem
func (h cursorHeap) Len() int { return len(h) }
func (h cursorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h cursorHeap) Less(i, j int) bool {
-if cmp := bytes.Compare(h[i].key, h[j].key); cmp == -1 {
+dir := -1
+if !h[i].cursor.Direction() {
+dir = 1
+}

+if cmp := bytes.Compare(h[i].key, h[j].key); cmp == dir {
return true
} else if cmp == 0 {
return h[i].priority > h[j].priority
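The Less change above is what makes the merged iteration direction-aware: a forward cursor pops the smallest key first, a reverse cursor the largest, and equal keys fall back to cursor priority. A condensed, self-contained sketch of that comparison outside the heap plumbing:

package main

import (
	"bytes"
	"fmt"
)

// less reports whether key a should be popped before key b, mirroring the
// direction handling in cursorHeap.Less: forward iteration pops ascending
// keys, reverse iteration pops descending keys, and on equal keys the
// higher-priority (earlier-specified) cursor wins.
func less(a, b []byte, forward bool, prioA, prioB int) bool {
	dir := -1 // forward: a first when bytes.Compare(a, b) == -1 (a < b)
	if !forward {
		dir = 1 // reverse: a first when it compares greater
	}
	if cmp := bytes.Compare(a, b); cmp == dir {
		return true
	} else if cmp == 0 {
		return prioA > prioB
	}
	return false
}

func main() {
	a, b := []byte("k1"), []byte("k2")
	fmt.Println(less(a, b, true, 0, 0))  // true: forward order pops k1 first
	fmt.Println(less(a, b, false, 0, 0)) // false: reverse order pops k2 first
}

Note that callers of MultiCursor now pass the direction explicitly, e.g. MultiCursor(Forward, c1, c2), and every Cursor must report its Direction for the heap comparison above to consult.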