Skip to content

Commit

Permalink
fix #101. Support expressions in aggregates
Browse files Browse the repository at this point in the history
  • Loading branch information
jvshahid committed Dec 5, 2013
1 parent df5de76 commit 045f302
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
- [Issue #64](https://github.com/influxdb/influxdb/issues/64). Shard writes and queries across cluster with replay for briefly downed nodes (< 24 hrs)
- [Issue #78](https://github.com/influxdb/influxdb/issues/78). Sequence numbers persist across restarts so they're not reused
- [Issue #102](https://github.com/influxdb/influxdb/issues/102). Support expressions in where condition
- [Issue #101](https://github.com/influxdb/influxdb/issues/101). Support expressions in aggregates

## Bugfixes

Expand Down
106 changes: 70 additions & 36 deletions src/engine/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"common"
"datastore"
"fmt"
"math"
"parser"
Expand Down Expand Up @@ -43,19 +44,13 @@ func init() {

type AbstractAggregator struct {
Aggregator
fieldName string
fieldIndex int
value *parser.Value
columns []string
}

func (self *AbstractAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
for idx, field := range series.Fields {
if field == self.fieldName {
self.fieldIndex = idx
return nil
}
}

return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown column name %s", self.fieldName))
self.columns = series.Fields
return nil
}

//
Expand Down Expand Up @@ -108,10 +103,15 @@ type StandardDeviationAggregator struct {
}

func (self *StandardDeviationAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
fieldValue, err := datastore.GetValue(self.value, self.columns, p)
if err != nil {
return err
}

var value float64
if ptr := p.Values[self.fieldIndex].Int64Value; ptr != nil {
if ptr := fieldValue.Int64Value; ptr != nil {
value = float64(*ptr)
} else if ptr := p.Values[self.fieldIndex].DoubleValue; ptr != nil {
} else if ptr := fieldValue.DoubleValue; ptr != nil {
value = *ptr
} else {
// else ignore this point
Expand Down Expand Up @@ -164,7 +164,7 @@ func NewStandardDeviationAggregator(q *parser.Query, v *parser.Value) (Aggregato

return &StandardDeviationAggregator{
AbstractAggregator: AbstractAggregator{
fieldName: v.Elems[0].Name,
value: v.Elems[0],
},
running: make(map[string]map[interface{}]*StandardDeviationRunning),
}, nil
Expand All @@ -181,10 +181,15 @@ type DerivativeAggregator struct {
}

func (self *DerivativeAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
fieldValue, err := datastore.GetValue(self.value, self.columns, p)
if err != nil {
return err
}

var value float64
if ptr := p.Values[self.fieldIndex].Int64Value; ptr != nil {
if ptr := fieldValue.Int64Value; ptr != nil {
value = float64(*ptr)
} else if ptr := p.Values[self.fieldIndex].DoubleValue; ptr != nil {
} else if ptr := fieldValue.DoubleValue; ptr != nil {
value = *ptr
} else {
// else ignore this point
Expand Down Expand Up @@ -228,7 +233,7 @@ func (self *DerivativeAggregator) GetValues(series string, group interface{}) []
if newValue != nil {
// if an old value exist, then compute the derivative and insert it in the points slice
deltaT := float64(*newValue.Timestamp-*oldValue.Timestamp) / float64(time.Second/time.Microsecond)
deltaV := *newValue.Values[self.fieldIndex].DoubleValue - *oldValue.Values[self.fieldIndex].DoubleValue
deltaV := *newValue.Values[0].DoubleValue - *oldValue.Values[0].DoubleValue
derivative := deltaV / deltaT
return [][]*protocol.FieldValue{
[]*protocol.FieldValue{
Expand All @@ -250,7 +255,7 @@ func NewDerivativeAggregator(q *parser.Query, v *parser.Value) (Aggregator, erro

return &DerivativeAggregator{
AbstractAggregator: AbstractAggregator{
fieldName: v.Elems[0].Name,
value: v.Elems[0],
},
firstValues: make(map[string]map[interface{}]*protocol.Point),
lastValues: make(map[string]map[interface{}]*protocol.Point),
Expand Down Expand Up @@ -280,10 +285,15 @@ func (self *HistogramAggregator) AggregatePoint(series string, group interface{}
groups[group] = buckets
}

fieldValue, err := datastore.GetValue(self.value, self.columns, p)
if err != nil {
return err
}

var value float64
if ptr := p.Values[self.fieldIndex].Int64Value; ptr != nil {
if ptr := fieldValue.Int64Value; ptr != nil {
value = float64(*ptr)
} else if ptr := p.Values[self.fieldIndex].DoubleValue; ptr != nil {
} else if ptr := fieldValue.DoubleValue; ptr != nil {
value = *ptr
}

Expand Down Expand Up @@ -341,11 +351,9 @@ func NewHistogramAggregator(q *parser.Query, v *parser.Value) (Aggregator, error
}
}

fieldName := v.Elems[0].Name

return &HistogramAggregator{
AbstractAggregator: AbstractAggregator{
fieldName: fieldName,
value: v.Elems[0],
},
bucketSize: bucketSize,
histograms: make(map[string]map[interface{}]map[int]int),
Expand Down Expand Up @@ -494,10 +502,15 @@ func (self *MeanAggregator) AggregatePoint(series string, group interface{}, p *
currentMean := means[group]
currentCount := counts[group] + 1

fieldValue, err := datastore.GetValue(self.value, self.columns, p)
if err != nil {
return err
}

var value float64
if ptr := p.Values[self.fieldIndex].Int64Value; ptr != nil {
if ptr := fieldValue.Int64Value; ptr != nil {
value = float64(*ptr)
} else if ptr := p.Values[self.fieldIndex].DoubleValue; ptr != nil {
} else if ptr := fieldValue.DoubleValue; ptr != nil {
value = *ptr
}

Expand Down Expand Up @@ -529,7 +542,7 @@ func NewMeanAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error)

return &MeanAggregator{
AbstractAggregator: AbstractAggregator{
fieldName: value.Elems[0].Name,
value: value.Elems[0],
},
means: make(map[string]map[interface{}]float64),
counts: make(map[string]map[interface{}]int),
Expand All @@ -543,7 +556,7 @@ func NewMedianAggregator(_ *parser.Query, value *parser.Value) (Aggregator, erro

return &PercentileAggregator{
AbstractAggregator: AbstractAggregator{
fieldName: value.Elems[0].Name,
value: value.Elems[0],
},
functionName: "median",
percentile: 50.0,
Expand All @@ -563,8 +576,12 @@ type PercentileAggregator struct {
}

func (self *PercentileAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
v, err := datastore.GetValue(self.value, self.columns, p)
if err != nil {
return err
}

value := 0.0
v := p.Values[self.fieldIndex]
if v.Int64Value != nil {
value = float64(*v.Int64Value)
} else if v.DoubleValue != nil {
Expand Down Expand Up @@ -614,7 +631,7 @@ func NewPercentileAggregator(_ *parser.Query, value *parser.Value) (Aggregator,

return &PercentileAggregator{
AbstractAggregator: AbstractAggregator{
fieldName: value.Elems[0].Name,
value: value.Elems[0],
},
functionName: "percentile",
percentile: percentile,
Expand Down Expand Up @@ -643,8 +660,12 @@ func (self *ModeAggregator) AggregatePoint(series string, group interface{}, p *
groupCounts = make(map[float64]int)
}

point, err := datastore.GetValue(self.value, self.columns, p)
if err != nil {
return err
}

var value float64
point := p.Values[self.fieldIndex]
if point.Int64Value != nil {
value = float64(*point.Int64Value)
} else if point.DoubleValue != nil {
Expand Down Expand Up @@ -700,7 +721,7 @@ func NewModeAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error)

return &ModeAggregator{
AbstractAggregator: AbstractAggregator{
fieldName: value.Elems[0].Name,
value: value.Elems[0],
},
counts: make(map[string]map[interface{}]map[float64]int),
}, nil
Expand All @@ -727,8 +748,12 @@ func (self *DistinctAggregator) AggregatePoint(series string, group interface{},
groupCounts = make(map[interface{}]int)
}

point, err := datastore.GetValue(self.value, self.columns, p)
if err != nil {
return err
}

var value interface{}
point := p.Values[self.fieldIndex]
if point.Int64Value != nil {
value = float64(*point.Int64Value)
} else if point.DoubleValue != nil {
Expand Down Expand Up @@ -776,7 +801,7 @@ func (self *DistinctAggregator) GetValues(series string, group interface{}) [][]
func NewDistinctAggregator(_ *parser.Query, value *parser.Value) (Aggregator, error) {
return &DistinctAggregator{
AbstractAggregator: AbstractAggregator{
fieldName: value.Elems[0].Name,
value: value.Elems[0],
},
counts: make(map[string]map[interface{}]map[interface{}]int),
}, nil
Expand Down Expand Up @@ -806,7 +831,11 @@ func (self *CumulativeArithmeticAggregator) AggregatePoint(series string, group
if !ok {
currentValue = self.initialValue
}
values[group] = self.operation(currentValue, p.Values[self.fieldIndex])
value, err := datastore.GetValue(self.value, self.columns, p)
if err != nil {
return err
}
values[group] = self.operation(currentValue, value)
return nil
}

Expand All @@ -828,7 +857,7 @@ func NewCumulativeArithmeticAggregator(name string, value *parser.Value, initial

return &CumulativeArithmeticAggregator{
AbstractAggregator: AbstractAggregator{
fieldName: value.Elems[0].Name,
value: value.Elems[0],
},
name: name,
values: make(map[string]map[interface{}]float64),
Expand Down Expand Up @@ -893,7 +922,12 @@ func (self *FirstOrLastAggregator) AggregatePoint(series string, group interface
self.values[series] = values
}
if values[group] == nil || !self.isFirst {
values[group] = p.Values[self.fieldIndex]
value, err := datastore.GetValue(self.value, self.columns, p)
if err != nil {
return err
}

values[group] = value
}
return nil
}
Expand All @@ -917,7 +951,7 @@ func NewFirstOrLastAggregator(name string, v *parser.Value, isFirst bool) (Aggre

return &FirstOrLastAggregator{
AbstractAggregator: AbstractAggregator{
fieldName: v.Elems[0].Name,
value: v.Elems[0],
},
name: name,
isFirst: isFirst,
Expand Down
27 changes: 25 additions & 2 deletions src/integration/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ func (self *IntegrationSuite) TestWhereConditionWithExpression(c *C) {
err := self.server.WriteData(`
[
{
"name": "test_issue_105",
"name": "test_where_expression",
"columns": ["time", "a", "b"],
"points":[
[1386262529794, 2, 1],
Expand All @@ -650,7 +650,7 @@ func (self *IntegrationSuite) TestWhereConditionWithExpression(c *C) {
}
]`, "time_precision=m")
c.Assert(err, IsNil)
bs, err := self.server.RunQuery("select a, b from test_issue_105 where a + b >= 3")
bs, err := self.server.RunQuery("select a, b from test_where_expression where a + b >= 3")
c.Assert(err, IsNil)
data := []*h.SerializedSeries{}
err = json.Unmarshal(bs, &data)
Expand All @@ -660,6 +660,29 @@ func (self *IntegrationSuite) TestWhereConditionWithExpression(c *C) {
c.Assert(data[0].Points[0][3], Equals, 1.0)
}

func (self *IntegrationSuite) TestAggregateWithExpression(c *C) {
err := self.server.WriteData(`
[
{
"name": "test_aggregate_expression",
"columns": ["time", "a", "b"],
"points":[
[1386262529794, 1, 1],
[1386262529794, 2, 2]
]
}
]`, "time_precision=m")
c.Assert(err, IsNil)
bs, err := self.server.RunQuery("select mean(a + b) from test_aggregate_expression")
c.Assert(err, IsNil)
data := []*h.SerializedSeries{}
err = json.Unmarshal(bs, &data)
c.Assert(data, HasLen, 1)
c.Assert(data[0].Columns, HasLen, 2)
c.Assert(data[0].Points, HasLen, 1)
c.Assert(data[0].Points[0][1], Equals, 3.0)
}

// test for issue #41
func (self *IntegrationSuite) TestDbDelete(c *C) {
err := self.server.WriteData(`
Expand Down

0 comments on commit 045f302

Please sign in to comment.