Skip to content

Commit

Permalink
Merge pull request #1925 from influxdb/fill-options
Browse files Browse the repository at this point in the history
Add fill to select statements.
  • Loading branch information
pauldix committed Mar 12, 2015
2 parents c940c0e + 4b943e9 commit b4e1795
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Features
- [#1902](https://github.com/influxdb/influxdb/pull/1902): Enforce retention policies to have a minimum duration.
- [#1906](https://github.com/influxdb/influxdb/pull/1906): Add show servers to query language.
- [#1925](https://github.com/influxdb/influxdb/pull/1925): Add `fill(none)`, `fill(previous)`, and `fill(<num>)` to queries.

## v0.9.0-rc10 [2015-03-09]

Expand Down
28 changes: 28 additions & 0 deletions cmd/influxd/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,34 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"]}]}]}`,
},

// Fill tests
{
name: "fill with value",
write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [
{"name": "fills", "timestamp": "2009-11-10T23:00:02Z","fields": {"val": 3}},
{"name": "fills", "timestamp": "2009-11-10T23:00:03Z","fields": {"val": 5}},
{"name": "fills", "timestamp": "2009-11-10T23:00:06Z","fields": {"val": 4}},
{"name": "fills", "timestamp": "2009-11-10T23:00:16Z","fields": {"val": 10}}
]}`,
query: `select mean(val) from "%DB%"."%RP%".fills where time >= '2009-11-10T23:00:00Z' and time < '2009-11-10T23:00:20Z' group by time(5s) fill(1)`,
expected: `{"results":[{"series":[{"name":"fills","columns":["time","mean"],"values":[["2009-11-10T23:00:00Z",4],["2009-11-10T23:00:05Z",4],["2009-11-10T23:00:10Z",1],["2009-11-10T23:00:15Z",10]]}]}]}`,
},
{
name: "fill with previous",
query: `select mean(val) from "%DB%"."%RP%".fills where time >= '2009-11-10T23:00:00Z' and time < '2009-11-10T23:00:20Z' group by time(5s) fill(previous)`,
expected: `{"results":[{"series":[{"name":"fills","columns":["time","mean"],"values":[["2009-11-10T23:00:00Z",4],["2009-11-10T23:00:05Z",4],["2009-11-10T23:00:10Z",4],["2009-11-10T23:00:15Z",10]]}]}]}`,
},
{
name: "fill with none, i.e. clear out nulls",
query: `select mean(val) from "%DB%"."%RP%".fills where time >= '2009-11-10T23:00:00Z' and time < '2009-11-10T23:00:20Z' group by time(5s) fill(none)`,
expected: `{"results":[{"series":[{"name":"fills","columns":["time","mean"],"values":[["2009-11-10T23:00:00Z",4],["2009-11-10T23:00:05Z",4],["2009-11-10T23:00:15Z",10]]}]}]}`,
},
{
name: "fill defaults to null",
query: `select mean(val) from "%DB%"."%RP%".fills where time >= '2009-11-10T23:00:00Z' and time < '2009-11-10T23:00:20Z' group by time(5s)`,
expected: `{"results":[{"series":[{"name":"fills","columns":["time","mean"],"values":[["2009-11-10T23:00:00Z",4],["2009-11-10T23:00:05Z",4],["2009-11-10T23:00:10Z",null],["2009-11-10T23:00:15Z",10]]}]}]}`,
},

// Metadata display tests

{
Expand Down
4 changes: 2 additions & 2 deletions influxql/INFLUXQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -576,15 +576,15 @@ select_stmt = fields from_clause [ into_clause ] [ where_clause ]

```sql
-- select mean value from the cpu measurement where region = 'uswest' grouped by 10 minute intervals
SELECT mean(value) FROM cpu WHERE region = 'uswest' GROUP BY time(10m);
SELECT mean(value) FROM cpu WHERE region = 'uswest' GROUP BY time(10m) fill(0);
```

## Clauses

```
from_clause = "FROM" measurements .
group_by_clause = "GROUP BY" dimensions .
group_by_clause = "GROUP BY" dimensions fill(<option>).
limit_clause = "LIMIT" int_lit .
Expand Down
29 changes: 29 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,19 @@ func (s *AlterRetentionPolicyStatement) RequiredPrivileges() ExecutionPrivileges
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}

type FillOption int

const (
// NullFill means that empty aggregate windows will just have null values.
NullFill FillOption = iota
// NoFill means that empty aggregate windows will be purged from the result.
NoFill
// NumberFill means that empty aggregate windows will be filled with the given number
NumberFill
// PreviousFill means that empty aggregate windows will be filled with whatever the previous aggregate window had
PreviousFill
)

// SelectStatement represents a command for extracting data from the database.
type SelectStatement struct {
// Expressions returned from the selection.
Expand Down Expand Up @@ -566,6 +579,12 @@ type SelectStatement struct {

// if it's a query for raw data values (i.e. not an aggregate)
RawQuery bool

// What fill option the select statement uses, if any
Fill FillOption

// The value to fill empty aggregate buckets with, if any
FillValue interface{}
}

// Clone returns a deep copy of the statement.
Expand All @@ -580,6 +599,8 @@ func (s *SelectStatement) Clone() *SelectStatement {
Offset: s.Offset,
SLimit: s.SLimit,
SOffset: s.SOffset,
Fill: s.Fill,
FillValue: s.FillValue,
}
if s.Target != nil {
other.Target = &Target{Measurement: s.Target.Measurement, Database: s.Target.Database}
Expand Down Expand Up @@ -675,6 +696,14 @@ func (s *SelectStatement) String() string {
_, _ = buf.WriteString(" GROUP BY ")
_, _ = buf.WriteString(s.Dimensions.String())
}
switch s.Fill {
case NoFill:
_, _ = buf.WriteString(" fill(none)")
case NumberFill:
_, _ = buf.WriteString(fmt.Sprintf(" fill(%v)", s.FillValue))
case PreviousFill:
_, _ = buf.WriteString(" fill(previous)")
}
if len(s.SortFields) > 0 {
_, _ = buf.WriteString(" ORDER BY ")
_, _ = buf.WriteString(s.SortFields.String())
Expand Down
6 changes: 6 additions & 0 deletions influxql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,12 @@ func TestSelectStatement_RewriteWildcards(t *testing.T) {
rewrite: `SELECT value FROM cpu GROUP BY host, region, time(1m)`,
},

// GROUP BY wildarde with fill
{
stmt: `SELECT value FROM cpu GROUP BY *,time(1m) fill(0)`,
rewrite: `SELECT value FROM cpu GROUP BY host, region, time(1m) fill(0)`,
},

// GROUP BY wildcard with explicit
{
stmt: `SELECT value FROM cpu GROUP BY *,host`,
Expand Down
49 changes: 49 additions & 0 deletions influxql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) {
// processes the result values if there's any math in there
resultValues = m.processResults(resultValues)

// handle any fill options
resultValues = m.processFill(resultValues)

row := &Row{
Name: m.MeasurementName,
Tags: m.TagSet.Tags,
Expand Down Expand Up @@ -226,6 +229,52 @@ func (m *MapReduceJob) processResults(results [][]interface{}) [][]interface{} {
return mathResults
}

// processFill will take the results and return new reaults (or the same if no fill modifications are needed) with whatever fill options the query has.
func (m *MapReduceJob) processFill(results [][]interface{}) [][]interface{} {
// don't do anything if it's raw query results or we're supposed to leave the nulls
if m.stmt.RawQuery || m.stmt.Fill == NullFill {
return results
}

if m.stmt.Fill == NoFill {
// remove any rows that have even one nil value. This one is tricky because they could have multiple
// aggregates, but this option means that any row that has even one nil gets purged.
newResults := make([][]interface{}, 0, len(results))
for _, vals := range results {
hasNil := false
// start at 1 because the first value is always time
for j := 1; j < len(vals); j++ {
if vals[j] == nil {
hasNil = true
break
}
}
if !hasNil {
newResults = append(newResults, vals)
}
}
return newResults
}

// they're either filling with previous values or a specific number
for i, vals := range results {
// start at 1 because the first value is always time
for j := 1; j < len(vals); j++ {
if vals[j] == nil {
switch m.stmt.Fill {
case PreviousFill:
if i != 0 {
vals[j] = results[i-1][j]
}
case NumberFill:
vals[j] = m.stmt.FillValue
}
}
}
}
return results
}

func getProcessor(expr Expr, startIndex int) (processor, int) {
switch expr := expr.(type) {
case *VarRef:
Expand Down
41 changes: 41 additions & 0 deletions influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,11 @@ func (p *Parser) parseSelectStatement(tr targetRequirement) (*SelectStatement, e
return nil, err
}

// Parse fill options: "fill(<option>)"
if stmt.Fill, stmt.FillValue, err = p.parseFill(); err != nil {
return nil, err
}

// Parse sort: "ORDER BY FIELD+".
if stmt.SortFields, err = p.parseOrderBy(); err != nil {
return nil, err
Expand Down Expand Up @@ -1370,6 +1375,42 @@ func (p *Parser) parseDimension() (*Dimension, error) {
return &Dimension{Expr: expr}, nil
}

// parseFill parses the fill call and its optios.
func (p *Parser) parseFill() (FillOption, interface{}, error) {
// Parse the expression first.
expr, err := p.ParseExpr()
if err != nil {
p.unscan()
return NullFill, nil, nil
}
if lit, ok := expr.(*Call); !ok {
p.unscan()
return NullFill, nil, nil
} else {
if lit.Name != "fill" {
p.unscan()
return NullFill, nil, nil
}
if len(lit.Args) != 1 {
return NullFill, nil, errors.New("fill requires an argument, e.g.: 0, null, none, previous")
}
switch lit.Args[0].String() {
case "null":
return NullFill, nil, nil
case "none":
return NoFill, nil, nil
case "previous":
return PreviousFill, nil, nil
default:
num, ok := lit.Args[0].(*NumberLiteral)
if !ok {
return NullFill, nil, fmt.Errorf("expected number argument in fill()")
}
return NumberFill, num.Val, nil
}
}
}

// parseOptionalTokenAndInt parses the specified token followed
// by an int, if it exists.
func (p *Parser) parseOptionalTokenAndInt(t Token) (int, error) {
Expand Down
43 changes: 43 additions & 0 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,49 @@ func TestParser_ParseStatement(t *testing.T) {
},
},

// SELECT statement with fill
{
s: `SELECT mean(value) FROM cpu GROUP BY time(5m) fill(1)`,
stmt: &influxql.SelectStatement{
Fields: []*influxql.Field{{
Expr: &influxql.Call{
Name: "mean",
Args: []influxql.Expr{&influxql.VarRef{Val: "value"}}}}},
Source: &influxql.Measurement{Name: "cpu"},
Dimensions: []*influxql.Dimension{
{Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: 5 * time.Minute},
},
}},
},
Fill: influxql.NumberFill,
FillValue: float64(1),
},
},

// SELECT statement with previous fill
{
s: `SELECT mean(value) FROM cpu GROUP BY time(5m) fill(previous)`,
stmt: &influxql.SelectStatement{
Fields: []*influxql.Field{{
Expr: &influxql.Call{
Name: "mean",
Args: []influxql.Expr{&influxql.VarRef{Val: "value"}}}}},
Source: &influxql.Measurement{Name: "cpu"},
Dimensions: []*influxql.Dimension{
{Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: 5 * time.Minute},
},
}},
},
Fill: influxql.PreviousFill,
},
},

// DELETE statement
{
s: `DELETE FROM myseries WHERE host = 'hosta.influxdb.org'`,
Expand Down

0 comments on commit b4e1795

Please sign in to comment.