diff --git a/CHANGELOG.md b/CHANGELOG.md
index 21a0e07869f..81161514e4f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -150,6 +150,7 @@
 - [Issue #64](https://github.com/influxdb/influxdb/issues/64). Shard writes and queries across cluster with replay for briefly downed nodes (< 24 hrs)
 - [Issue #78](https://github.com/influxdb/influxdb/issues/78). Sequence numbers persist across restarts so they're not reused
 - [Issue #102](https://github.com/influxdb/influxdb/issues/102). Support expressions in where condition
+- [Issue #101](https://github.com/influxdb/influxdb/issues/101). Support expressions in aggregates
 
 ## Bugfixes
 
diff --git a/src/engine/aggregator.go b/src/engine/aggregator.go
index 387b2f23780..11cc2f66207 100644
--- a/src/engine/aggregator.go
+++ b/src/engine/aggregator.go
@@ -2,6 +2,7 @@ package engine
 
 import (
 	"common"
+	"datastore"
 	"fmt"
 	"math"
 	"parser"
@@ -43,19 +44,13 @@ func init() {
 
 type AbstractAggregator struct {
 	Aggregator
-	fieldName string
-	fieldIndex int
+	value *parser.Value
+	columns []string
 }
 
 func (self *AbstractAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
-	for idx, field := range series.Fields {
-		if field == self.fieldName {
-			self.fieldIndex = idx
-			return nil
-		}
-	}
-
-	return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown column name %s", self.fieldName))
+	self.columns = series.Fields
+	return nil
 }
 
 //
@@ -108,10 +103,15 @@ type StandardDeviationAggregator struct {
 }
 
 func (self *StandardDeviationAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
+	fieldValue, err := datastore.GetValue(self.value, self.columns, p)
+	if err != nil {
+		return err
+	}
+
 	var value float64
-	if ptr := p.Values[self.fieldIndex].Int64Value; ptr != nil {
+	if ptr := fieldValue.Int64Value; ptr != nil {
 		value = float64(*ptr)
-	} else if ptr := p.Values[self.fieldIndex].DoubleValue; ptr != nil {
+	} else if ptr := fieldValue.DoubleValue; ptr != nil {
 		value = *ptr
 	} else {
 		// else ignore this point
@@ -164,7 +164,7 @@ func NewStandardDeviationAggregator(q *parser.Query, v *parser.Value) (Aggregato
 
 	return &StandardDeviationAggregator{
 		AbstractAggregator: AbstractAggregator{
-			fieldName: v.Elems[0].Name,
+			value: v.Elems[0],
 		},
 		running: make(map[string]map[interface{}]*StandardDeviationRunning),
 	}, nil
@@ -181,10 +181,15 @@ type DerivativeAggregator struct {
 }
 
 func (self *DerivativeAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
+	fieldValue, err := datastore.GetValue(self.value, self.columns, p)
+	if err != nil {
+		return err
+	}
+
 	var value float64
-	if ptr := p.Values[self.fieldIndex].Int64Value; ptr != nil {
+	if ptr := fieldValue.Int64Value; ptr != nil {
 		value = float64(*ptr)
-	} else if ptr := p.Values[self.fieldIndex].DoubleValue; ptr != nil {
+	} else if ptr := fieldValue.DoubleValue; ptr != nil {
 		value = *ptr
 	} else {
 		// else ignore this point
@@ -228,7 +233,7 @@ func (self *DerivativeAggregator) GetValues(series string, group interface{}) []
 	if newValue != nil {
 		// if an old value exist, then compute the derivative and insert it in the points slice
 		deltaT := float64(*newValue.Timestamp-*oldValue.Timestamp) / float64(time.Second/time.Microsecond)
-		deltaV := *newValue.Values[self.fieldIndex].DoubleValue - *oldValue.Values[self.fieldIndex].DoubleValue
+		deltaV := *newValue.Values[0].DoubleValue - *oldValue.Values[0].DoubleValue
 		derivative := deltaV / deltaT
 		return [][]*protocol.FieldValue{
 			[]*protocol.FieldValue{
@@ -250,7 +255,7 @@ func NewDerivativeAggregator(q *parser.Query, v *parser.Value) (Aggregator, erro
 
 	return &DerivativeAggregator{
 		AbstractAggregator: AbstractAggregator{
-			fieldName: v.Elems[0].Name,
+			value: v.Elems[0],
 		},
 		firstValues: make(map[string]map[interface{}]*protocol.Point),
 		lastValues: make(map[string]map[interface{}]*protocol.Point),
@@ -280,10 +285,15 @@ func (self *HistogramAggregator) AggregatePoint(series string, group interface{}
 		groups[group] = buckets
 	}
 
+	fieldValue, err := datastore.GetValue(self.value, self.columns, p)
+	if err != nil {
+		return err
+	}
+
 	var value float64
-	if ptr := p.Values[self.fieldIndex].Int64Value; ptr != nil {
+	if ptr := fieldValue.Int64Value; ptr != nil {
 		value = float64(*ptr)
-	} else if ptr := p.Values[self.fieldIndex].DoubleValue; ptr != nil {
+	} else if ptr := fieldValue.DoubleValue; ptr != nil {
 		value = *ptr
 	}
 
@@ -341,11 +351,9 @@ func NewHistogramAggregator(q *parser.Query, v *parser.Value) (Aggregator, error
 		}
 	}
 
-	fieldName := v.Elems[0].Name
-
 	return &HistogramAggregator{
 		AbstractAggregator: AbstractAggregator{
-			fieldName: fieldName,
+			value: v.Elems[0],
 		},
 		bucketSize: bucketSize,
 		histograms: make(map[string]map[interface{}]map[int]int),
@@ -494,10 +502,15 @@ func (self *MeanAggregator) AggregatePoint(series string, group interface{}, p *
 	currentMean := means[group]
 	currentCount := counts[group] + 1
 
+	fieldValue, err := datastore.GetValue(self.value, self.columns, p)
+	if err != nil {
+		return err
+	}
+
 	var value float64
-	if ptr := p.Values[self.fieldIndex].Int64Value; ptr != nil {
+	if ptr := fieldValue.Int64Value; ptr != nil {
 		value = float64(*ptr)
-	} else if ptr := p.Values[self.fieldIndex].DoubleValue; ptr != nil {
+	} else if ptr := fieldValue.DoubleValue; ptr != nil {
 		value = *ptr
 	}
 
@@ -529,7 +542,7 @@ func NewMeanAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error)
 
 	return &MeanAggregator{
 		AbstractAggregator: AbstractAggregator{
-			fieldName: value.Elems[0].Name,
+			value: value.Elems[0],
 		},
 		means: make(map[string]map[interface{}]float64),
 		counts: make(map[string]map[interface{}]int),
@@ -543,7 +556,7 @@ func NewMedianAggregator(_ *parser.Query, value *parser.Value) (Aggregator, erro
 
 	return &PercentileAggregator{
 		AbstractAggregator: AbstractAggregator{
-			fieldName: value.Elems[0].Name,
+			value: value.Elems[0],
 		},
 		functionName: "median",
 		percentile: 50.0,
@@ -563,8 +576,12 @@ type PercentileAggregator struct {
 }
 
 func (self *PercentileAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
+	v, err := datastore.GetValue(self.value, self.columns, p)
+	if err != nil {
+		return err
+	}
+
 	value := 0.0
-	v := p.Values[self.fieldIndex]
 	if v.Int64Value != nil {
 		value = float64(*v.Int64Value)
 	} else if v.DoubleValue != nil {
@@ -614,7 +631,7 @@ func NewPercentileAggregator(_ *parser.Query, value *parser.Value) (Aggregator,
 
 	return &PercentileAggregator{
 		AbstractAggregator: AbstractAggregator{
-			fieldName: value.Elems[0].Name,
+			value: value.Elems[0],
 		},
 		functionName: "percentile",
 		percentile: percentile,
@@ -643,8 +660,12 @@ func (self *ModeAggregator) AggregatePoint(series string, group interface{}, p *
 		groupCounts = make(map[float64]int)
 	}
 
+	point, err := datastore.GetValue(self.value, self.columns, p)
+	if err != nil {
+		return err
+	}
+
 	var value float64
-	point := p.Values[self.fieldIndex]
 	if point.Int64Value != nil {
 		value = float64(*point.Int64Value)
 	} else if point.DoubleValue != nil {
@@ -700,7 +721,7 @@ func NewModeAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error)
 
 	return &ModeAggregator{
 		AbstractAggregator: AbstractAggregator{
-			fieldName: value.Elems[0].Name,
+			value: value.Elems[0],
 		},
 		counts: make(map[string]map[interface{}]map[float64]int),
 	}, nil
@@ -727,8 +748,12 @@ func (self *DistinctAggregator) AggregatePoint(series string, group interface{},
 		groupCounts = make(map[interface{}]int)
 	}
 
+	point, err := datastore.GetValue(self.value, self.columns, p)
+	if err != nil {
+		return err
+	}
+
 	var value interface{}
-	point := p.Values[self.fieldIndex]
 	if point.Int64Value != nil {
 		value = float64(*point.Int64Value)
 	} else if point.DoubleValue != nil {
@@ -776,7 +801,7 @@ func (self *DistinctAggregator) GetValues(series string, group interface{}) [][]
 func NewDistinctAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) {
 	return &DistinctAggregator{
 		AbstractAggregator: AbstractAggregator{
-			fieldName: value.Elems[0].Name,
+			value: value.Elems[0],
 		},
 		counts: make(map[string]map[interface{}]map[interface{}]int),
 	}, nil
@@ -806,7 +831,11 @@ func (self *CumulativeArithmeticAggregator) AggregatePoint(series string, group
 	if !ok {
 		currentValue = self.initialValue
 	}
-	values[group] = self.operation(currentValue, p.Values[self.fieldIndex])
+	value, err := datastore.GetValue(self.value, self.columns, p)
+	if err != nil {
+		return err
+	}
+	values[group] = self.operation(currentValue, value)
 	return nil
 }
 
@@ -828,7 +857,7 @@ func NewCumulativeArithmeticAggregator(name string, value *parser.Value, initial
 
 	return &CumulativeArithmeticAggregator{
 		AbstractAggregator: AbstractAggregator{
-			fieldName: value.Elems[0].Name,
+			value: value.Elems[0],
 		},
 		name: name,
 		values: make(map[string]map[interface{}]float64),
@@ -893,7 +922,12 @@ func (self *FirstOrLastAggregator) AggregatePoint(series string, group interface
 		self.values[series] = values
 	}
 	if values[group] == nil || !self.isFirst {
-		values[group] = p.Values[self.fieldIndex]
+		value, err := datastore.GetValue(self.value, self.columns, p)
+		if err != nil {
+			return err
+		}
+
+		values[group] = value
 	}
 	return nil
 }
@@ -917,7 +951,7 @@ func NewFirstOrLastAggregator(name string, v *parser.Value, isFirst bool) (Aggre
 
 	return &FirstOrLastAggregator{
 		AbstractAggregator: AbstractAggregator{
-			fieldName: v.Elems[0].Name,
+			value: v.Elems[0],
 		},
 		name: name,
 		isFirst: isFirst,
diff --git a/src/integration/benchmark_test.go b/src/integration/benchmark_test.go
index 2d837877814..48fb1c11ba3 100644
--- a/src/integration/benchmark_test.go
+++ b/src/integration/benchmark_test.go
@@ -641,7 +641,7 @@ func (self *IntegrationSuite) TestWhereConditionWithExpression(c *C) {
 	err := self.server.WriteData(`
 [
   {
-    "name": "test_issue_105",
+    "name": "test_where_expression",
     "columns": ["time", "a", "b"],
     "points":[
       [1386262529794, 2, 1],
@@ -650,7 +650,7 @@
   }
 ]`, "time_precision=m")
 	c.Assert(err, IsNil)
-	bs, err := self.server.RunQuery("select a, b from test_issue_105 where a + b >= 3")
+	bs, err := self.server.RunQuery("select a, b from test_where_expression where a + b >= 3")
 	c.Assert(err, IsNil)
 	data := []*h.SerializedSeries{}
 	err = json.Unmarshal(bs, &data)
@@ -660,6 +660,29 @@
 	c.Assert(data, HasLen, 1)
 	c.Assert(data[0].Points[0][3], Equals, 1.0)
 }
+func (self *IntegrationSuite) TestAggregateWithExpression(c *C) {
+	err := self.server.WriteData(`
+[
+  {
+    "name": "test_aggregate_expression",
+    "columns": ["time", "a", "b"],
+    "points":[
+      [1386262529794, 1, 1],
+      [1386262529794, 2, 2]
+    ]
+  }
+]`, "time_precision=m")
+	c.Assert(err, IsNil)
+	bs, err := self.server.RunQuery("select mean(a + b) from test_aggregate_expression")
+	c.Assert(err, IsNil)
+	data := []*h.SerializedSeries{}
+	err = json.Unmarshal(bs, &data)
+	c.Assert(data, HasLen, 1)
+	c.Assert(data[0].Columns, HasLen, 2)
+	c.Assert(data[0].Points, HasLen, 1)
+	c.Assert(data[0].Points[0][1], Equals, 3.0)
+}
+
 // test for issue #41
 func (self *IntegrationSuite) TestDbDelete(c *C) {
 	err := self.server.WriteData(`
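
The shape of the refactor is the same in every hunk above: instead of resolving a column name to an index up front, an aggregator now keeps the parsed expression (a *parser.Value) together with the series' column list and evaluates the expression per point through datastore.GetValue. A minimal sketch of that pattern follows; the ExampleSumAggregator type is hypothetical and not part of this change, while the types and the GetValue call are taken from the diff (in package engine, keyed by series only for brevity):

    // Hypothetical aggregator following the pattern introduced by this change.
    type ExampleSumAggregator struct {
    	AbstractAggregator
    	sums map[string]float64
    }

    func (self *ExampleSumAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
    	// self.value holds the parsed expression (e.g. "a + b"); self.columns was
    	// captured from the series by InitializeFieldsMetadata.
    	fieldValue, err := datastore.GetValue(self.value, self.columns, p)
    	if err != nil {
    		return err
    	}

    	var value float64
    	if ptr := fieldValue.Int64Value; ptr != nil {
    		value = float64(*ptr)
    	} else if ptr := fieldValue.DoubleValue; ptr != nil {
    		value = *ptr
    	}
    	self.sums[series] += value
    	return nil
    }

    func NewExampleSumAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) {
    	return &ExampleSumAggregator{
    		AbstractAggregator: AbstractAggregator{
    			value: value.Elems[0],
    		},
    		sums: make(map[string]float64),
    	}, nil
    }

A query such as "select mean(a + b) from test_aggregate_expression" (exercised by the new integration test) flows through this path: the parser passes the a + b expression to the aggregator as value.Elems[0], and GetValue evaluates it against each point's columns.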